Files
manual_slop/src/log_pruner.py

118 lines
4.1 KiB
Python

import os
import shutil
import sys
import time
from datetime import datetime, timedelta
from src.log_registry import LogRegistry
class LogPruner:
    """
    Handles the automated deletion of old and insignificant session logs.

    Sessions reported by the registry as old and not whitelisted are removed
    from disk (and from the registry) when their directory is smaller than a
    size threshold. Larger sessions are preserved.
    """

    def __init__(self, log_registry: "LogRegistry", logs_dir: str) -> None:
        """
        Initializes the LogPruner.

        Args:
            log_registry: An instance of LogRegistry to check session data.
                It must provide ``get_old_non_whitelisted_sessions(cutoff)``,
                a mutable ``data`` mapping keyed by session id, and
                ``save_registry()``.
            logs_dir: The path to the directory containing session sub-directories.
        """
        self.log_registry = log_registry
        self.logs_dir = logs_dir

    def prune(self, max_age_days: int = 1, min_size_kb: int = 2) -> None:
        """
        Prunes old and small session directories from the logs directory.

        Deletes session directories that meet the following criteria:
          1. The session start time is older than max_age_days.
          2. The session name is NOT in the whitelist provided by the LogRegistry.
          3. The total size of all files within the session directory
             (including nested sub-directories) is less than min_size_kb.

        Registry entries whose directory cannot be found are dropped as well.
        The registry is saved once at the end. Filesystem errors are reported
        on stderr and the affected session is skipped; they are not raised.

        Args:
            max_age_days: Minimum age, in days, for a session to be considered.
            min_size_kb: Sessions strictly smaller than this many KiB are removed.
        """
        # Ensure the base logs directory exists.
        if not os.path.isdir(self.logs_dir):
            return

        cutoff_time = datetime.now() - timedelta(days=max_age_days)
        min_size_bytes = min_size_kb * 1024

        # Snapshot the result: registry entries are deleted inside the loop,
        # which would break iteration if the registry returned a lazy view.
        old_sessions_to_check = list(
            self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
        )

        for session_info in old_sessions_to_check:
            session_id = session_info['session_id']
            session_path = session_info['path']
            if not session_path:
                continue

            resolved_path = self._resolve_session_dir(session_id, session_path)
            if resolved_path is None:
                # If we can't find it, we still remove it from the registry
                # so it stops cluttering the UI.
                sys.stderr.write(f"[LogPruner] Could not find directory for {session_id} in candidates. Removing registry entry.\n")
                self.log_registry.data.pop(session_id, None)
                continue

            try:
                total_size = self._directory_size(resolved_path)
            except OSError as e:
                # If the size cannot be measured, keep the session.
                sys.stderr.write(f"[LogPruner] Error scanning {resolved_path}: {e}\n")
                continue

            # Keep anything at or above the threshold; empty directories are
            # always pruned, even when min_size_kb is 0.
            if total_size >= min_size_bytes and total_size != 0:
                continue

            sys.stderr.write(f"[LogPruner] Removing {session_id} at {resolved_path} (Size: {total_size} bytes)\n")
            try:
                self._remove_tree(resolved_path)
            except OSError as e:
                sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n")
                continue

            # Also remove from registry to keep it in sync
            self.log_registry.data.pop(session_id, None)

        self.log_registry.save_registry()

    def _resolve_session_dir(self, session_id: str, session_path: str) -> "str | None":
        """
        Returns the existing directory for a session, or None if none is found.

        Resolution strategy, first match wins:
          1. The recorded path as-is (absolute), or relative to the project
             root (two levels above logs_dir).
          2. ``logs_dir/<session_id>``.
          3. ``logs_dir/<last component of the recorded path>``.

        Candidates that are logs_dir itself or one of its ancestors are
        rejected so a malformed entry can never cause the whole log tree
        (or the project) to be deleted.
        """
        logs_root = os.path.abspath(self.logs_dir)
        # Project root is two levels up from logs/sessions
        project_root = os.path.dirname(os.path.dirname(logs_root))

        if os.path.isabs(session_path):
            candidates = [session_path]
        else:
            candidates = [os.path.abspath(os.path.join(project_root, session_path))]
        candidates.append(os.path.abspath(os.path.join(logs_root, session_id)))
        # normpath drops trailing separators; without it basename() of
        # "logs/sessions/x/" is "" and the candidate collapses to logs_dir.
        leaf = os.path.basename(os.path.normpath(session_path))
        candidates.append(os.path.abspath(os.path.join(logs_root, leaf)))

        for cand in candidates:
            if os.path.isdir(cand) and not self._is_logs_dir_or_ancestor(cand):
                return cand
        return None

    def _is_logs_dir_or_ancestor(self, path: str) -> bool:
        """Returns True if path is logs_dir itself or a directory containing it."""
        logs_root = os.path.normcase(os.path.abspath(self.logs_dir))
        target = os.path.normcase(os.path.abspath(path))
        if target == logs_root:
            return True
        prefix = target if target.endswith(os.sep) else target + os.sep
        return logs_root.startswith(prefix)

    @staticmethod
    def _directory_size(path: str) -> int:
        """
        Returns the total size in bytes of all files under path, recursively.

        Symlinked directories are not descended into. Raises OSError if any
        directory cannot be scanned or any file cannot be stat'ed.
        """
        total = 0
        pending = [path]
        while pending:
            with os.scandir(pending.pop()) as entries:
                for entry in entries:
                    if entry.is_dir(follow_symlinks=False):
                        pending.append(entry.path)
                    elif entry.is_file():
                        total += entry.stat().st_size
        return total

    @staticmethod
    def _remove_tree(path: str, attempts: int = 3, delay: float = 0.1) -> None:
        """
        Deletes a directory tree, retrying on OSError.

        Windows specific: sometimes files are locked or read-only. Entries
        that fail to delete are made writable and retried once by the
        handler, and the whole removal is retried a few times with small
        delays. The last OSError is re-raised if every attempt fails.
        """
        def remove_readonly(func, failed_path, _exc):
            os.chmod(failed_path, 0o777)
            func(failed_path)

        # 'onerror' is deprecated since Python 3.12 in favour of 'onexc'.
        handler_kw = "onexc" if sys.version_info >= (3, 12) else "onerror"
        for attempt in range(attempts):
            try:
                shutil.rmtree(path, **{handler_kw: remove_readonly})
                return
            except OSError:
                if attempt == attempts - 1:
                    raise
                time.sleep(delay)