Private
Public Access
0
0
Files
manual_slop/src/log_registry.py
T
ed 3816a54d27 feat(log): add Session + SessionMetadata dataclasses (t4_1-t4_8)
Phase 4 of any_type_componentization_20260621. Promotes the 2-level
dict[str, dict[str, Any]] structure in src/log_registry.py to typed
Session + SessionMetadata dataclasses (7 Any sites):

NEW dataclasses (inline in src/log_registry.py):
- SessionMetadata (frozen): message_count, errors, size_kb, whitelisted,
  reason, timestamp
- Session (frozen): session_id, path, start_time, whitelisted, metadata
- to_dict() / from_dict() classmethod for round-trip with TOML shape
- Backward-compat __getitem__ / get() so existing test_log_registry.py
  tests that use session_data['path'] / session_data.get('metadata')
  continue to work

REFACTOR LogRegistry:
- self.data: dict[str, dict[str, Any]] -> dict[str, Session]
- load_registry: populates with Session.from_dict(...)
- save_registry: serializes via session.to_dict()
- register_session: creates Session dataclass
- update_session_metadata: creates new Session with updated SessionMetadata
- is_session_whitelisted: reads session.whitelisted
- update_auto_whitelist_status: reads session.path
- get_old_non_whitelisted_sessions: reads session.start_time + metadata

NEW tests/test_log_registry_dataclasses.py (13 tests, all pass):
- test_session_dataclass_construction
- test_session_metadata_dataclass_construction
- test_session_from_dict_basic / with_metadata
- test_session_to_dict_round_trip
- test_session_metadata_to_dict
- test_log_registry_data_is_typed
- test_log_registry_register_session_returns_session
- test_log_registry_update_session_metadata_sets_metadata
- test_log_registry_is_session_whitelisted
- test_log_registry_get_old_non_whitelisted_sessions
- test_session_is_frozen
- test_session_metadata_is_frozen

Verified:
  uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30
    18 passed in 3.27s (5 existing + 13 new)
2026-06-22 01:00:00 -04:00

431 lines
16 KiB
Python

"""
Log Registry - Session metadata persistence for log management.
This module provides the LogRegistry class for tracking session logs
in a persistent TOML registry file. It supports session registration,
metadata updates, whitelisting, and age-based pruning queries.
Key Features:
- Persistent TOML-based registry (log_registry.toml)
- Session registration with path and start time
- Automatic whitelisting based on heuristics (errors, message count, size)
- Age-based session queries for log pruning
- Registry file rewritten in full on every change (see LogRegistry.save_registry)
Registry File Format (log_registry.toml):
[session_id]
path = "logs/sessions/session_id"
start_time = "2024-01-15T10:30:00"
whitelisted = false
[session_id.metadata]
message_count = 42
errors = 0
size_kb = 15
reason = "High message count: 42"
Integration:
- Used by session_logger.py for session registration
- Used by log_pruner.py for age-based cleanup
- Called from gui_2.py for log management UI
Thread Safety:
- No locking is performed around reads or writes of the registry file
- In-memory data dict is not thread-safe for concurrent access
See Also:
- src/session_logger.py for session lifecycle
- src/log_pruner.py for automated cleanup
- src/paths.py for registry path resolution
"""
from __future__ import annotations
import os
import tomli_w
import tomllib
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional
from src.result_types import Result, ErrorInfo, ErrorKind
@dataclass(frozen=True)
class SessionMetadata:
    """
    Immutable per-session statistics stored under the `metadata` table of a
    registry entry.
    """
    # Number of messages counted for the session.
    message_count: int = 0
    # Error figure recorded for the session.
    errors: int = 0
    # Session size in kilobytes.
    size_kb: int = 0
    # Whether the session is protected from pruning.
    whitelisted: bool = False
    # Human-readable explanation for the whitelisting decision.
    reason: str = ''
    # Optional timestamp string; None when never set.
    timestamp: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """
        Returns the metadata as a plain dict, one key per field in declaration
        order. Fields that are None (e.g. `timestamp`) are included as None.
        """
        ordered_fields = (
            "message_count",
            "errors",
            "size_kb",
            "whitelisted",
            "reason",
            "timestamp",
        )
        return {field_name: getattr(self, field_name) for field_name in ordered_fields}
@dataclass(frozen=True)
class Session:
    """
    Immutable registry entry for one session log.

    Besides attribute access, the class offers a small dict-like surface
    (`session['path']`, `session.get('metadata')`, `'path' in session`) so
    code written against the former dict-of-dicts registry keeps working.
    """
    # Registry key identifying the session.
    session_id: str
    # Path to the session's log directory.
    path: str
    # Session start time as a string (ISO format when produced by this module).
    start_time: str
    # Whether the session is protected from pruning.
    whitelisted: bool = False
    # Optional statistics; None until metadata has been recorded.
    metadata: Optional[SessionMetadata] = None

    def to_dict(self) -> dict[str, Any]:
        """
        Returns the entry as a plain dict with the keys `path`, `start_time`,
        `whitelisted` and `metadata` (a nested dict, or None when unset).
        `session_id` is not included because it is the registry key.
        """
        return {
            "path": self.path,
            "start_time": self.start_time,
            "whitelisted": self.whitelisted,
            "metadata": self.metadata.to_dict() if self.metadata is not None else None,
        }

    def __getitem__(self, key: str) -> Any:
        """
        Backward-compat: dict-like access (e.g., session['path']).
        Raises:
            KeyError: If `key` is not one of the dict-style keys.
        """
        if key == "metadata":
            return self.metadata.to_dict() if self.metadata is not None else None
        if key in ("path", "start_time", "whitelisted"):
            return getattr(self, key)
        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """
        Backward-compat: dict-like membership test (e.g., 'path' in session).
        Without this, `in` falls back to the sequence protocol, calls
        __getitem__(0) and surfaces a KeyError instead of returning a bool.
        """
        return key in ("path", "start_time", "whitelisted", "metadata")

    def get(self, key: str, default: Any = None) -> Any:
        """Backward-compat: dict.get."""
        try:
            return self[key]
        except KeyError:
            return default

    @staticmethod
    def _iso_if_datetime(value: Any) -> Any:
        """Returns `value.isoformat()` for datetimes and `value` unchanged otherwise."""
        return value.isoformat() if isinstance(value, datetime) else value

    @classmethod
    def from_dict(cls, session_id: str, d: dict[str, Any]) -> Session:
        """
        Builds a Session from its dict / TOML-table shape.
        Args:
            session_id (str): Registry key of the session.
            d (dict): Mapping with optional `path`, `start_time`, `whitelisted`
                and `metadata` keys; missing keys fall back to empty/False/None.
        Returns:
            Session: The typed entry. Datetime values for `start_time` and the
            metadata `timestamp` are stored as ISO-format strings.
        Raises:
            TypeError / ValueError: If a numeric metadata field cannot be
            converted with int().
        """
        raw_metadata = d.get("metadata")
        metadata: Optional[SessionMetadata] = None
        if raw_metadata is not None:
            if isinstance(raw_metadata, dict):
                metadata = SessionMetadata(
                    message_count=int(raw_metadata.get("message_count", 0)),
                    errors=int(raw_metadata.get("errors", 0)),
                    size_kb=int(raw_metadata.get("size_kb", 0)),
                    whitelisted=bool(raw_metadata.get("whitelisted", False)),
                    reason=str(raw_metadata.get("reason", "")),
                    timestamp=cls._iso_if_datetime(raw_metadata.get("timestamp")),
                )
            elif isinstance(raw_metadata, SessionMetadata):
                metadata = raw_metadata
            # Any other value cannot act as SessionMetadata (it has no
            # to_dict()), so it is dropped rather than stored and left to
            # fail later with an AttributeError.
        return cls(
            session_id=session_id,
            path=str(d.get("path", "")),
            # str(datetime) yields the space-separated form; use isoformat()
            # so the value matches what the rest of the module writes.
            start_time=str(cls._iso_if_datetime(d.get("start_time", ""))),
            whitelisted=bool(d.get("whitelisted", False)),
            metadata=metadata,
        )
class LogRegistry:
"""
Manages a persistent registry of session logs using a TOML file.
Tracks session paths, start times, whitelisting status, and metadata.
"""
def __init__(self, registry_path: str) -> None:
"""
Initializes the LogRegistry with a path to the registry file.
Args:
registry_path (str): The file path to the TOML registry.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.registry_path = registry_path
self.data: dict[str, Session] = {}
self.load_registry()
@property
def sessions(self) -> dict[str, dict[str, Any]]:
"""Alias for compatibility with older code/tests."""
return self.data
def load_registry(self) -> None:
"""
Loads the registry data from the TOML file into memory.
Handles date/time conversions from TOML-native formats to strings for consistency.
"""
if os.path.exists(self.registry_path):
try:
with open(self.registry_path, 'rb') as f:
loaded_data = tomllib.load(f)
# Keep data as it is from TOML (strings or native datetimes)
# If we want to satisfy tests that expect strings, we ensure they are strings.
self.data = {}
for session_id, session_data in loaded_data.items():
new_session_data = session_data.copy()
# If tomllib parsed it as a datetime, convert it back to string for the tests
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
m = new_session_data['metadata']
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
m['timestamp'] = m['timestamp'].isoformat()
self.data[session_id] = Session.from_dict(session_id, new_session_data)
except Exception as e:
print(f"Error loading registry from {self.registry_path}: {e}")
self.data = {}
else:
self.data = {}
def save_registry(self) -> Result[bool]:
"""
Serializes and saves the current registry data to the TOML file.
Converts internal datetime objects to ISO format strings for compatibility.
[C: tests/test_logging_e2e.py:test_logging_e2e]
"""
try:
# Convert datetime objects to ISO format strings for TOML serialization
data_to_save: dict[str, Any] = {}
for session_id, session in self.data.items():
session_dict = session.to_dict()
filtered: dict[str, Any] = {}
for k, v in session_dict.items():
if v is None:
continue
if k == 'start_time' and isinstance(v, datetime):
filtered[k] = v.isoformat()
elif k == 'metadata' and isinstance(v, dict):
metadata_copy: dict[str, Any] = {}
for mk, mv in v.items():
if mv is None:
continue
if mk == 'timestamp' and isinstance(mv, datetime):
metadata_copy[mk] = mv.isoformat()
else:
metadata_copy[mk] = mv
filtered[k] = metadata_copy
else:
filtered[k] = v
data_to_save[session_id] = filtered
with open(self.registry_path, 'wb') as f:
tomli_w.dump(data_to_save, f)
return Result(data=True)
except OSError as e:
return Result(data=False, errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="log_registry.save_registry", original=e)])
def register_session(self, session_id: str, path: str, start_time: datetime | str) -> None:
"""
Registers a new session in the registry.
Args:
session_id (str): Unique identifier for the session.
path (str): File path to the session's log directory.
start_time (datetime|str): The timestamp when the session started.
[C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e]
"""
if session_id in self.data:
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
# Store start_time internally as a string to satisfy tests
if isinstance(start_time, datetime):
start_time_str = start_time.isoformat()
else:
start_time_str = start_time
self.data[session_id] = Session(
session_id=session_id,
path=path,
start_time=start_time_str,
whitelisted=False,
metadata=None,
)
self.save_registry()
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
"""
Updates metadata fields for an existing session.
Args:
session_id (str): Unique identifier for the session.
message_count (int): Total number of messages in the session.
errors (int): Number of errors identified in logs.
size_kb (int): Total size of the session logs in kilobytes.
whitelisted (bool): Whether the session should be protected from pruning.
reason (str): Explanation for the current whitelisting status.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata]
"""
if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for metadata update.")
return
existing = self.data[session_id]
new_metadata = SessionMetadata(
message_count=message_count,
errors=errors,
size_kb=size_kb,
whitelisted=whitelisted,
reason=reason,
timestamp=existing.metadata.timestamp if existing.metadata else None,
)
self.data[session_id] = Session(
session_id=existing.session_id,
path=existing.path,
start_time=existing.start_time,
whitelisted=whitelisted,
metadata=new_metadata,
)
self.save_registry() # Save after update
def set_session_start_time(self, session_id: str, start_time: datetime | str) -> None:
"""
Updates the start_time of an existing session.
Used by tests and maintenance tools to backdate a session for pruning
verification. Creates a new Session with the updated start_time while
preserving all other fields (Session is frozen).
Args:
session_id (str): Unique identifier for the session.
start_time (datetime|str): The new start timestamp.
[C: tests/test_logging_e2e.py:test_logging_e2e]
"""
if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for start_time update.")
return
if isinstance(start_time, datetime):
start_time_str: str = start_time.isoformat()
else:
start_time_str = start_time
existing = self.data[session_id]
self.data[session_id] = Session(
session_id=existing.session_id,
path=existing.path,
start_time=start_time_str,
whitelisted=existing.whitelisted,
metadata=existing.metadata,
)
self.save_registry()
def is_session_whitelisted(self, session_id: str) -> bool:
"""
Checks if a specific session is marked as whitelisted.
Args:
session_id (str): Unique identifier for the session.
Returns:
bool: True if whitelisted, False otherwise.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
"""
session = self.data.get(session_id)
if session is None:
return False # Non-existent sessions are not whitelisted
return session.whitelisted
def update_auto_whitelist_status(self, session_id: str) -> None:
"""
Analyzes session logs and updates whitelisting status based on heuristics.
Sessions are automatically whitelisted if they contain error keywords,
have a high message count, or exceed a size threshold.
Args:
session_id (str): Unique identifier for the session to analyze.
[C: src/session_logger.py:close_session]
"""
if session_id not in self.data:
return
session_data = self.data[session_id]
session_path = session_data.path
if not session_path or not os.path.isdir(str(session_path)):
return
total_size_bytes = 0
message_count = 0
found_keywords = []
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
try:
for entry in os.scandir(str(session_path)):
if entry.is_file():
size = entry.stat().st_size
total_size_bytes += size
# Analyze comms.log for messages and keywords
if entry.name == "comms.log":
try:
with open(entry.path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
message_count += 1
for kw in keywords_to_check:
if kw in line and kw not in found_keywords:
found_keywords.append(kw)
except OSError as e:
import sys
sys.stderr.write(f"[LogRegistry] read comms.log entry failed: {e}\n")
except OSError as e:
import sys
sys.stderr.write(f"[LogRegistry] scan session_path failed: {e}\n")
size_kb = total_size_bytes / 1024
whitelisted = False
reason = ""
if found_keywords:
whitelisted = True
reason = f"Found keywords: {', '.join(found_keywords)}"
elif message_count > 10:
whitelisted = True
reason = f"High message count: {message_count}"
elif size_kb > 50:
whitelisted = True
reason = f"Large session size: {size_kb:.1f} KB"
self.update_session_metadata(
session_id,
message_count = message_count,
errors = len(found_keywords),
size_kb = int(size_kb),
whitelisted = whitelisted,
reason = reason
)
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]:
"""
Retrieves a list of sessions that are older than a specific cutoff time
and are not marked as whitelisted.
Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0).
Args:
cutoff_datetime (datetime): The threshold time for identifying old sessions.
Returns:
list: A list of dictionaries containing session details (id, path, start_time).
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
"""
old_sessions = []
for session_id, session in self.data.items():
# Check if session is older than cutoff and not whitelisted
start_time_raw = session.start_time
if isinstance(start_time_raw, str):
try:
start_time = datetime.fromisoformat(start_time_raw)
except ValueError:
start_time = None
else:
start_time = start_time_raw
is_whitelisted = session.whitelisted
# Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata
metadata = session.metadata
if metadata is None:
is_empty = True
else:
is_empty = (metadata.message_count == 0 or metadata.size_kb == 0)
if not is_whitelisted:
if is_empty or (start_time is not None and start_time < cutoff_datetime):
old_sessions.append({
'session_id': session_id,
'path': session.path,
'start_time': start_time_raw
})
return old_sessions