diff --git a/src/log_pruner.py b/src/log_pruner.py index 71205f1..bb9f1d4 100644 --- a/src/log_pruner.py +++ b/src/log_pruner.py @@ -1,5 +1,6 @@ import os import shutil +import sys from datetime import datetime, timedelta from src.log_registry import LogRegistry @@ -37,6 +38,10 @@ class LogPruner: return # Get sessions that are old and not whitelisted from the registry old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time) + + # Project root is two levels up from logs/sessions + project_root = os.path.dirname(os.path.dirname(os.path.abspath(self.logs_dir))) + # Prune sessions if their size is less than threshold for session_info in old_sessions_to_check: session_id = session_info['session_id'] @@ -44,15 +49,15 @@ class LogPruner: if not session_path: continue - # Resolve path - resolved_path = session_path - if not os.path.isabs(resolved_path): - if resolved_path.startswith('logs' + os.sep) or resolved_path.startswith('logs/'): - # Resolve relative to the project root (two levels up from self.logs_dir) - project_root = os.path.dirname(os.path.dirname(os.path.abspath(self.logs_dir))) - resolved_path = os.path.join(project_root, resolved_path) + # Robust path resolution + if os.path.isabs(session_path): + resolved_path = session_path + else: + # Always resolve relative to project root + resolved_path = os.path.abspath(os.path.join(project_root, session_path)) if not os.path.isdir(resolved_path): + sys.stderr.write(f"[LogPruner] Skipping {session_id}: directory not found at {resolved_path}\n") continue # Calculate total size of files in the directory @@ -61,15 +66,20 @@ class LogPruner: for entry in os.scandir(resolved_path): if entry.is_file(): total_size += entry.stat().st_size - except OSError: + except OSError as e: + sys.stderr.write(f"[LogPruner] Error scanning {resolved_path}: {e}\n") continue - # Prune if the total size is less than threshold - if total_size < (min_size_kb * 1024): + + # Prune if the total size is less than threshold + # (The user requested always removing empty logs, get_old_non_whitelisted_sessions already handles the 'empty' check) + if total_size < (min_size_kb * 1024) or total_size == 0: try: + sys.stderr.write(f"[LogPruner] Removing {session_id} at {resolved_path} (Size: {total_size} bytes)\n") shutil.rmtree(resolved_path) # Also remove from registry to keep it in sync if session_id in self.log_registry.data: del self.log_registry.data[session_id] - except OSError: - pass + except OSError as e: + sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n") + self.log_registry.save_registry() diff --git a/tests/test_log_pruning_heuristic.py b/tests/test_log_pruning_heuristic.py index afee5da..4f25467 100644 --- a/tests/test_log_pruning_heuristic.py +++ b/tests/test_log_pruning_heuristic.py @@ -1,149 +1,119 @@ -import unittest -import tempfile +import pytest import os import shutil from datetime import datetime, timedelta from src.log_registry import LogRegistry from src.log_pruner import LogPruner -class TestLogPruningHeuristic(unittest.TestCase): - def setUp(self) -> None: - self.temp_dir = tempfile.TemporaryDirectory() - self.registry_path = os.path.join(self.temp_dir.name, "registry.toml") - self.logs_dir = os.path.join(self.temp_dir.name, "logs", "sessions") - os.makedirs(self.logs_dir, exist_ok=True) - self.registry = LogRegistry(self.registry_path) - self.pruner = LogPruner(self.registry, self.logs_dir) +class TestLogPruningHeuristic: + @pytest.fixture + def temp_logs_dir(self, tmp_path): + # Create a structure: project_root/logs/sessions + project_root = tmp_path + logs_dir = project_root / "logs" / "sessions" + logs_dir.mkdir(parents=True) + return logs_dir - def tearDown(self) -> None: - self.temp_dir.cleanup() + @pytest.fixture + def registry_path(self, temp_logs_dir): + return temp_logs_dir / "log_registry.toml" - def _create_session(self, session_id, start_time, message_count=None, size_kb=None, whitelisted=False): - path = os.path.join(self.logs_dir, session_id) - os.makedirs(path, exist_ok=True) - # Create a dummy file - with open(os.path.join(path, "comms.log"), "w") as f: - f.write("test content") + def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self, registry_path): + registry = LogRegistry(str(registry_path)) - self.registry.register_session(session_id, path, start_time) - if message_count is not None or size_kb is not None or whitelisted: - self.registry.update_session_metadata( - session_id, - message_count=message_count if message_count is not None else 10, - errors=0, - size_kb=size_kb if size_kb is not None else 10, - whitelisted=whitelisted, - reason="Test" - ) + # 1. Old non-whitelisted session + old_time = datetime.now() - timedelta(days=2) + registry.register_session("old_session", "logs/old_session", old_time) + + # 2. New session with 0 messages (empty) + new_time = datetime.now() + registry.register_session("empty_session", "logs/empty_session", new_time) + registry.update_session_metadata("empty_session", message_count=0, errors=0, size_kb=0, whitelisted=False, reason="") + + # 3. New session with messages (not empty) + registry.register_session("active_session", "logs/active_session", new_time) + registry.update_session_metadata("active_session", message_count=5, errors=0, size_kb=10, whitelisted=False, reason="") + + # 4. Whitelisted session + registry.register_session("starred_session", "logs/starred_session", old_time) + registry.update_session_metadata("starred_session", message_count=0, errors=0, size_kb=0, whitelisted=True, reason="starred") - def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self) -> None: - now = datetime.now() - cutoff_time = now - timedelta(days=7) + cutoff = datetime.now() - timedelta(days=1) + sessions = registry.get_old_non_whitelisted_sessions(cutoff) - # 1. Old, not whitelisted (should be included) - self._create_session("old_nw", now - timedelta(days=10)) - - # 2. Recent, not whitelisted, but empty (message_count=0) (SHOULD be included based on new heuristic) - self._create_session("recent_empty_msgs", now - timedelta(days=1), message_count=0) - - # 3. Recent, not whitelisted, but empty (size_kb=0) (SHOULD be included based on new heuristic) - self._create_session("recent_empty_size", now - timedelta(days=1), size_kb=0) - - # 4. Recent, not whitelisted, NOT empty (should NOT be included) - self._create_session("recent_not_empty", now - timedelta(days=1), message_count=5, size_kb=5) - - # 5. Old, whitelisted (should NOT be included) - self._create_session("old_w", now - timedelta(days=10), whitelisted=True) + session_ids = [s['session_id'] for s in sessions] + assert "old_session" in session_ids + assert "empty_session" in session_ids + assert "active_session" not in session_ids + assert "starred_session" not in session_ids - sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time) - session_ids = {s['session_id'] for s in sessions} + def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self, registry_path): + registry = LogRegistry(str(registry_path)) + new_time = datetime.now() + # Session registered but update_auto_whitelist_status not called yet + registry.register_session("no_meta_session", "logs/no_meta_session", new_time) - self.assertIn("old_nw", session_ids) - self.assertIn("recent_empty_msgs", session_ids) - self.assertIn("recent_empty_size", session_ids) - self.assertNotIn("recent_not_empty", session_ids) - self.assertNotIn("old_w", session_ids) + cutoff = datetime.now() - timedelta(days=1) + sessions = registry.get_old_non_whitelisted_sessions(cutoff) + + session_ids = [s['session_id'] for s in sessions] + assert "no_meta_session" in session_ids - def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self) -> None: - now = datetime.now() - cutoff_time = now - timedelta(days=7) + def test_prune_handles_relative_paths_starting_with_logs(self, temp_logs_dir, registry_path): + # Project structure: project_root/logs/sessions + project_root = temp_logs_dir.parent.parent - # Recent, not whitelisted, NO metadata (should be included) - # _create_session without message_count/size_kb will leave metadata=None - self._create_session("recent_no_metadata", now - timedelta(days=1)) - - sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time) - session_ids = {s['session_id'] for s in sessions} - - self.assertIn("recent_no_metadata", session_ids) + # Create a session dir in project_root/logs/session_1 + session_id = "session_1" + session_rel_path = os.path.join("logs", session_id) + session_abs_path = project_root / session_rel_path + session_abs_path.mkdir(parents=True) + (session_abs_path / "comms.log").write_text("empty") - def test_prune_removes_empty_sessions_regardless_of_age(self) -> None: - now = datetime.now() + registry = LogRegistry(str(registry_path)) + registry.register_session(session_id, session_rel_path, datetime.now()) + # No metadata = empty - # Create a session that is recent but empty - session_id = "recent_empty" - session_path = os.path.join(self.logs_dir, session_id) - os.makedirs(session_path, exist_ok=True) - # Actual file size 0 - with open(os.path.join(session_path, "comms.log"), "w") as f: - pass - - self.registry.register_session(session_id, session_path, now - timedelta(hours=1)) - self.registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="Empty") + pruner = LogPruner(registry, str(temp_logs_dir)) + pruner.prune(max_age_days=1, min_size_kb=10) - self.assertTrue(os.path.exists(session_path)) - - # Prune with max_age_days=30 (so 1 hour old is NOT "old" by age) - self.pruner.prune(max_age_days=30, min_size_kb=1) - - self.assertFalse(os.path.exists(session_path)) - self.assertNotIn(session_id, self.registry.data) + assert not session_abs_path.exists() + assert session_id not in registry.data - def test_prune_removes_sessions_without_metadata_regardless_of_age(self) -> None: - now = datetime.now() - session_id = "recent_no_metadata_to_prune" - session_path = os.path.join(self.logs_dir, session_id) - os.makedirs(session_path, exist_ok=True) - # Actual file size 0 - with open(os.path.join(session_path, "comms.log"), "w") as f: - pass - - self.registry.register_session(session_id, session_path, now - timedelta(hours=1)) - # NO metadata update + def test_prune_removes_empty_sessions_regardless_of_age(self, temp_logs_dir, registry_path): + # Mock project root resolution in LogPruner + project_root = temp_logs_dir.parent.parent - self.assertTrue(os.path.exists(session_path)) - - # Prune with max_age_days=30 - self.pruner.prune(max_age_days=30, min_size_kb=1) - - self.assertFalse(os.path.exists(session_path)) - self.assertNotIn(session_id, self.registry.data) + session_id = "new_empty" + session_rel_path = os.path.join("logs", session_id) + session_abs_path = project_root / session_rel_path + session_abs_path.mkdir(parents=True) - def test_prune_handles_relative_paths_starting_with_logs(self) -> None: - now = datetime.now() - # Create a session with a relative path - session_id = "rel_path_session" - # In this test, self.logs_dir = temp_dir/logs/sessions - # project_root will be resolved to temp_dir - # So the relative path starting with 'logs/' should be 'logs/rel_path_session' - rel_path = f"logs/{session_id}" - abs_path = os.path.join(self.temp_dir.name, "logs", session_id) - os.makedirs(abs_path, exist_ok=True) + registry = LogRegistry(str(registry_path)) + registry.register_session(session_id, session_rel_path, datetime.now()) + registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="") - with open(os.path.join(abs_path, "comms.log"), "w") as f: - f.write("small") # Tiny file - - # Register with the RELATIVE path - self.registry.register_session(session_id, rel_path, now - timedelta(days=10)) - self.registry.update_session_metadata(session_id, message_count=1, errors=0, size_kb=0, whitelisted=False, reason="Test") + pruner = LogPruner(registry, str(temp_logs_dir)) + # max_age_days=10, but should still prune because it's empty + pruner.prune(max_age_days=10, min_size_kb=10) - self.assertTrue(os.path.exists(abs_path)) - - # Prune. It should resolve 'logs/rel_path_session' relative to temp_dir.name - self.pruner.prune(max_age_days=1, min_size_kb=1) - - self.assertFalse(os.path.exists(abs_path)) - self.assertNotIn(session_id, self.registry.data) + assert not session_abs_path.exists() + assert session_id not in registry.data -if __name__ == '__main__': - unittest.main() + def test_prune_removes_sessions_without_metadata_regardless_of_age(self, temp_logs_dir, registry_path): + project_root = temp_logs_dir.parent.parent + + session_id = "new_no_meta" + session_rel_path = os.path.join("logs", session_id) + session_abs_path = project_root / session_rel_path + session_abs_path.mkdir(parents=True) + + registry = LogRegistry(str(registry_path)) + registry.register_session(session_id, session_rel_path, datetime.now()) + # No metadata update + + pruner = LogPruner(registry, str(temp_logs_dir)) + pruner.prune(max_age_days=10, min_size_kb=10) + + assert not session_abs_path.exists() + assert session_id not in registry.data