Compare commits

...

3 Commits

2 changed files with 161 additions and 111 deletions

View File

@@ -1,5 +1,7 @@
import os
import shutil
import sys
import time
from datetime import datetime, timedelta
from src.log_registry import LogRegistry
@@ -37,27 +39,79 @@ class LogPruner:
return
# Get sessions that are old and not whitelisted from the registry
old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
# Project root is two levels up from logs/sessions
project_root = os.path.dirname(os.path.dirname(os.path.abspath(self.logs_dir)))
# Prune sessions if their size is less than threshold
for session_info in old_sessions_to_check:
session_id = session_info['session_id']
session_path = session_info['path']
if not session_path or not os.path.isdir(session_path):
if not session_path:
continue
# RESOLUTION STRATEGY:
# 1. Try as-is (absolute or relative to project root)
# 2. Try as a sub-directory of self.logs_dir (e.g. logs/sessions/session_id)
# 3. Try relative to parent of logs_dir if it starts with 'logs/'
candidates = []
if os.path.isabs(session_path):
candidates.append(session_path)
else:
candidates.append(os.path.abspath(os.path.join(project_root, session_path)))
candidates.append(os.path.abspath(os.path.join(self.logs_dir, session_id)))
candidates.append(os.path.abspath(os.path.join(self.logs_dir, os.path.basename(session_path))))
resolved_path = None
for cand in candidates:
if os.path.isdir(cand):
resolved_path = cand
break
if not resolved_path:
# If we can't find it, we still remove it from the registry if it's "empty"
# so it stops cluttering the UI.
sys.stderr.write(f"[LogPruner] Could not find directory for {session_id} in candidates. Removing registry entry.\n")
if session_id in self.log_registry.data:
del self.log_registry.data[session_id]
continue
# Calculate total size of files in the directory
total_size = 0
try:
for entry in os.scandir(session_path):
for entry in os.scandir(resolved_path):
if entry.is_file():
total_size += entry.stat().st_size
except OSError:
except OSError as e:
sys.stderr.write(f"[LogPruner] Error scanning {resolved_path}: {e}\n")
continue
# Prune if the total size is less than threshold
if total_size < (min_size_kb * 1024):
if total_size < (min_size_kb * 1024) or total_size == 0:
try:
shutil.rmtree(session_path)
sys.stderr.write(f"[LogPruner] Removing {session_id} at {resolved_path} (Size: {total_size} bytes)\n")
# Windows specific: sometimes files are locked.
# We try a few times with small delays.
def remove_readonly(func, path, excinfo):
os.chmod(path, 0o777)
func(path)
for attempt in range(3):
try:
shutil.rmtree(resolved_path, onerror=remove_readonly)
break
except OSError:
if attempt < 2:
time.sleep(0.1)
else:
raise
# Also remove from registry to keep it in sync
if session_id in self.log_registry.data:
del self.log_registry.data[session_id]
except OSError:
pass
except OSError as e:
sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n")
self.log_registry.save_registry()

View File

@@ -1,123 +1,119 @@
import unittest
import tempfile
import pytest
import os
import shutil
from datetime import datetime, timedelta
from src.log_registry import LogRegistry
from src.log_pruner import LogPruner
class TestLogPruningHeuristic(unittest.TestCase):
def setUp(self) -> None:
self.temp_dir = tempfile.TemporaryDirectory()
self.registry_path = os.path.join(self.temp_dir.name, "registry.toml")
self.logs_dir = os.path.join(self.temp_dir.name, "logs")
os.makedirs(self.logs_dir, exist_ok=True)
self.registry = LogRegistry(self.registry_path)
self.pruner = LogPruner(self.registry, self.logs_dir)
class TestLogPruningHeuristic:
@pytest.fixture
def temp_logs_dir(self, tmp_path):
# Create a structure: project_root/logs/sessions
project_root = tmp_path
logs_dir = project_root / "logs" / "sessions"
logs_dir.mkdir(parents=True)
return logs_dir
def tearDown(self) -> None:
self.temp_dir.cleanup()
@pytest.fixture
def registry_path(self, temp_logs_dir):
return temp_logs_dir / "log_registry.toml"
def _create_session(self, session_id, start_time, message_count=None, size_kb=None, whitelisted=False):
path = os.path.join(self.logs_dir, session_id)
os.makedirs(path, exist_ok=True)
# Create a dummy file
with open(os.path.join(path, "comms.log"), "w") as f:
f.write("test content")
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self, registry_path):
registry = LogRegistry(str(registry_path))
self.registry.register_session(session_id, path, start_time)
if message_count is not None or size_kb is not None or whitelisted:
self.registry.update_session_metadata(
session_id,
message_count=message_count if message_count is not None else 10,
errors=0,
size_kb=size_kb if size_kb is not None else 10,
whitelisted=whitelisted,
reason="Test"
)
# 1. Old non-whitelisted session
old_time = datetime.now() - timedelta(days=2)
registry.register_session("old_session", "logs/old_session", old_time)
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self) -> None:
now = datetime.now()
cutoff_time = now - timedelta(days=7)
# 2. New session with 0 messages (empty)
new_time = datetime.now()
registry.register_session("empty_session", "logs/empty_session", new_time)
registry.update_session_metadata("empty_session", message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
# 1. Old, not whitelisted (should be included)
self._create_session("old_nw", now - timedelta(days=10))
# 3. New session with messages (not empty)
registry.register_session("active_session", "logs/active_session", new_time)
registry.update_session_metadata("active_session", message_count=5, errors=0, size_kb=10, whitelisted=False, reason="")
# 2. Recent, not whitelisted, but empty (message_count=0) (SHOULD be included based on new heuristic)
self._create_session("recent_empty_msgs", now - timedelta(days=1), message_count=0)
# 4. Whitelisted session
registry.register_session("starred_session", "logs/starred_session", old_time)
registry.update_session_metadata("starred_session", message_count=0, errors=0, size_kb=0, whitelisted=True, reason="starred")
# 3. Recent, not whitelisted, but empty (size_kb=0) (SHOULD be included based on new heuristic)
self._create_session("recent_empty_size", now - timedelta(days=1), size_kb=0)
cutoff = datetime.now() - timedelta(days=1)
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
# 4. Recent, not whitelisted, NOT empty (should NOT be included)
self._create_session("recent_not_empty", now - timedelta(days=1), message_count=5, size_kb=5)
session_ids = [s['session_id'] for s in sessions]
assert "old_session" in session_ids
assert "empty_session" in session_ids
assert "active_session" not in session_ids
assert "starred_session" not in session_ids
# 5. Old, whitelisted (should NOT be included)
self._create_session("old_w", now - timedelta(days=10), whitelisted=True)
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self, registry_path):
registry = LogRegistry(str(registry_path))
new_time = datetime.now()
# Session registered but update_auto_whitelist_status not called yet
registry.register_session("no_meta_session", "logs/no_meta_session", new_time)
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
session_ids = {s['session_id'] for s in sessions}
cutoff = datetime.now() - timedelta(days=1)
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
self.assertIn("old_nw", session_ids)
self.assertIn("recent_empty_msgs", session_ids)
self.assertIn("recent_empty_size", session_ids)
self.assertNotIn("recent_not_empty", session_ids)
self.assertNotIn("old_w", session_ids)
session_ids = [s['session_id'] for s in sessions]
assert "no_meta_session" in session_ids
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self) -> None:
now = datetime.now()
cutoff_time = now - timedelta(days=7)
def test_prune_handles_relative_paths_starting_with_logs(self, temp_logs_dir, registry_path):
# Project structure: project_root/logs/sessions
project_root = temp_logs_dir.parent.parent
# Recent, not whitelisted, NO metadata (should be included)
# _create_session without message_count/size_kb will leave metadata=None
self._create_session("recent_no_metadata", now - timedelta(days=1))
# Create a session dir in project_root/logs/session_1
session_id = "session_1"
session_rel_path = os.path.join("logs", session_id)
session_abs_path = project_root / session_rel_path
session_abs_path.mkdir(parents=True)
(session_abs_path / "comms.log").write_text("empty")
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
session_ids = {s['session_id'] for s in sessions}
registry = LogRegistry(str(registry_path))
registry.register_session(session_id, session_rel_path, datetime.now())
# No metadata = empty
self.assertIn("recent_no_metadata", session_ids)
pruner = LogPruner(registry, str(temp_logs_dir))
pruner.prune(max_age_days=1, min_size_kb=10)
def test_prune_removes_empty_sessions_regardless_of_age(self) -> None:
now = datetime.now()
assert not session_abs_path.exists()
assert session_id not in registry.data
# Create a session that is recent but empty
session_id = "recent_empty"
session_path = os.path.join(self.logs_dir, session_id)
os.makedirs(session_path, exist_ok=True)
# Actual file size 0
with open(os.path.join(session_path, "comms.log"), "w") as f:
pass
def test_prune_removes_empty_sessions_regardless_of_age(self, temp_logs_dir, registry_path):
# Mock project root resolution in LogPruner
project_root = temp_logs_dir.parent.parent
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
self.registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="Empty")
session_id = "new_empty"
session_rel_path = os.path.join("logs", session_id)
session_abs_path = project_root / session_rel_path
session_abs_path.mkdir(parents=True)
self.assertTrue(os.path.exists(session_path))
registry = LogRegistry(str(registry_path))
registry.register_session(session_id, session_rel_path, datetime.now())
registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
# Prune with max_age_days=30 (so 1 hour old is NOT "old" by age)
self.pruner.prune(max_age_days=30, min_size_kb=1)
pruner = LogPruner(registry, str(temp_logs_dir))
# max_age_days=10, but should still prune because it's empty
pruner.prune(max_age_days=10, min_size_kb=10)
self.assertFalse(os.path.exists(session_path))
self.assertNotIn(session_id, self.registry.data)
assert not session_abs_path.exists()
assert session_id not in registry.data
def test_prune_removes_sessions_without_metadata_regardless_of_age(self) -> None:
now = datetime.now()
session_id = "recent_no_metadata_to_prune"
session_path = os.path.join(self.logs_dir, session_id)
os.makedirs(session_path, exist_ok=True)
# Actual file size 0
with open(os.path.join(session_path, "comms.log"), "w") as f:
pass
def test_prune_removes_sessions_without_metadata_regardless_of_age(self, temp_logs_dir, registry_path):
project_root = temp_logs_dir.parent.parent
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
# NO metadata update
session_id = "new_no_meta"
session_rel_path = os.path.join("logs", session_id)
session_abs_path = project_root / session_rel_path
session_abs_path.mkdir(parents=True)
self.assertTrue(os.path.exists(session_path))
registry = LogRegistry(str(registry_path))
registry.register_session(session_id, session_rel_path, datetime.now())
# No metadata update
# Prune with max_age_days=30
self.pruner.prune(max_age_days=30, min_size_kb=1)
pruner = LogPruner(registry, str(temp_logs_dir))
pruner.prune(max_age_days=10, min_size_kb=10)
self.assertFalse(os.path.exists(session_path))
self.assertNotIn(session_id, self.registry.data)
if __name__ == '__main__':
unittest.main()
assert not session_abs_path.exists()
assert session_id not in registry.data