Files
manual_slop/tests/test_log_registry.py
Ed_ 60396f03f8 refactor(types): auto -> None sweep across entire codebase
Applied 236 return type annotations to functions with no return values
across 100+ files (core modules, tests, scripts, simulations).
Added Phase 4 to python_style_refactor track for remaining 597 items
(untyped params, vars, and functions with return values).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 11:16:56 -05:00

157 lines
8.4 KiB
Python

import unittest
import tempfile
import os
from datetime import datetime, timedelta
# This import is expected to fail with ImportError until log_registry.py is created.
from log_registry import LogRegistry
class TestLogRegistry(unittest.TestCase):
def setUp(self) -> None:
"""Set up a temporary directory and registry file for each test."""
self.temp_dir = tempfile.TemporaryDirectory()
self.registry_path = os.path.join(self.temp_dir.name, "registry.toml")
# Ensure the file is created and empty initially for a clean state.
# LogRegistry is assumed to load from this file on instantiation.
with open(self.registry_path, 'w') as f:
f.write("# Initial empty registry\n")
# Instantiate LogRegistry. This will load from the empty file.
self.registry = LogRegistry(self.registry_path)
def tearDown(self) -> None:
"""Clean up the temporary directory and its contents after each test."""
self.temp_dir.cleanup()
def test_instantiation(self) -> None:
"""Test LogRegistry instantiation with a file path."""
self.assertIsInstance(self.registry, LogRegistry)
self.assertEqual(self.registry.registry_path, self.registry_path)
# Check if the file exists. LogRegistry is assumed to create it if not.
self.assertTrue(os.path.exists(self.registry_path))
# We will verify content in other tests that explicitly save and reload.
def test_register_session(self) -> None:
"""Test registering a new session."""
session_id = "session-123"
path = "/path/to/session/123"
start_time = datetime.utcnow()
self.registry.register_session(session_id, path, start_time)
# Verify session was added to internal data (assuming LogRegistry has a public 'data' attribute for testing)
self.assertIn(session_id, self.registry.data)
session_data = self.registry.data[session_id]
self.assertEqual(session_data['path'], path)
# Convert stored ISO string back to datetime for comparison
stored_start_time = datetime.fromisoformat(session_data['start_time'])
self.assertAlmostEqual(stored_start_time, start_time, delta=timedelta(seconds=1)) # Allow for minor time differences
self.assertFalse(session_data.get('whitelisted', False)) # Default to not whitelisted
self.assertIsNone(session_data.get('metadata'))
# Verify data was written to the TOML file by reloading
reloaded_registry = LogRegistry(self.registry_path)
self.assertIn(session_id, reloaded_registry.data)
reloaded_session_data = reloaded_registry.data[session_id]
reloaded_start_time = datetime.fromisoformat(reloaded_session_data['start_time'])
self.assertAlmostEqual(reloaded_start_time, start_time, delta=timedelta(seconds=1))
def test_update_session_metadata(self) -> None:
"""Test updating session metadata."""
session_id = "session-456"
path = "/path/to/session/456"
start_time = datetime.utcnow()
self.registry.register_session(session_id, path, start_time)
message_count = 100
errors = 5
size_kb = 1024
whitelisted = True
reason = "Automated process"
self.registry.update_session_metadata(session_id, message_count, errors, size_kb, whitelisted, reason)
# Verify metadata was updated in internal data
self.assertIn(session_id, self.registry.data)
session_data = self.registry.data[session_id]
self.assertIsNotNone(session_data.get('metadata'))
metadata = session_data['metadata']
self.assertEqual(metadata['message_count'], message_count)
self.assertEqual(metadata['errors'], errors)
self.assertEqual(metadata['size_kb'], size_kb)
self.assertEqual(metadata['whitelisted'], whitelisted)
self.assertEqual(metadata['reason'], reason)
# Also check if the whitelisted flag in the main session data is updated
self.assertTrue(session_data.get('whitelisted', False))
# Verify data was written to the TOML file by reloading
reloaded_registry = LogRegistry(self.registry_path)
self.assertIn(session_id, reloaded_registry.data)
reloaded_session_data = reloaded_registry.data[session_id]
self.assertTrue(reloaded_session_data.get('metadata', {}).get('whitelisted', False))
self.assertTrue(reloaded_session_data.get('whitelisted', False)) # Check main flag too
def test_is_session_whitelisted(self) -> None:
"""Test checking if a session is whitelisted."""
session_id_whitelisted = "session-789-whitelisted"
path_w = "/path/to/session/789"
start_time_w = datetime.utcnow()
self.registry.register_session(session_id_whitelisted, path_w, start_time_w)
self.registry.update_session_metadata(session_id_whitelisted, 10, 0, 100, True, "Manual whitelist")
session_id_not_whitelisted = "session-abc-not-whitelisted"
path_nw = "/path/to/session/abc"
start_time_nw = datetime.utcnow()
self.registry.register_session(session_id_not_whitelisted, path_nw, start_time_nw)
# Test explicitly whitelisted session
self.assertTrue(self.registry.is_session_whitelisted(session_id_whitelisted))
# Test session registered but not updated, should default to not whitelisted
self.assertFalse(self.registry.is_session_whitelisted(session_id_not_whitelisted))
# Test for a non-existent session, should be treated as not whitelisted
self.assertFalse(self.registry.is_session_whitelisted("non-existent-session"))
def test_get_old_non_whitelisted_sessions(self) -> None:
"""Test retrieving old, non-whitelisted sessions."""
now = datetime.utcnow()
# Define a cutoff time that is 7 days ago
cutoff_time = now - timedelta(days=7)
# Session 1: Old and not whitelisted
session_id_old_nw = "session-old-nw"
path_old_nw = "/path/to/session/old_nw"
start_time_old_nw = now - timedelta(days=10) # Older than cutoff
self.registry.register_session(session_id_old_nw, path_old_nw, start_time_old_nw)
# Session 2: Recent and not whitelisted
session_id_recent_nw = "session-recent-nw"
path_recent_nw = "/path/to/session/recent_nw"
start_time_recent_nw = now - timedelta(days=3) # Newer than cutoff
self.registry.register_session(session_id_recent_nw, path_recent_nw, start_time_recent_nw)
# Session 3: Old and whitelisted
session_id_old_w = "session-old-w"
path_old_w = "/path/to/session/old_w"
start_time_old_w = now - timedelta(days=15) # Older than cutoff
self.registry.register_session(session_id_old_w, path_old_w, start_time_old_w)
self.registry.update_session_metadata(session_id_old_w, 50, 0, 500, True, "Whitelisted")
# Session 4: Old, not whitelisted explicitly, but with metadata that doesn't set 'whitelisted' to True.
# The 'is_session_whitelisted' logic should correctly interpret this as not whitelisted.
session_id_old_nw_incomplete = "session-old-nw-incomplete"
path_old_nw_incomplete = "/path/to/session/old_nw_incomplete"
start_time_old_nw_incomplete = now - timedelta(days=20) # Older than cutoff
self.registry.register_session(session_id_old_nw_incomplete, path_old_nw_incomplete, start_time_old_nw_incomplete)
# Update with some metadata, but set 'whitelisted' to False explicitly
self.registry.update_session_metadata(session_id_old_nw_incomplete, 10, 0, 100, False, "Manual review needed")
# Get sessions older than cutoff_time and not whitelisted
old_sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
# Collect session IDs from the result
found_session_ids = {s['session_id'] for s in old_sessions}
# Expected: session_id_old_nw (old, not whitelisted) and session_id_old_nw_incomplete (old, explicitly not whitelisted)
self.assertIn(session_id_old_nw, found_session_ids)
self.assertIn(session_id_old_nw_incomplete, found_session_ids)
# Not expected: session_id_recent_nw (too recent), session_id_old_w (whitelisted)
self.assertNotIn(session_id_recent_nw, found_session_ids)
self.assertNotIn(session_id_old_w, found_session_ids)
# Ensure only the expected sessions are in the result
self.assertEqual(len(found_session_ids), 2)
# Test with a cutoff that includes all sessions, and ensure only non-whitelisted are returned
future_cutoff = now + timedelta(days=1) # All sessions are older than this
all_old_sessions = self.registry.get_old_non_whitelisted_sessions(future_cutoff)
all_found_session_ids = {s['session_id'] for s in all_old_sessions}
# Expected: session_id_old_nw, session_id_old_nw_incomplete, AND session_id_recent_nw
# Not expected: session_id_old_w (whitelisted)
self.assertEqual(len(all_found_session_ids), 3)
self.assertIn(session_id_old_nw, all_found_session_ids)
self.assertIn(session_id_old_nw_incomplete, all_found_session_ids)
self.assertIn(session_id_recent_nw, all_found_session_ids)
self.assertNotIn(session_id_old_w, all_found_session_ids)