|
|
|
|
@@ -1,123 +1,119 @@
|
|
|
|
|
import unittest
|
|
|
|
|
import tempfile
|
|
|
|
|
import pytest
|
|
|
|
|
import os
|
|
|
|
|
import shutil
|
|
|
|
|
from datetime import datetime, timedelta
|
|
|
|
|
from src.log_registry import LogRegistry
|
|
|
|
|
from src.log_pruner import LogPruner
|
|
|
|
|
|
|
|
|
|
class TestLogPruningHeuristic(unittest.TestCase):
|
|
|
|
|
def setUp(self) -> None:
|
|
|
|
|
self.temp_dir = tempfile.TemporaryDirectory()
|
|
|
|
|
self.registry_path = os.path.join(self.temp_dir.name, "registry.toml")
|
|
|
|
|
self.logs_dir = os.path.join(self.temp_dir.name, "logs")
|
|
|
|
|
os.makedirs(self.logs_dir, exist_ok=True)
|
|
|
|
|
self.registry = LogRegistry(self.registry_path)
|
|
|
|
|
self.pruner = LogPruner(self.registry, self.logs_dir)
|
|
|
|
|
class TestLogPruningHeuristic:
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def temp_logs_dir(self, tmp_path):
|
|
|
|
|
# Create a structure: project_root/logs/sessions
|
|
|
|
|
project_root = tmp_path
|
|
|
|
|
logs_dir = project_root / "logs" / "sessions"
|
|
|
|
|
logs_dir.mkdir(parents=True)
|
|
|
|
|
return logs_dir
|
|
|
|
|
|
|
|
|
|
def tearDown(self) -> None:
|
|
|
|
|
self.temp_dir.cleanup()
|
|
|
|
|
@pytest.fixture
|
|
|
|
|
def registry_path(self, temp_logs_dir):
|
|
|
|
|
return temp_logs_dir / "log_registry.toml"
|
|
|
|
|
|
|
|
|
|
def _create_session(self, session_id, start_time, message_count=None, size_kb=None, whitelisted=False):
|
|
|
|
|
path = os.path.join(self.logs_dir, session_id)
|
|
|
|
|
os.makedirs(path, exist_ok=True)
|
|
|
|
|
# Create a dummy file
|
|
|
|
|
with open(os.path.join(path, "comms.log"), "w") as f:
|
|
|
|
|
f.write("test content")
|
|
|
|
|
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self, registry_path):
|
|
|
|
|
registry = LogRegistry(str(registry_path))
|
|
|
|
|
|
|
|
|
|
self.registry.register_session(session_id, path, start_time)
|
|
|
|
|
if message_count is not None or size_kb is not None or whitelisted:
|
|
|
|
|
self.registry.update_session_metadata(
|
|
|
|
|
session_id,
|
|
|
|
|
message_count=message_count if message_count is not None else 10,
|
|
|
|
|
errors=0,
|
|
|
|
|
size_kb=size_kb if size_kb is not None else 10,
|
|
|
|
|
whitelisted=whitelisted,
|
|
|
|
|
reason="Test"
|
|
|
|
|
)
|
|
|
|
|
# 1. Old non-whitelisted session
|
|
|
|
|
old_time = datetime.now() - timedelta(days=2)
|
|
|
|
|
registry.register_session("old_session", "logs/old_session", old_time)
|
|
|
|
|
|
|
|
|
|
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self) -> None:
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
cutoff_time = now - timedelta(days=7)
|
|
|
|
|
# 2. New session with 0 messages (empty)
|
|
|
|
|
new_time = datetime.now()
|
|
|
|
|
registry.register_session("empty_session", "logs/empty_session", new_time)
|
|
|
|
|
registry.update_session_metadata("empty_session", message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
|
|
|
|
|
|
|
|
|
|
# 1. Old, not whitelisted (should be included)
|
|
|
|
|
self._create_session("old_nw", now - timedelta(days=10))
|
|
|
|
|
# 3. New session with messages (not empty)
|
|
|
|
|
registry.register_session("active_session", "logs/active_session", new_time)
|
|
|
|
|
registry.update_session_metadata("active_session", message_count=5, errors=0, size_kb=10, whitelisted=False, reason="")
|
|
|
|
|
|
|
|
|
|
# 2. Recent, not whitelisted, but empty (message_count=0) (SHOULD be included based on new heuristic)
|
|
|
|
|
self._create_session("recent_empty_msgs", now - timedelta(days=1), message_count=0)
|
|
|
|
|
# 4. Whitelisted session
|
|
|
|
|
registry.register_session("starred_session", "logs/starred_session", old_time)
|
|
|
|
|
registry.update_session_metadata("starred_session", message_count=0, errors=0, size_kb=0, whitelisted=True, reason="starred")
|
|
|
|
|
|
|
|
|
|
# 3. Recent, not whitelisted, but empty (size_kb=0) (SHOULD be included based on new heuristic)
|
|
|
|
|
self._create_session("recent_empty_size", now - timedelta(days=1), size_kb=0)
|
|
|
|
|
cutoff = datetime.now() - timedelta(days=1)
|
|
|
|
|
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
|
|
|
|
|
|
|
|
|
|
# 4. Recent, not whitelisted, NOT empty (should NOT be included)
|
|
|
|
|
self._create_session("recent_not_empty", now - timedelta(days=1), message_count=5, size_kb=5)
|
|
|
|
|
session_ids = [s['session_id'] for s in sessions]
|
|
|
|
|
assert "old_session" in session_ids
|
|
|
|
|
assert "empty_session" in session_ids
|
|
|
|
|
assert "active_session" not in session_ids
|
|
|
|
|
assert "starred_session" not in session_ids
|
|
|
|
|
|
|
|
|
|
# 5. Old, whitelisted (should NOT be included)
|
|
|
|
|
self._create_session("old_w", now - timedelta(days=10), whitelisted=True)
|
|
|
|
|
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self, registry_path):
|
|
|
|
|
registry = LogRegistry(str(registry_path))
|
|
|
|
|
new_time = datetime.now()
|
|
|
|
|
# Session registered but update_auto_whitelist_status not called yet
|
|
|
|
|
registry.register_session("no_meta_session", "logs/no_meta_session", new_time)
|
|
|
|
|
|
|
|
|
|
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
|
|
|
|
|
session_ids = {s['session_id'] for s in sessions}
|
|
|
|
|
cutoff = datetime.now() - timedelta(days=1)
|
|
|
|
|
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
|
|
|
|
|
|
|
|
|
|
self.assertIn("old_nw", session_ids)
|
|
|
|
|
self.assertIn("recent_empty_msgs", session_ids)
|
|
|
|
|
self.assertIn("recent_empty_size", session_ids)
|
|
|
|
|
self.assertNotIn("recent_not_empty", session_ids)
|
|
|
|
|
self.assertNotIn("old_w", session_ids)
|
|
|
|
|
session_ids = [s['session_id'] for s in sessions]
|
|
|
|
|
assert "no_meta_session" in session_ids
|
|
|
|
|
|
|
|
|
|
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self) -> None:
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
cutoff_time = now - timedelta(days=7)
|
|
|
|
|
def test_prune_handles_relative_paths_starting_with_logs(self, temp_logs_dir, registry_path):
|
|
|
|
|
# Project structure: project_root/logs/sessions
|
|
|
|
|
project_root = temp_logs_dir.parent.parent
|
|
|
|
|
|
|
|
|
|
# Recent, not whitelisted, NO metadata (should be included)
|
|
|
|
|
# _create_session without message_count/size_kb will leave metadata=None
|
|
|
|
|
self._create_session("recent_no_metadata", now - timedelta(days=1))
|
|
|
|
|
# Create a session dir in project_root/logs/session_1
|
|
|
|
|
session_id = "session_1"
|
|
|
|
|
session_rel_path = os.path.join("logs", session_id)
|
|
|
|
|
session_abs_path = project_root / session_rel_path
|
|
|
|
|
session_abs_path.mkdir(parents=True)
|
|
|
|
|
(session_abs_path / "comms.log").write_text("empty")
|
|
|
|
|
|
|
|
|
|
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
|
|
|
|
|
session_ids = {s['session_id'] for s in sessions}
|
|
|
|
|
registry = LogRegistry(str(registry_path))
|
|
|
|
|
registry.register_session(session_id, session_rel_path, datetime.now())
|
|
|
|
|
# No metadata = empty
|
|
|
|
|
|
|
|
|
|
self.assertIn("recent_no_metadata", session_ids)
|
|
|
|
|
pruner = LogPruner(registry, str(temp_logs_dir))
|
|
|
|
|
pruner.prune(max_age_days=1, min_size_kb=10)
|
|
|
|
|
|
|
|
|
|
def test_prune_removes_empty_sessions_regardless_of_age(self) -> None:
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
assert not session_abs_path.exists()
|
|
|
|
|
assert session_id not in registry.data
|
|
|
|
|
|
|
|
|
|
# Create a session that is recent but empty
|
|
|
|
|
session_id = "recent_empty"
|
|
|
|
|
session_path = os.path.join(self.logs_dir, session_id)
|
|
|
|
|
os.makedirs(session_path, exist_ok=True)
|
|
|
|
|
# Actual file size 0
|
|
|
|
|
with open(os.path.join(session_path, "comms.log"), "w") as f:
|
|
|
|
|
pass
|
|
|
|
|
def test_prune_removes_empty_sessions_regardless_of_age(self, temp_logs_dir, registry_path):
|
|
|
|
|
# Mock project root resolution in LogPruner
|
|
|
|
|
project_root = temp_logs_dir.parent.parent
|
|
|
|
|
|
|
|
|
|
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
|
|
|
|
|
self.registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="Empty")
|
|
|
|
|
session_id = "new_empty"
|
|
|
|
|
session_rel_path = os.path.join("logs", session_id)
|
|
|
|
|
session_abs_path = project_root / session_rel_path
|
|
|
|
|
session_abs_path.mkdir(parents=True)
|
|
|
|
|
|
|
|
|
|
self.assertTrue(os.path.exists(session_path))
|
|
|
|
|
registry = LogRegistry(str(registry_path))
|
|
|
|
|
registry.register_session(session_id, session_rel_path, datetime.now())
|
|
|
|
|
registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
|
|
|
|
|
|
|
|
|
|
# Prune with max_age_days=30 (so 1 hour old is NOT "old" by age)
|
|
|
|
|
self.pruner.prune(max_age_days=30, min_size_kb=1)
|
|
|
|
|
pruner = LogPruner(registry, str(temp_logs_dir))
|
|
|
|
|
# max_age_days=10, but should still prune because it's empty
|
|
|
|
|
pruner.prune(max_age_days=10, min_size_kb=10)
|
|
|
|
|
|
|
|
|
|
self.assertFalse(os.path.exists(session_path))
|
|
|
|
|
self.assertNotIn(session_id, self.registry.data)
|
|
|
|
|
assert not session_abs_path.exists()
|
|
|
|
|
assert session_id not in registry.data
|
|
|
|
|
|
|
|
|
|
def test_prune_removes_sessions_without_metadata_regardless_of_age(self) -> None:
|
|
|
|
|
now = datetime.now()
|
|
|
|
|
session_id = "recent_no_metadata_to_prune"
|
|
|
|
|
session_path = os.path.join(self.logs_dir, session_id)
|
|
|
|
|
os.makedirs(session_path, exist_ok=True)
|
|
|
|
|
# Actual file size 0
|
|
|
|
|
with open(os.path.join(session_path, "comms.log"), "w") as f:
|
|
|
|
|
pass
|
|
|
|
|
def test_prune_removes_sessions_without_metadata_regardless_of_age(self, temp_logs_dir, registry_path):
|
|
|
|
|
project_root = temp_logs_dir.parent.parent
|
|
|
|
|
|
|
|
|
|
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
|
|
|
|
|
# NO metadata update
|
|
|
|
|
session_id = "new_no_meta"
|
|
|
|
|
session_rel_path = os.path.join("logs", session_id)
|
|
|
|
|
session_abs_path = project_root / session_rel_path
|
|
|
|
|
session_abs_path.mkdir(parents=True)
|
|
|
|
|
|
|
|
|
|
self.assertTrue(os.path.exists(session_path))
|
|
|
|
|
registry = LogRegistry(str(registry_path))
|
|
|
|
|
registry.register_session(session_id, session_rel_path, datetime.now())
|
|
|
|
|
# No metadata update
|
|
|
|
|
|
|
|
|
|
# Prune with max_age_days=30
|
|
|
|
|
self.pruner.prune(max_age_days=30, min_size_kb=1)
|
|
|
|
|
pruner = LogPruner(registry, str(temp_logs_dir))
|
|
|
|
|
pruner.prune(max_age_days=10, min_size_kb=10)
|
|
|
|
|
|
|
|
|
|
self.assertFalse(os.path.exists(session_path))
|
|
|
|
|
self.assertNotIn(session_id, self.registry.data)
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
|
unittest.main()
|
|
|
|
|
assert not session_abs_path.exists()
|
|
|
|
|
assert session_id not in registry.data
|
|
|
|
|
|