fix(logs): Final robust fix for LogPruner path resolution and empty log pruning
This commit is contained in:
@@ -1,5 +1,6 @@
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from src.log_registry import LogRegistry
|
||||
|
||||
@@ -37,6 +38,10 @@ class LogPruner:
|
||||
return
|
||||
# Get sessions that are old and not whitelisted from the registry
|
||||
old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
|
||||
|
||||
# Project root is two levels up from logs/sessions
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(self.logs_dir)))
|
||||
|
||||
# Prune sessions if their size is less than threshold
|
||||
for session_info in old_sessions_to_check:
|
||||
session_id = session_info['session_id']
|
||||
@@ -44,15 +49,15 @@ class LogPruner:
|
||||
if not session_path:
|
||||
continue
|
||||
|
||||
# Resolve path
|
||||
resolved_path = session_path
|
||||
if not os.path.isabs(resolved_path):
|
||||
if resolved_path.startswith('logs' + os.sep) or resolved_path.startswith('logs/'):
|
||||
# Resolve relative to the project root (two levels up from self.logs_dir)
|
||||
project_root = os.path.dirname(os.path.dirname(os.path.abspath(self.logs_dir)))
|
||||
resolved_path = os.path.join(project_root, resolved_path)
|
||||
# Robust path resolution
|
||||
if os.path.isabs(session_path):
|
||||
resolved_path = session_path
|
||||
else:
|
||||
# Always resolve relative to project root
|
||||
resolved_path = os.path.abspath(os.path.join(project_root, session_path))
|
||||
|
||||
if not os.path.isdir(resolved_path):
|
||||
sys.stderr.write(f"[LogPruner] Skipping {session_id}: directory not found at {resolved_path}\n")
|
||||
continue
|
||||
|
||||
# Calculate total size of files in the directory
|
||||
@@ -61,15 +66,20 @@ class LogPruner:
|
||||
for entry in os.scandir(resolved_path):
|
||||
if entry.is_file():
|
||||
total_size += entry.stat().st_size
|
||||
except OSError:
|
||||
except OSError as e:
|
||||
sys.stderr.write(f"[LogPruner] Error scanning {resolved_path}: {e}\n")
|
||||
continue
|
||||
# Prune if the total size is less than threshold
|
||||
if total_size < (min_size_kb * 1024):
|
||||
|
||||
# Prune if the total size is less than threshold
|
||||
# (The user requested always removing empty logs, get_old_non_whitelisted_sessions already handles the 'empty' check)
|
||||
if total_size < (min_size_kb * 1024) or total_size == 0:
|
||||
try:
|
||||
sys.stderr.write(f"[LogPruner] Removing {session_id} at {resolved_path} (Size: {total_size} bytes)\n")
|
||||
shutil.rmtree(resolved_path)
|
||||
# Also remove from registry to keep it in sync
|
||||
if session_id in self.log_registry.data:
|
||||
del self.log_registry.data[session_id]
|
||||
except OSError:
|
||||
pass
|
||||
except OSError as e:
|
||||
sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n")
|
||||
|
||||
self.log_registry.save_registry()
|
||||
|
||||
@@ -1,149 +1,119 @@
|
||||
import unittest
|
||||
import tempfile
|
||||
import pytest
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime, timedelta
|
||||
from src.log_registry import LogRegistry
|
||||
from src.log_pruner import LogPruner
|
||||
|
||||
class TestLogPruningHeuristic(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.temp_dir = tempfile.TemporaryDirectory()
|
||||
self.registry_path = os.path.join(self.temp_dir.name, "registry.toml")
|
||||
self.logs_dir = os.path.join(self.temp_dir.name, "logs", "sessions")
|
||||
os.makedirs(self.logs_dir, exist_ok=True)
|
||||
self.registry = LogRegistry(self.registry_path)
|
||||
self.pruner = LogPruner(self.registry, self.logs_dir)
|
||||
class TestLogPruningHeuristic:
|
||||
@pytest.fixture
|
||||
def temp_logs_dir(self, tmp_path):
|
||||
# Create a structure: project_root/logs/sessions
|
||||
project_root = tmp_path
|
||||
logs_dir = project_root / "logs" / "sessions"
|
||||
logs_dir.mkdir(parents=True)
|
||||
return logs_dir
|
||||
|
||||
def tearDown(self) -> None:
|
||||
self.temp_dir.cleanup()
|
||||
@pytest.fixture
|
||||
def registry_path(self, temp_logs_dir):
|
||||
return temp_logs_dir / "log_registry.toml"
|
||||
|
||||
def _create_session(self, session_id, start_time, message_count=None, size_kb=None, whitelisted=False):
|
||||
path = os.path.join(self.logs_dir, session_id)
|
||||
os.makedirs(path, exist_ok=True)
|
||||
# Create a dummy file
|
||||
with open(os.path.join(path, "comms.log"), "w") as f:
|
||||
f.write("test content")
|
||||
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self, registry_path):
|
||||
registry = LogRegistry(str(registry_path))
|
||||
|
||||
self.registry.register_session(session_id, path, start_time)
|
||||
if message_count is not None or size_kb is not None or whitelisted:
|
||||
self.registry.update_session_metadata(
|
||||
session_id,
|
||||
message_count=message_count if message_count is not None else 10,
|
||||
errors=0,
|
||||
size_kb=size_kb if size_kb is not None else 10,
|
||||
whitelisted=whitelisted,
|
||||
reason="Test"
|
||||
)
|
||||
# 1. Old non-whitelisted session
|
||||
old_time = datetime.now() - timedelta(days=2)
|
||||
registry.register_session("old_session", "logs/old_session", old_time)
|
||||
|
||||
# 2. New session with 0 messages (empty)
|
||||
new_time = datetime.now()
|
||||
registry.register_session("empty_session", "logs/empty_session", new_time)
|
||||
registry.update_session_metadata("empty_session", message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
|
||||
|
||||
# 3. New session with messages (not empty)
|
||||
registry.register_session("active_session", "logs/active_session", new_time)
|
||||
registry.update_session_metadata("active_session", message_count=5, errors=0, size_kb=10, whitelisted=False, reason="")
|
||||
|
||||
# 4. Whitelisted session
|
||||
registry.register_session("starred_session", "logs/starred_session", old_time)
|
||||
registry.update_session_metadata("starred_session", message_count=0, errors=0, size_kb=0, whitelisted=True, reason="starred")
|
||||
|
||||
def test_get_old_non_whitelisted_sessions_includes_empty_sessions(self) -> None:
|
||||
now = datetime.now()
|
||||
cutoff_time = now - timedelta(days=7)
|
||||
cutoff = datetime.now() - timedelta(days=1)
|
||||
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
|
||||
|
||||
# 1. Old, not whitelisted (should be included)
|
||||
self._create_session("old_nw", now - timedelta(days=10))
|
||||
|
||||
# 2. Recent, not whitelisted, but empty (message_count=0) (SHOULD be included based on new heuristic)
|
||||
self._create_session("recent_empty_msgs", now - timedelta(days=1), message_count=0)
|
||||
|
||||
# 3. Recent, not whitelisted, but empty (size_kb=0) (SHOULD be included based on new heuristic)
|
||||
self._create_session("recent_empty_size", now - timedelta(days=1), size_kb=0)
|
||||
|
||||
# 4. Recent, not whitelisted, NOT empty (should NOT be included)
|
||||
self._create_session("recent_not_empty", now - timedelta(days=1), message_count=5, size_kb=5)
|
||||
|
||||
# 5. Old, whitelisted (should NOT be included)
|
||||
self._create_session("old_w", now - timedelta(days=10), whitelisted=True)
|
||||
session_ids = [s['session_id'] for s in sessions]
|
||||
assert "old_session" in session_ids
|
||||
assert "empty_session" in session_ids
|
||||
assert "active_session" not in session_ids
|
||||
assert "starred_session" not in session_ids
|
||||
|
||||
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
|
||||
session_ids = {s['session_id'] for s in sessions}
|
||||
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self, registry_path):
|
||||
registry = LogRegistry(str(registry_path))
|
||||
new_time = datetime.now()
|
||||
# Session registered but update_auto_whitelist_status not called yet
|
||||
registry.register_session("no_meta_session", "logs/no_meta_session", new_time)
|
||||
|
||||
self.assertIn("old_nw", session_ids)
|
||||
self.assertIn("recent_empty_msgs", session_ids)
|
||||
self.assertIn("recent_empty_size", session_ids)
|
||||
self.assertNotIn("recent_not_empty", session_ids)
|
||||
self.assertNotIn("old_w", session_ids)
|
||||
cutoff = datetime.now() - timedelta(days=1)
|
||||
sessions = registry.get_old_non_whitelisted_sessions(cutoff)
|
||||
|
||||
session_ids = [s['session_id'] for s in sessions]
|
||||
assert "no_meta_session" in session_ids
|
||||
|
||||
def test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata(self) -> None:
|
||||
now = datetime.now()
|
||||
cutoff_time = now - timedelta(days=7)
|
||||
def test_prune_handles_relative_paths_starting_with_logs(self, temp_logs_dir, registry_path):
|
||||
# Project structure: project_root/logs/sessions
|
||||
project_root = temp_logs_dir.parent.parent
|
||||
|
||||
# Recent, not whitelisted, NO metadata (should be included)
|
||||
# _create_session without message_count/size_kb will leave metadata=None
|
||||
self._create_session("recent_no_metadata", now - timedelta(days=1))
|
||||
|
||||
sessions = self.registry.get_old_non_whitelisted_sessions(cutoff_time)
|
||||
session_ids = {s['session_id'] for s in sessions}
|
||||
|
||||
self.assertIn("recent_no_metadata", session_ids)
|
||||
# Create a session dir in project_root/logs/session_1
|
||||
session_id = "session_1"
|
||||
session_rel_path = os.path.join("logs", session_id)
|
||||
session_abs_path = project_root / session_rel_path
|
||||
session_abs_path.mkdir(parents=True)
|
||||
(session_abs_path / "comms.log").write_text("empty")
|
||||
|
||||
def test_prune_removes_empty_sessions_regardless_of_age(self) -> None:
|
||||
now = datetime.now()
|
||||
registry = LogRegistry(str(registry_path))
|
||||
registry.register_session(session_id, session_rel_path, datetime.now())
|
||||
# No metadata = empty
|
||||
|
||||
# Create a session that is recent but empty
|
||||
session_id = "recent_empty"
|
||||
session_path = os.path.join(self.logs_dir, session_id)
|
||||
os.makedirs(session_path, exist_ok=True)
|
||||
# Actual file size 0
|
||||
with open(os.path.join(session_path, "comms.log"), "w") as f:
|
||||
pass
|
||||
|
||||
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
|
||||
self.registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="Empty")
|
||||
pruner = LogPruner(registry, str(temp_logs_dir))
|
||||
pruner.prune(max_age_days=1, min_size_kb=10)
|
||||
|
||||
self.assertTrue(os.path.exists(session_path))
|
||||
|
||||
# Prune with max_age_days=30 (so 1 hour old is NOT "old" by age)
|
||||
self.pruner.prune(max_age_days=30, min_size_kb=1)
|
||||
|
||||
self.assertFalse(os.path.exists(session_path))
|
||||
self.assertNotIn(session_id, self.registry.data)
|
||||
assert not session_abs_path.exists()
|
||||
assert session_id not in registry.data
|
||||
|
||||
def test_prune_removes_sessions_without_metadata_regardless_of_age(self) -> None:
|
||||
now = datetime.now()
|
||||
session_id = "recent_no_metadata_to_prune"
|
||||
session_path = os.path.join(self.logs_dir, session_id)
|
||||
os.makedirs(session_path, exist_ok=True)
|
||||
# Actual file size 0
|
||||
with open(os.path.join(session_path, "comms.log"), "w") as f:
|
||||
pass
|
||||
|
||||
self.registry.register_session(session_id, session_path, now - timedelta(hours=1))
|
||||
# NO metadata update
|
||||
def test_prune_removes_empty_sessions_regardless_of_age(self, temp_logs_dir, registry_path):
|
||||
# Mock project root resolution in LogPruner
|
||||
project_root = temp_logs_dir.parent.parent
|
||||
|
||||
self.assertTrue(os.path.exists(session_path))
|
||||
|
||||
# Prune with max_age_days=30
|
||||
self.pruner.prune(max_age_days=30, min_size_kb=1)
|
||||
|
||||
self.assertFalse(os.path.exists(session_path))
|
||||
self.assertNotIn(session_id, self.registry.data)
|
||||
session_id = "new_empty"
|
||||
session_rel_path = os.path.join("logs", session_id)
|
||||
session_abs_path = project_root / session_rel_path
|
||||
session_abs_path.mkdir(parents=True)
|
||||
|
||||
def test_prune_handles_relative_paths_starting_with_logs(self) -> None:
|
||||
now = datetime.now()
|
||||
# Create a session with a relative path
|
||||
session_id = "rel_path_session"
|
||||
# In this test, self.logs_dir = temp_dir/logs/sessions
|
||||
# project_root will be resolved to temp_dir
|
||||
# So the relative path starting with 'logs/' should be 'logs/rel_path_session'
|
||||
rel_path = f"logs/{session_id}"
|
||||
abs_path = os.path.join(self.temp_dir.name, "logs", session_id)
|
||||
os.makedirs(abs_path, exist_ok=True)
|
||||
registry = LogRegistry(str(registry_path))
|
||||
registry.register_session(session_id, session_rel_path, datetime.now())
|
||||
registry.update_session_metadata(session_id, message_count=0, errors=0, size_kb=0, whitelisted=False, reason="")
|
||||
|
||||
with open(os.path.join(abs_path, "comms.log"), "w") as f:
|
||||
f.write("small") # Tiny file
|
||||
|
||||
# Register with the RELATIVE path
|
||||
self.registry.register_session(session_id, rel_path, now - timedelta(days=10))
|
||||
self.registry.update_session_metadata(session_id, message_count=1, errors=0, size_kb=0, whitelisted=False, reason="Test")
|
||||
pruner = LogPruner(registry, str(temp_logs_dir))
|
||||
# max_age_days=10, but should still prune because it's empty
|
||||
pruner.prune(max_age_days=10, min_size_kb=10)
|
||||
|
||||
self.assertTrue(os.path.exists(abs_path))
|
||||
|
||||
# Prune. It should resolve 'logs/rel_path_session' relative to temp_dir.name
|
||||
self.pruner.prune(max_age_days=1, min_size_kb=1)
|
||||
|
||||
self.assertFalse(os.path.exists(abs_path))
|
||||
self.assertNotIn(session_id, self.registry.data)
|
||||
assert not session_abs_path.exists()
|
||||
assert session_id not in registry.data
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
def test_prune_removes_sessions_without_metadata_regardless_of_age(self, temp_logs_dir, registry_path):
|
||||
project_root = temp_logs_dir.parent.parent
|
||||
|
||||
session_id = "new_no_meta"
|
||||
session_rel_path = os.path.join("logs", session_id)
|
||||
session_abs_path = project_root / session_rel_path
|
||||
session_abs_path.mkdir(parents=True)
|
||||
|
||||
registry = LogRegistry(str(registry_path))
|
||||
registry.register_session(session_id, session_rel_path, datetime.now())
|
||||
# No metadata update
|
||||
|
||||
pruner = LogPruner(registry, str(temp_logs_dir))
|
||||
pruner.prune(max_age_days=10, min_size_kb=10)
|
||||
|
||||
assert not session_abs_path.exists()
|
||||
assert session_id not in registry.data
|
||||
|
||||
Reference in New Issue
Block a user