import unittest import tempfile import os from datetime import datetime, timedelta # This import is expected to fail with ImportError until log_registry.py is created. from log_registry import LogRegistry class TestLogRegistry(unittest.TestCase): def setUp(self): """Set up a temporary directory and registry file for each test.""" self.temp_dir = tempfile.TemporaryDirectory() self.registry_path = os.path.join(self.temp_dir.name, "registry.toml") # Ensure the file is created and empty initially for a clean state. # LogRegistry is assumed to load from this file on instantiation. with open(self.registry_path, 'w') as f: f.write("# Initial empty registry\n") # Instantiate LogRegistry. This will load from the empty file. self.registry = LogRegistry(self.registry_path) def tearDown(self): """Clean up the temporary directory and its contents after each test.""" self.temp_dir.cleanup() def test_instantiation(self): """Test LogRegistry instantiation with a file path.""" self.assertIsInstance(self.registry, LogRegistry) self.assertEqual(self.registry.registry_path, self.registry_path) # Check if the file exists. LogRegistry is assumed to create it if not. self.assertTrue(os.path.exists(self.registry_path)) # We will verify content in other tests that explicitly save and reload. def test_register_session(self): """Test registering a new session.""" session_id = "session-123" path = "/path/to/session/123" start_time = datetime.utcnow() self.registry.register_session(session_id, path, start_time) # Verify session was added to internal data (assuming LogRegistry has a public 'data' attribute for testing) self.assertIn(session_id, self.registry.data) session_data = self.registry.data[session_id] self.assertEqual(session_data['path'], path) # Convert stored ISO string back to datetime for comparison stored_start_time = datetime.fromisoformat(session_data['start_time']) self.assertAlmostEqual(stored_start_time, start_time, delta=timedelta(seconds=1)) # Allow for minor time differences self.assertFalse(session_data.get('whitelisted', False)) # Default to not whitelisted self.assertIsNone(session_data.get('metadata')) # Verify data was written to the TOML file by reloading reloaded_registry = LogRegistry(self.registry_path) self.assertIn(session_id, reloaded_registry.data) reloaded_session_data = reloaded_registry.data[session_id] reloaded_start_time = datetime.fromisoformat(reloaded_session_data['start_time']) self.assertAlmostEqual(reloaded_start_time, start_time, delta=timedelta(seconds=1)) def test_update_session_metadata(self): """Test updating session metadata.""" session_id = "session-456" path = "/path/to/session/456" start_time = datetime.utcnow() self.registry.register_session(session_id, path, start_time) message_count = 100 errors = 5 size_kb = 1024 whitelisted = True reason = "Automated process" self.registry.update_session_metadata(session_id, message_count, errors, size_kb, whitelisted, reason) # Verify metadata was updated in internal data self.assertIn(session_id, self.registry.data) session_data = self.registry.data[session_id] self.assertIsNotNone(session_data.get('metadata')) metadata = session_data['metadata'] self.assertEqual(metadata['message_count'], message_count) self.assertEqual(metadata['errors'], errors) self.assertEqual(metadata['size_kb'], size_kb) self.assertEqual(metadata['whitelisted'], whitelisted) self.assertEqual(metadata['reason'], reason) # Also check if the whitelisted flag in the main session data is updated self.assertTrue(session_data.get('whitelisted', False)) # Verify data was written to the TOML file by reloading reloaded_registry = LogRegistry(self.registry_path) self.assertIn(session_id, reloaded_registry.data) reloaded_session_data = reloaded_registry.data[session_id] self.assertTrue(reloaded_session_data.get('metadata', {}).get('whitelisted', False)) self.assertTrue(reloaded_session_data.get('whitelisted', False)) # Check main flag too def test_is_session_whitelisted(self): """Test checking if a session is whitelisted.""" session_id_whitelisted = "session-789-whitelisted" path_w = "/path/to/session/789" start_time_w = datetime.utcnow() self.registry.register_session(session_id_whitelisted, path_w, start_time_w) self.registry.update_session_metadata(session_id_whitelisted, 10, 0, 100, True, "Manual whitelist") session_id_not_whitelisted = "session-abc-not-whitelisted" path_nw = "/path/to/session/abc" start_time_nw = datetime.utcnow() self.registry.register_session(session_id_not_whitelisted, path_nw, start_time_nw) # Test explicitly whitelisted session self.assertTrue(self.registry.is_session_whitelisted(session_id_whitelisted)) # Test session registered but not updated, should default to not whitelisted self.assertFalse(self.registry.is_session_whitelisted(session_id_not_whitelisted)) # Test for a non-existent session, should be treated as not whitelisted self.assertFalse(self.registry.is_session_whitelisted("non-existent-session")) def test_get_old_non_whitelisted_sessions(self): """Test retrieving old, non-whitelisted sessions.""" now = datetime.utcnow() # Define a cutoff time that is 7 days ago cutoff_time = now - timedelta(days=7) # Session 1: Old and not whitelisted session_id_old_nw = "session-old-nw" path_old_nw = "/path/to/session/old_nw" start_time_old_nw = now - timedelta(days=10) # Older than cutoff self.registry.register_session(session_id_old_nw, path_old_nw, start_time_old_nw) # Session 2: Recent and not whitelisted session_id_recent_nw = "session-recent-nw" path_recent_nw = "/path/to/session/recent_nw" start_time_recent_nw = now - timedelta(days=3) # Newer than cutoff self.registry.register_session(session_id_recent_nw, path_recent_nw, start_time_recent_nw) # Session 3: Old and whitelisted session_id_old_w = "session-old-w" path_old_w = "/path/to/session/old_w" start_time_old_w = now - timedelta(days=15) # Older than cutoff self.registry.register_session(session_id_old_w, path_old_w, start_time_old_w) self.registry.update_session_metadata(session_id_old_w, 50, 0, 500, True, "Whitelisted") # Session 4: Old, not whitelisted explicitly, but with metadata that doesn't set 'whitelisted' to True. # The 'is_session_whitelisted' logic should correctly interpret this as not whitelisted. session_id_old_nw_incomplete = "session-old-nw-incomplete" path_old_nw_incomplete = "/path/to/session/old_nw_incomplete" start_time_old_nw_incomplete = now - timedelta(days=20) # Older than cutoff self.registry.register_session(session_id_old_nw_incomplete, path_old_nw_incomplete, start_time_old_nw_incomplete) # Update with some metadata, but set 'whitelisted' to False explicitly self.registry.update_session_metadata(session_id_old_nw_incomplete, 10, 0, 100, False, "Manual review needed") # Get sessions older than cutoff_time and not whitelisted old_sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time) # Collect session IDs from the result found_session_ids = {s['session_id'] for s in old_sessions} # Expected: session_id_old_nw (old, not whitelisted) and session_id_old_nw_incomplete (old, explicitly not whitelisted) self.assertIn(session_id_old_nw, found_session_ids) self.assertIn(session_id_old_nw_incomplete, found_session_ids) # Not expected: session_id_recent_nw (too recent), session_id_old_w (whitelisted) self.assertNotIn(session_id_recent_nw, found_session_ids) self.assertNotIn(session_id_old_w, found_session_ids) # Ensure only the expected sessions are in the result self.assertEqual(len(found_session_ids), 2) # Test with a cutoff that includes all sessions, and ensure only non-whitelisted are returned future_cutoff = now + timedelta(days=1) # All sessions are older than this all_old_sessions = self.registry.get_old_non_whitelisted_sessions(future_cutoff) all_found_session_ids = {s['session_id'] for s in all_old_sessions} # Expected: session_id_old_nw, session_id_old_nw_incomplete, AND session_id_recent_nw # Not expected: session_id_old_w (whitelisted) self.assertEqual(len(all_found_session_ids), 3) self.assertIn(session_id_old_nw, all_found_session_ids) self.assertIn(session_id_old_nw_incomplete, all_found_session_ids) self.assertIn(session_id_recent_nw, all_found_session_ids) self.assertNotIn(session_id_old_w, all_found_session_ids)