2 Commits

6 changed files with 209 additions and 18 deletions
+2 -1
View File
@@ -263,7 +263,8 @@ class App:
self._autosave_interval = 60.0
self._last_autosave = time.time()
session_logger.open_session()
label = self.project.get("project", {}).get("name", "")
session_logger.open_session(label=label)
self._init_ai_and_hooks()
@property
+2 -1
View File
@@ -513,7 +513,8 @@ class App:
"cache_creation_input_tokens": 0
}
session_logger.open_session()
label = self.project.get("project", {}).get("name", "")
session_logger.open_session(label=label)
ai_client.set_provider(self.current_provider, self.current_model)
ai_client.confirm_and_run_callback = self._confirm_and_run
ai_client.comms_log_callback = self._on_comms_entry
+60
View File
@@ -0,0 +1,60 @@
import os
import shutil
from datetime import datetime, timedelta
from log_registry import LogRegistry
class LogPruner:
def __init__(self, log_registry: LogRegistry, logs_dir: str):
"""
Initializes the LogPruner.
Args:
log_registry: An instance of LogRegistry to check session data.
logs_dir: The path to the directory containing session sub-directories.
"""
self.log_registry = log_registry
self.logs_dir = logs_dir
def prune(self):
"""
Prunes old and small session directories from the logs directory.
Deletes session directories that meet the following criteria:
1. The session start time is older than 24 hours (based on data from LogRegistry).
2. The session name is NOT in the whitelist provided by the LogRegistry.
3. The total size of all files within the session directory is less than 2KB (2048 bytes).
"""
now = datetime.now()
cutoff_time = now - timedelta(hours=24)
# Ensure the base logs directory exists.
if not os.path.isdir(self.logs_dir):
return
# Get sessions that are old and not whitelisted from the registry
old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
# Prune sessions if their size is less than 2048 bytes
for session_info in old_sessions_to_check:
session_id = session_info['session_id']
session_path = session_info['path']
if not session_path or not os.path.isdir(session_path):
continue
# Calculate total size of files in the directory
total_size = 0
try:
for entry in os.scandir(session_path):
if entry.is_file():
total_size += entry.stat().st_size
except OSError:
continue
# Prune if the total size is less than 2KB (2048 bytes)
if total_size < 2048: # 2KB
try:
shutil.rmtree(session_path)
# print(f"Pruned session '{session_id}' (Size: {total_size} bytes)")
except OSError:
pass
+32 -16
View File
@@ -26,46 +26,62 @@ _LOG_DIR = Path("./logs")
_SCRIPTS_DIR = Path("./scripts/generated")
_ts: str = "" # session timestamp string e.g. "20260301_142233"
_session_id: str = "" # YYYYMMDD_HHMMSS[_Label]
_session_dir: Path = None # Path to the sub-directory for this session
_seq: int = 0 # monotonic counter for script files this session
_seq_lock = threading.Lock()
_comms_fh = None # file handle: logs/comms_<ts>.log
_tool_fh = None # file handle: logs/toolcalls_<ts>.log
_api_fh = None # file handle: logs/apihooks_<ts>.log - API hook calls
_cli_fh = None # file handle: logs/clicalls_<ts>.log - CLI subprocess calls
_comms_fh = None # file handle: logs/<session_id>/comms.log
_tool_fh = None # file handle: logs/<session_id>/toolcalls.log
_api_fh = None # file handle: logs/<session_id>/apihooks.log
_cli_fh = None # file handle: logs/<session_id>/clicalls.log
def _now_ts() -> str:
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def open_session():
def open_session(label: str | None = None):
"""
Called once at GUI startup. Creates the log directories if needed and
opens the two log files for this session. Idempotent - a second call is
ignored.
opens the log files for this session within a sub-directory.
"""
global _ts, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
if _comms_fh is not None:
return # already open
_LOG_DIR.mkdir(parents=True, exist_ok=True)
_ts = _now_ts()
_session_id = _ts
if label:
# Sanitize label: remove non-alphanumeric chars
safe_label = "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in label)
_session_id += f"_{safe_label}"
_session_dir = _LOG_DIR / _session_id
_session_dir.mkdir(parents=True, exist_ok=True)
_SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
_ts = _now_ts()
_seq = 0
_comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_LOG_DIR / f"clicalls_{_ts}.log", "w", encoding="utf-8", buffering=1) # New log file handle
_comms_fh = open(_session_dir / "comms.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_session_dir / "toolcalls.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_session_dir / "apihooks.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_session_dir / "clicalls.log", "w", encoding="utf-8", buffering=1)
_tool_fh.write(f"# Tool-call log — session {_ts}\n\n")
_tool_fh.write(f"# Tool-call log — session {_session_id}\n\n")
_tool_fh.flush()
_cli_fh.write(f"# CLI Subprocess Call Log — session {_ts}\n\n") # Header for new log file
_cli_fh.write(f"# CLI Subprocess Call Log — session {_session_id}\n\n")
_cli_fh.flush()
# Register this session in the log registry
try:
from log_registry import LogRegistry
registry = LogRegistry(str(_LOG_DIR / "log_registry.toml"))
registry.register_session(_session_id, str(_session_dir), datetime.datetime.now())
except Exception as e:
print(f"Warning: Could not register session in LogRegistry: {e}")
atexit.register(close_session)
+55
View File
@@ -0,0 +1,55 @@
import os
import shutil
import pytest
from pathlib import Path
from datetime import datetime, timedelta
from log_registry import LogRegistry
from log_pruner import LogPruner
@pytest.fixture
def pruner_setup(tmp_path):
logs_dir = tmp_path / "logs"
logs_dir.mkdir()
registry_path = logs_dir / "log_registry.toml"
registry = LogRegistry(str(registry_path))
pruner = LogPruner(registry, str(logs_dir))
return pruner, registry, logs_dir
def test_prune_old_insignificant_logs(pruner_setup):
pruner, registry, logs_dir = pruner_setup
# 1. Old and small (insignificant) -> should be pruned
session_id_old_small = "old_small"
dir_old_small = logs_dir / session_id_old_small
dir_old_small.mkdir()
(dir_old_small / "comms.log").write_text("small") # < 2KB
registry.register_session(session_id_old_small, str(dir_old_small), datetime.now() - timedelta(days=2))
# 2. Old and large (significant) -> should NOT be pruned
session_id_old_large = "old_large"
dir_old_large = logs_dir / session_id_old_large
dir_old_large.mkdir()
(dir_old_large / "comms.log").write_text("x" * 3000) # > 2KB
registry.register_session(session_id_old_large, str(dir_old_large), datetime.now() - timedelta(days=2))
# 3. Recent and small -> should NOT be pruned
session_id_recent_small = "recent_small"
dir_recent_small = logs_dir / session_id_recent_small
dir_recent_small.mkdir()
(dir_recent_small / "comms.log").write_text("small")
registry.register_session(session_id_recent_small, str(dir_recent_small), datetime.now() - timedelta(hours=2))
# 4. Old and whitelisted -> should NOT be pruned
session_id_old_whitelisted = "old_whitelisted"
dir_old_whitelisted = logs_dir / session_id_old_whitelisted
dir_old_whitelisted.mkdir()
(dir_old_whitelisted / "comms.log").write_text("small")
registry.register_session(session_id_old_whitelisted, str(dir_old_whitelisted), datetime.now() - timedelta(days=2))
registry.update_session_metadata(session_id_old_whitelisted, 0, 0, 0, True, "Manual")
pruner.prune()
assert not dir_old_small.exists()
assert dir_old_large.exists()
assert dir_recent_small.exists()
assert dir_old_whitelisted.exists()
+58
View File
@@ -0,0 +1,58 @@
import os
import shutil
import pytest
from pathlib import Path
from datetime import datetime
from unittest.mock import patch
import session_logger
import tomllib
@pytest.fixture
def temp_logs(tmp_path):
# Mock _LOG_DIR in session_logger
original_log_dir = session_logger._LOG_DIR
session_logger._LOG_DIR = tmp_path / "logs"
session_logger._LOG_DIR.mkdir(parents=True, exist_ok=True)
# Mock _SCRIPTS_DIR
original_scripts_dir = session_logger._SCRIPTS_DIR
session_logger._SCRIPTS_DIR = tmp_path / "scripts" / "generated"
session_logger._SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
yield tmp_path / "logs"
# Cleanup: Close handles if open
session_logger.close_session()
session_logger._LOG_DIR = original_log_dir
session_logger._SCRIPTS_DIR = original_scripts_dir
def test_open_session_creates_subdir_and_registry(temp_logs):
label = "test-label"
# We can't easily mock datetime.datetime.now() because it's a built-in
# but we can check the resulting directory name pattern
session_logger.open_session(label=label)
# Check that a subdirectory was created
subdirs = list(temp_logs.iterdir())
# One is the log_registry.toml, one is the session dir
session_dirs = [d for d in subdirs if d.is_dir()]
assert len(session_dirs) == 1
session_dir = session_dirs[0]
assert session_dir.name.endswith(f"_{label}")
# Check for log files
assert (session_dir / "comms.log").exists()
assert (session_dir / "toolcalls.log").exists()
assert (session_dir / "apihooks.log").exists()
assert (session_dir / "clicalls.log").exists()
# Check registry
registry_path = temp_logs / "log_registry.toml"
assert registry_path.exists()
with open(registry_path, "rb") as f:
data = tomllib.load(f)
assert session_dir.name in data
assert data[session_dir.name]["path"] == str(session_dir)