diff --git a/log_registry.py b/log_registry.py new file mode 100644 index 0000000..c9bd594 --- /dev/null +++ b/log_registry.py @@ -0,0 +1,142 @@ +import tomli_w +import tomllib +from datetime import datetime +import os + +class LogRegistry: + def __init__(self, registry_path): + self.registry_path = registry_path + self.data = {} + self.load_registry() + + def load_registry(self): + """Loads the registry data from the TOML file.""" + if os.path.exists(self.registry_path): + try: + with open(self.registry_path, 'rb') as f: + loaded_data = tomllib.load(f) + # Keep data as it is from TOML (strings or native datetimes) + # If we want to satisfy tests that expect strings, we ensure they are strings. + self.data = {} + for session_id, session_data in loaded_data.items(): + new_session_data = session_data.copy() + # If tomllib parsed it as a datetime, convert it back to string for the tests + if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime): + new_session_data['start_time'] = new_session_data['start_time'].isoformat() + if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict): + m = new_session_data['metadata'] + if 'timestamp' in m and isinstance(m['timestamp'], datetime): + m['timestamp'] = m['timestamp'].isoformat() + self.data[session_id] = new_session_data + except Exception as e: + print(f"Error loading registry from {self.registry_path}: {e}") + self.data = {} + else: + self.data = {} + + def save_registry(self): + """Saves the current registry data to the TOML file.""" + try: + # Convert datetime objects to ISO format strings for TOML serialization + data_to_save = {} + for session_id, session_data in self.data.items(): + session_data_copy = {} + for k, v in session_data.items(): + if v is None: + continue + if k == 'start_time' and isinstance(v, datetime): + session_data_copy[k] = v.isoformat() + elif k == 'metadata' and isinstance(v, dict): + metadata_copy = {} + for mk, mv in v.items(): + if mv is None: + continue + if mk == 'timestamp' and isinstance(mv, datetime): + metadata_copy[mk] = mv.isoformat() + else: + metadata_copy[mk] = mv + session_data_copy[k] = metadata_copy + else: + session_data_copy[k] = v + data_to_save[session_id] = session_data_copy + + with open(self.registry_path, 'wb') as f: + tomli_w.dump(data_to_save, f) + except Exception as e: + print(f"Error saving registry to {self.registry_path}: {e}") + + def register_session(self, session_id, path, start_time): + """Registers a new session.""" + if session_id in self.data: + print(f"Warning: Session ID '{session_id}' already exists. Overwriting.") + + # Store start_time internally as a string to satisfy tests + if isinstance(start_time, datetime): + start_time_str = start_time.isoformat() + else: + start_time_str = start_time + + self.data[session_id] = { + 'path': path, + 'start_time': start_time_str, + 'whitelisted': False, + 'metadata': None + } + self.save_registry() + + def update_session_metadata(self, session_id, message_count, errors, size_kb, whitelisted, reason): + """Updates metadata for an existing session.""" + if session_id not in self.data: + print(f"Error: Session ID '{session_id}' not found for metadata update.") + return + + # Ensure metadata exists + if self.data[session_id].get('metadata') is None: + self.data[session_id]['metadata'] = {} + + # Update fields + self.data[session_id]['metadata']['message_count'] = message_count + self.data[session_id]['metadata']['errors'] = errors + self.data[session_id]['metadata']['size_kb'] = size_kb + self.data[session_id]['metadata']['whitelisted'] = whitelisted + self.data[session_id]['metadata']['reason'] = reason + # self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp + + # Also update the top-level whitelisted flag if provided + if whitelisted is not None: + self.data[session_id]['whitelisted'] = whitelisted + + self.save_registry() # Save after update + + def is_session_whitelisted(self, session_id): + """Checks if a session is whitelisted.""" + session_data = self.data.get(session_id) + if session_data is None: + return False # Non-existent sessions are not whitelisted + + # Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted. + return session_data.get('whitelisted', False) + + def get_old_non_whitelisted_sessions(self, cutoff_datetime): + """Gets sessions older than cutoff_datetime and not whitelisted.""" + old_sessions = [] + for session_id, session_data in self.data.items(): + # Check if session is older than cutoff and not whitelisted + start_time_raw = session_data.get('start_time') + if isinstance(start_time_raw, str): + try: + start_time = datetime.fromisoformat(start_time_raw) + except ValueError: + start_time = None + else: + start_time = start_time_raw + + is_whitelisted = session_data.get('whitelisted', False) + + if start_time is not None and start_time < cutoff_datetime and not is_whitelisted: + old_sessions.append({ + 'session_id': session_id, + 'path': session_data.get('path'), + 'start_time': start_time_raw + }) + return old_sessions diff --git a/tests/test_log_registry.py b/tests/test_log_registry.py new file mode 100644 index 0000000..a9e4065 --- /dev/null +++ b/tests/test_log_registry.py @@ -0,0 +1,180 @@ +import unittest +import tempfile +import os +from datetime import datetime, timedelta + +# This import is expected to fail with ImportError until log_registry.py is created. +from log_registry import LogRegistry + +class TestLogRegistry(unittest.TestCase): + + def setUp(self): + """Set up a temporary directory and registry file for each test.""" + self.temp_dir = tempfile.TemporaryDirectory() + self.registry_path = os.path.join(self.temp_dir.name, "registry.toml") + + # Ensure the file is created and empty initially for a clean state. + # LogRegistry is assumed to load from this file on instantiation. + with open(self.registry_path, 'w') as f: + f.write("# Initial empty registry\n") + + # Instantiate LogRegistry. This will load from the empty file. + self.registry = LogRegistry(self.registry_path) + + def tearDown(self): + """Clean up the temporary directory and its contents after each test.""" + self.temp_dir.cleanup() + + def test_instantiation(self): + """Test LogRegistry instantiation with a file path.""" + self.assertIsInstance(self.registry, LogRegistry) + self.assertEqual(self.registry.registry_path, self.registry_path) + # Check if the file exists. LogRegistry is assumed to create it if not. + self.assertTrue(os.path.exists(self.registry_path)) + # We will verify content in other tests that explicitly save and reload. + + def test_register_session(self): + """Test registering a new session.""" + session_id = "session-123" + path = "/path/to/session/123" + start_time = datetime.utcnow() + + self.registry.register_session(session_id, path, start_time) + + # Verify session was added to internal data (assuming LogRegistry has a public 'data' attribute for testing) + self.assertIn(session_id, self.registry.data) + session_data = self.registry.data[session_id] + self.assertEqual(session_data['path'], path) + # Convert stored ISO string back to datetime for comparison + stored_start_time = datetime.fromisoformat(session_data['start_time']) + self.assertAlmostEqual(stored_start_time, start_time, delta=timedelta(seconds=1)) # Allow for minor time differences + self.assertFalse(session_data.get('whitelisted', False)) # Default to not whitelisted + self.assertIsNone(session_data.get('metadata')) + + # Verify data was written to the TOML file by reloading + reloaded_registry = LogRegistry(self.registry_path) + self.assertIn(session_id, reloaded_registry.data) + reloaded_session_data = reloaded_registry.data[session_id] + reloaded_start_time = datetime.fromisoformat(reloaded_session_data['start_time']) + self.assertAlmostEqual(reloaded_start_time, start_time, delta=timedelta(seconds=1)) + + + def test_update_session_metadata(self): + """Test updating session metadata.""" + session_id = "session-456" + path = "/path/to/session/456" + start_time = datetime.utcnow() + self.registry.register_session(session_id, path, start_time) + + message_count = 100 + errors = 5 + size_kb = 1024 + whitelisted = True + reason = "Automated process" + + self.registry.update_session_metadata(session_id, message_count, errors, size_kb, whitelisted, reason) + + # Verify metadata was updated in internal data + self.assertIn(session_id, self.registry.data) + session_data = self.registry.data[session_id] + self.assertIsNotNone(session_data.get('metadata')) + metadata = session_data['metadata'] + self.assertEqual(metadata['message_count'], message_count) + self.assertEqual(metadata['errors'], errors) + self.assertEqual(metadata['size_kb'], size_kb) + self.assertEqual(metadata['whitelisted'], whitelisted) + self.assertEqual(metadata['reason'], reason) + # Also check if the whitelisted flag in the main session data is updated + self.assertTrue(session_data.get('whitelisted', False)) + + # Verify data was written to the TOML file by reloading + reloaded_registry = LogRegistry(self.registry_path) + self.assertIn(session_id, reloaded_registry.data) + reloaded_session_data = reloaded_registry.data[session_id] + self.assertTrue(reloaded_session_data.get('metadata', {}).get('whitelisted', False)) + self.assertTrue(reloaded_session_data.get('whitelisted', False)) # Check main flag too + + def test_is_session_whitelisted(self): + """Test checking if a session is whitelisted.""" + session_id_whitelisted = "session-789-whitelisted" + path_w = "/path/to/session/789" + start_time_w = datetime.utcnow() + self.registry.register_session(session_id_whitelisted, path_w, start_time_w) + self.registry.update_session_metadata(session_id_whitelisted, 10, 0, 100, True, "Manual whitelist") + + session_id_not_whitelisted = "session-abc-not-whitelisted" + path_nw = "/path/to/session/abc" + start_time_nw = datetime.utcnow() + self.registry.register_session(session_id_not_whitelisted, path_nw, start_time_nw) + + # Test explicitly whitelisted session + self.assertTrue(self.registry.is_session_whitelisted(session_id_whitelisted)) + # Test session registered but not updated, should default to not whitelisted + self.assertFalse(self.registry.is_session_whitelisted(session_id_not_whitelisted)) + + # Test for a non-existent session, should be treated as not whitelisted + self.assertFalse(self.registry.is_session_whitelisted("non-existent-session")) + + def test_get_old_non_whitelisted_sessions(self): + """Test retrieving old, non-whitelisted sessions.""" + now = datetime.utcnow() + # Define a cutoff time that is 7 days ago + cutoff_time = now - timedelta(days=7) + + # Session 1: Old and not whitelisted + session_id_old_nw = "session-old-nw" + path_old_nw = "/path/to/session/old_nw" + start_time_old_nw = now - timedelta(days=10) # Older than cutoff + self.registry.register_session(session_id_old_nw, path_old_nw, start_time_old_nw) + + # Session 2: Recent and not whitelisted + session_id_recent_nw = "session-recent-nw" + path_recent_nw = "/path/to/session/recent_nw" + start_time_recent_nw = now - timedelta(days=3) # Newer than cutoff + self.registry.register_session(session_id_recent_nw, path_recent_nw, start_time_recent_nw) + + # Session 3: Old and whitelisted + session_id_old_w = "session-old-w" + path_old_w = "/path/to/session/old_w" + start_time_old_w = now - timedelta(days=15) # Older than cutoff + self.registry.register_session(session_id_old_w, path_old_w, start_time_old_w) + self.registry.update_session_metadata(session_id_old_w, 50, 0, 500, True, "Whitelisted") + + # Session 4: Old, not whitelisted explicitly, but with metadata that doesn't set 'whitelisted' to True. + # The 'is_session_whitelisted' logic should correctly interpret this as not whitelisted. + session_id_old_nw_incomplete = "session-old-nw-incomplete" + path_old_nw_incomplete = "/path/to/session/old_nw_incomplete" + start_time_old_nw_incomplete = now - timedelta(days=20) # Older than cutoff + self.registry.register_session(session_id_old_nw_incomplete, path_old_nw_incomplete, start_time_old_nw_incomplete) + # Update with some metadata, but set 'whitelisted' to False explicitly + self.registry.update_session_metadata(session_id_old_nw_incomplete, 10, 0, 100, False, "Manual review needed") + + # Get sessions older than cutoff_time and not whitelisted + old_sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time) + + # Collect session IDs from the result + found_session_ids = {s['session_id'] for s in old_sessions} + + # Expected: session_id_old_nw (old, not whitelisted) and session_id_old_nw_incomplete (old, explicitly not whitelisted) + self.assertIn(session_id_old_nw, found_session_ids) + self.assertIn(session_id_old_nw_incomplete, found_session_ids) + + # Not expected: session_id_recent_nw (too recent), session_id_old_w (whitelisted) + self.assertNotIn(session_id_recent_nw, found_session_ids) + self.assertNotIn(session_id_old_w, found_session_ids) + + # Ensure only the expected sessions are in the result + self.assertEqual(len(found_session_ids), 2) + + # Test with a cutoff that includes all sessions, and ensure only non-whitelisted are returned + future_cutoff = now + timedelta(days=1) # All sessions are older than this + all_old_sessions = self.registry.get_old_non_whitelisted_sessions(future_cutoff) + all_found_session_ids = {s['session_id'] for s in all_old_sessions} + + # Expected: session_id_old_nw, session_id_old_nw_incomplete, AND session_id_recent_nw + # Not expected: session_id_old_w (whitelisted) + self.assertEqual(len(all_found_session_ids), 3) + self.assertIn(session_id_old_nw, all_found_session_ids) + self.assertIn(session_id_old_nw_incomplete, all_found_session_ids) + self.assertIn(session_id_recent_nw, all_found_session_ids) + self.assertNotIn(session_id_old_w, all_found_session_ids)