Compare commits
2 Commits
10fbfd0f54
...
bd2a79c090
| Author | SHA1 | Date | |
|---|---|---|---|
| bd2a79c090 | |||
| 3f4dc1ae03 |
@@ -263,7 +263,8 @@ class App:
|
||||
self._autosave_interval = 60.0
|
||||
self._last_autosave = time.time()
|
||||
|
||||
session_logger.open_session()
|
||||
label = self.project.get("project", {}).get("name", "")
|
||||
session_logger.open_session(label=label)
|
||||
self._init_ai_and_hooks()
|
||||
|
||||
@property
|
||||
|
||||
+2
-1
@@ -513,7 +513,8 @@ class App:
|
||||
"cache_creation_input_tokens": 0
|
||||
}
|
||||
|
||||
session_logger.open_session()
|
||||
label = self.project.get("project", {}).get("name", "")
|
||||
session_logger.open_session(label=label)
|
||||
ai_client.set_provider(self.current_provider, self.current_model)
|
||||
ai_client.confirm_and_run_callback = self._confirm_and_run
|
||||
ai_client.comms_log_callback = self._on_comms_entry
|
||||
|
||||
@@ -0,0 +1,60 @@
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime, timedelta
|
||||
from log_registry import LogRegistry
|
||||
|
||||
class LogPruner:
|
||||
def __init__(self, log_registry: LogRegistry, logs_dir: str):
|
||||
"""
|
||||
Initializes the LogPruner.
|
||||
|
||||
Args:
|
||||
log_registry: An instance of LogRegistry to check session data.
|
||||
logs_dir: The path to the directory containing session sub-directories.
|
||||
"""
|
||||
self.log_registry = log_registry
|
||||
self.logs_dir = logs_dir
|
||||
|
||||
def prune(self):
|
||||
"""
|
||||
Prunes old and small session directories from the logs directory.
|
||||
|
||||
Deletes session directories that meet the following criteria:
|
||||
1. The session start time is older than 24 hours (based on data from LogRegistry).
|
||||
2. The session name is NOT in the whitelist provided by the LogRegistry.
|
||||
3. The total size of all files within the session directory is less than 2KB (2048 bytes).
|
||||
"""
|
||||
now = datetime.now()
|
||||
cutoff_time = now - timedelta(hours=24)
|
||||
|
||||
# Ensure the base logs directory exists.
|
||||
if not os.path.isdir(self.logs_dir):
|
||||
return
|
||||
|
||||
# Get sessions that are old and not whitelisted from the registry
|
||||
old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
|
||||
|
||||
# Prune sessions if their size is less than 2048 bytes
|
||||
for session_info in old_sessions_to_check:
|
||||
session_id = session_info['session_id']
|
||||
session_path = session_info['path']
|
||||
|
||||
if not session_path or not os.path.isdir(session_path):
|
||||
continue
|
||||
|
||||
# Calculate total size of files in the directory
|
||||
total_size = 0
|
||||
try:
|
||||
for entry in os.scandir(session_path):
|
||||
if entry.is_file():
|
||||
total_size += entry.stat().st_size
|
||||
except OSError:
|
||||
continue
|
||||
|
||||
# Prune if the total size is less than 2KB (2048 bytes)
|
||||
if total_size < 2048: # 2KB
|
||||
try:
|
||||
shutil.rmtree(session_path)
|
||||
# print(f"Pruned session '{session_id}' (Size: {total_size} bytes)")
|
||||
except OSError:
|
||||
pass
|
||||
+32
-16
@@ -26,46 +26,62 @@ _LOG_DIR = Path("./logs")
|
||||
_SCRIPTS_DIR = Path("./scripts/generated")
|
||||
|
||||
_ts: str = "" # session timestamp string e.g. "20260301_142233"
|
||||
_session_id: str = "" # YYYYMMDD_HHMMSS[_Label]
|
||||
_session_dir: Path = None # Path to the sub-directory for this session
|
||||
_seq: int = 0 # monotonic counter for script files this session
|
||||
_seq_lock = threading.Lock()
|
||||
|
||||
_comms_fh = None # file handle: logs/comms_<ts>.log
|
||||
_tool_fh = None # file handle: logs/toolcalls_<ts>.log
|
||||
_api_fh = None # file handle: logs/apihooks_<ts>.log - API hook calls
|
||||
_cli_fh = None # file handle: logs/clicalls_<ts>.log - CLI subprocess calls
|
||||
_comms_fh = None # file handle: logs/<session_id>/comms.log
|
||||
_tool_fh = None # file handle: logs/<session_id>/toolcalls.log
|
||||
_api_fh = None # file handle: logs/<session_id>/apihooks.log
|
||||
_cli_fh = None # file handle: logs/<session_id>/clicalls.log
|
||||
|
||||
|
||||
def _now_ts() -> str:
|
||||
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
|
||||
|
||||
def open_session():
|
||||
def open_session(label: str | None = None):
|
||||
"""
|
||||
Called once at GUI startup. Creates the log directories if needed and
|
||||
opens the two log files for this session. Idempotent - a second call is
|
||||
ignored.
|
||||
opens the log files for this session within a sub-directory.
|
||||
"""
|
||||
global _ts, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
|
||||
global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
|
||||
|
||||
if _comms_fh is not None:
|
||||
return # already open
|
||||
|
||||
_LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
_ts = _now_ts()
|
||||
_session_id = _ts
|
||||
if label:
|
||||
# Sanitize label: remove non-alphanumeric chars
|
||||
safe_label = "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in label)
|
||||
_session_id += f"_{safe_label}"
|
||||
|
||||
_session_dir = _LOG_DIR / _session_id
|
||||
_session_dir.mkdir(parents=True, exist_ok=True)
|
||||
_SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
_ts = _now_ts()
|
||||
_seq = 0
|
||||
|
||||
_comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1)
|
||||
_tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1)
|
||||
_api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1)
|
||||
_cli_fh = open(_LOG_DIR / f"clicalls_{_ts}.log", "w", encoding="utf-8", buffering=1) # New log file handle
|
||||
_comms_fh = open(_session_dir / "comms.log", "w", encoding="utf-8", buffering=1)
|
||||
_tool_fh = open(_session_dir / "toolcalls.log", "w", encoding="utf-8", buffering=1)
|
||||
_api_fh = open(_session_dir / "apihooks.log", "w", encoding="utf-8", buffering=1)
|
||||
_cli_fh = open(_session_dir / "clicalls.log", "w", encoding="utf-8", buffering=1)
|
||||
|
||||
_tool_fh.write(f"# Tool-call log — session {_ts}\n\n")
|
||||
_tool_fh.write(f"# Tool-call log — session {_session_id}\n\n")
|
||||
_tool_fh.flush()
|
||||
_cli_fh.write(f"# CLI Subprocess Call Log — session {_ts}\n\n") # Header for new log file
|
||||
_cli_fh.write(f"# CLI Subprocess Call Log — session {_session_id}\n\n")
|
||||
_cli_fh.flush()
|
||||
|
||||
# Register this session in the log registry
|
||||
try:
|
||||
from log_registry import LogRegistry
|
||||
registry = LogRegistry(str(_LOG_DIR / "log_registry.toml"))
|
||||
registry.register_session(_session_id, str(_session_dir), datetime.datetime.now())
|
||||
except Exception as e:
|
||||
print(f"Warning: Could not register session in LogRegistry: {e}")
|
||||
|
||||
atexit.register(close_session)
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,55 @@
|
||||
import os
|
||||
import shutil
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from datetime import datetime, timedelta
|
||||
from log_registry import LogRegistry
|
||||
from log_pruner import LogPruner
|
||||
|
||||
@pytest.fixture
|
||||
def pruner_setup(tmp_path):
|
||||
logs_dir = tmp_path / "logs"
|
||||
logs_dir.mkdir()
|
||||
registry_path = logs_dir / "log_registry.toml"
|
||||
registry = LogRegistry(str(registry_path))
|
||||
pruner = LogPruner(registry, str(logs_dir))
|
||||
return pruner, registry, logs_dir
|
||||
|
||||
def test_prune_old_insignificant_logs(pruner_setup):
|
||||
pruner, registry, logs_dir = pruner_setup
|
||||
|
||||
# 1. Old and small (insignificant) -> should be pruned
|
||||
session_id_old_small = "old_small"
|
||||
dir_old_small = logs_dir / session_id_old_small
|
||||
dir_old_small.mkdir()
|
||||
(dir_old_small / "comms.log").write_text("small") # < 2KB
|
||||
registry.register_session(session_id_old_small, str(dir_old_small), datetime.now() - timedelta(days=2))
|
||||
|
||||
# 2. Old and large (significant) -> should NOT be pruned
|
||||
session_id_old_large = "old_large"
|
||||
dir_old_large = logs_dir / session_id_old_large
|
||||
dir_old_large.mkdir()
|
||||
(dir_old_large / "comms.log").write_text("x" * 3000) # > 2KB
|
||||
registry.register_session(session_id_old_large, str(dir_old_large), datetime.now() - timedelta(days=2))
|
||||
|
||||
# 3. Recent and small -> should NOT be pruned
|
||||
session_id_recent_small = "recent_small"
|
||||
dir_recent_small = logs_dir / session_id_recent_small
|
||||
dir_recent_small.mkdir()
|
||||
(dir_recent_small / "comms.log").write_text("small")
|
||||
registry.register_session(session_id_recent_small, str(dir_recent_small), datetime.now() - timedelta(hours=2))
|
||||
|
||||
# 4. Old and whitelisted -> should NOT be pruned
|
||||
session_id_old_whitelisted = "old_whitelisted"
|
||||
dir_old_whitelisted = logs_dir / session_id_old_whitelisted
|
||||
dir_old_whitelisted.mkdir()
|
||||
(dir_old_whitelisted / "comms.log").write_text("small")
|
||||
registry.register_session(session_id_old_whitelisted, str(dir_old_whitelisted), datetime.now() - timedelta(days=2))
|
||||
registry.update_session_metadata(session_id_old_whitelisted, 0, 0, 0, True, "Manual")
|
||||
|
||||
pruner.prune()
|
||||
|
||||
assert not dir_old_small.exists()
|
||||
assert dir_old_large.exists()
|
||||
assert dir_recent_small.exists()
|
||||
assert dir_old_whitelisted.exists()
|
||||
@@ -0,0 +1,58 @@
|
||||
import os
|
||||
import shutil
|
||||
import pytest
|
||||
from pathlib import Path
|
||||
from datetime import datetime
|
||||
from unittest.mock import patch
|
||||
import session_logger
|
||||
import tomllib
|
||||
|
||||
@pytest.fixture
|
||||
def temp_logs(tmp_path):
|
||||
# Mock _LOG_DIR in session_logger
|
||||
original_log_dir = session_logger._LOG_DIR
|
||||
session_logger._LOG_DIR = tmp_path / "logs"
|
||||
session_logger._LOG_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Mock _SCRIPTS_DIR
|
||||
original_scripts_dir = session_logger._SCRIPTS_DIR
|
||||
session_logger._SCRIPTS_DIR = tmp_path / "scripts" / "generated"
|
||||
session_logger._SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
yield tmp_path / "logs"
|
||||
|
||||
# Cleanup: Close handles if open
|
||||
session_logger.close_session()
|
||||
session_logger._LOG_DIR = original_log_dir
|
||||
session_logger._SCRIPTS_DIR = original_scripts_dir
|
||||
|
||||
def test_open_session_creates_subdir_and_registry(temp_logs):
|
||||
label = "test-label"
|
||||
# We can't easily mock datetime.datetime.now() because it's a built-in
|
||||
# but we can check the resulting directory name pattern
|
||||
|
||||
session_logger.open_session(label=label)
|
||||
|
||||
# Check that a subdirectory was created
|
||||
subdirs = list(temp_logs.iterdir())
|
||||
# One is the log_registry.toml, one is the session dir
|
||||
session_dirs = [d for d in subdirs if d.is_dir()]
|
||||
assert len(session_dirs) == 1
|
||||
session_dir = session_dirs[0]
|
||||
|
||||
assert session_dir.name.endswith(f"_{label}")
|
||||
|
||||
# Check for log files
|
||||
assert (session_dir / "comms.log").exists()
|
||||
assert (session_dir / "toolcalls.log").exists()
|
||||
assert (session_dir / "apihooks.log").exists()
|
||||
assert (session_dir / "clicalls.log").exists()
|
||||
|
||||
# Check registry
|
||||
registry_path = temp_logs / "log_registry.toml"
|
||||
assert registry_path.exists()
|
||||
|
||||
with open(registry_path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
assert session_dir.name in data
|
||||
assert data[session_dir.name]["path"] == str(session_dir)
|
||||
Reference in New Issue
Block a user