3816a54d27
Phase 4 of any_type_componentization_20260621. Promotes the 2-level
dict[str, dict[str, Any]] structure in src/log_registry.py to typed
Session + SessionMetadata dataclasses (7 Any sites):
NEW dataclasses (inline in src/log_registry.py):
- SessionMetadata (frozen): message_count, errors, size_kb, whitelisted,
reason, timestamp
- Session (frozen): session_id, path, start_time, whitelisted, metadata
- to_dict() / from_dict() classmethod for round-trip with TOML shape
- Backward-compat __getitem__ / get() so existing test_log_registry.py
tests that use session_data['path'] / session_data.get('metadata')
continue to work
REFACTOR LogRegistry:
- self.data: dict[str, dict[str, Any]] -> dict[str, Session]
- load_registry: populates with Session.from_dict(...)
- save_registry: serializes via session.to_dict()
- register_session: creates Session dataclass
- update_session_metadata: creates new Session with updated SessionMetadata
- is_session_whitelisted: reads session.whitelisted
- update_auto_whitelist_status: reads session.path
- get_old_non_whitelisted_sessions: reads session.start_time + metadata
NEW tests/test_log_registry_dataclasses.py (13 tests, all pass):
- test_session_dataclass_construction
- test_session_metadata_dataclass_construction
- test_session_from_dict_basic / with_metadata
- test_session_to_dict_round_trip
- test_session_metadata_to_dict
- test_log_registry_data_is_typed
- test_log_registry_register_session_returns_session
- test_log_registry_update_session_metadata_sets_metadata
- test_log_registry_is_session_whitelisted
- test_log_registry_get_old_non_whitelisted_sessions
- test_session_is_frozen
- test_session_metadata_is_frozen
Verified:
uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30
18 passed in 3.27s (5 existing + 13 new)
431 lines
16 KiB
Python
431 lines
16 KiB
Python
"""
|
|
Log Registry - Session metadata persistence for log management.
|
|
|
|
This module provides the LogRegistry class for tracking session logs
|
|
in a persistent TOML registry file. It supports session registration,
|
|
metadata updates, whitelisting, and age-based pruning queries.
|
|
|
|
Key Features:
|
|
- Persistent TOML-based registry (log_registry.toml)
|
|
- Session registration with path and start time
|
|
- Automatic whitelisting based on heuristics (errors, message count, size)
|
|
- Age-based session queries for log pruning
|
|
- Thread-safe file operations (via atomic TOML writes)
|
|
|
|
Registry File Format (log_registry.toml):
|
|
[session_id]
|
|
path = "logs/sessions/session_id"
|
|
start_time = "2024-01-15T10:30:00"
|
|
whitelisted = false
|
|
[session_id.metadata]
|
|
message_count = 42
|
|
errors = 0
|
|
size_kb = 15
|
|
reason = "High message count: 42"
|
|
|
|
Integration:
|
|
- Used by session_logger.py for session registration
|
|
- Used by log_pruner.py for age-based cleanup
|
|
- Called from gui_2.py for log management UI
|
|
|
|
Thread Safety:
|
|
- File operations use atomic write (tomli_w.dump)
|
|
- In-memory data dict is not thread-safe for concurrent access
|
|
|
|
See Also:
|
|
- src/session_logger.py for session lifecycle
|
|
- src/log_pruner.py for automated cleanup
|
|
- src/paths.py for registry path resolution
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
import tomli_w
|
|
import tomllib
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from typing import Any, Optional
|
|
|
|
from src.result_types import Result, ErrorInfo, ErrorKind
|
|
|
|
|
|
@dataclass(frozen=True)
class SessionMetadata:
    """
    Immutable per-session statistics stored under ``[session_id.metadata]``.

    All fields have defaults, so ``SessionMetadata()`` describes an empty
    session with no recorded timestamp.
    """

    message_count: int = 0
    errors: int = 0
    size_kb: int = 0
    whitelisted: bool = False
    reason: str = ''
    timestamp: Optional[str] = None

    def to_dict(self) -> dict[str, Any]:
        """
        Returns the metadata as a plain dict, one key per field.

        Keys appear in field-declaration order. ``timestamp`` is included even
        when it is ``None``.
        """
        field_names = (
            "message_count",
            "errors",
            "size_kb",
            "whitelisted",
            "reason",
            "timestamp",
        )
        return {field_name: getattr(self, field_name) for field_name in field_names}
|
|
|
|
|
|
@dataclass(frozen=True)
class Session:
    """
    Immutable record of one registered session.

    ``session_id`` is the registry key and is not part of the serialized
    table; ``to_dict()`` / ``from_dict()`` convert to and from the per-session
    dict shape. The dict-style helpers (``session['path']``,
    ``session.get('metadata')``, ``'path' in session``) exist for code written
    against the earlier plain-dict representation.
    """

    # Keys exposed through the dict-style compatibility interface.
    # Deliberately un-annotated so the dataclass does not treat it as a field.
    _DICT_KEYS = ("path", "start_time", "whitelisted", "metadata")

    session_id: str
    path: str
    start_time: str
    whitelisted: bool = False
    metadata: Optional[SessionMetadata] = None

    def to_dict(self) -> dict[str, Any]:
        """
        Returns the session as a plain dict (without ``session_id``).

        ``metadata`` is always present: a nested dict when metadata is set,
        otherwise ``None``.
        """
        return {
            "path": self.path,
            "start_time": self.start_time,
            "whitelisted": self.whitelisted,
            "metadata": self.metadata.to_dict() if self.metadata is not None else None,
        }

    def __getitem__(self, key: str) -> Any:
        """Backward-compat: dict-like access (e.g., session['path'])."""
        if key == "metadata":
            # Legacy callers expect a nested dict here, not the dataclass.
            return self.metadata.to_dict() if self.metadata is not None else None
        if key in ("path", "start_time", "whitelisted"):
            return getattr(self, key)
        raise KeyError(key)

    def __contains__(self, key: object) -> bool:
        """
        Backward-compat: dict-like membership test (e.g., 'path' in session).

        Without this, ``in`` falls back to integer indexing through
        ``__getitem__`` and raises ``KeyError(0)`` instead of returning a bool.
        """
        return key in self._DICT_KEYS

    def get(self, key: str, default: Any = None) -> Any:
        """Backward-compat: dict.get."""
        try:
            return self[key]
        except KeyError:
            return default

    @classmethod
    def from_dict(cls, session_id: str, d: dict[str, Any]) -> Session:
        """
        Builds a Session from a per-session dict.

        Args:
            session_id (str): Registry key for the session.
            d (dict): Mapping with optional 'path', 'start_time', 'whitelisted'
                and 'metadata' keys; missing keys fall back to '' / False / None.

        Returns:
            Session: The typed record. ``datetime`` values for 'start_time' and
            the metadata 'timestamp' are stored as ISO-8601 strings. A
            'metadata' value that is neither a dict nor a SessionMetadata is
            ignored (treated as no metadata).

        Raises:
            TypeError, ValueError: If a numeric metadata field cannot be
            converted with ``int()``.
        """
        metadata_raw = d.get("metadata")
        metadata: Optional[SessionMetadata] = None
        if isinstance(metadata_raw, dict):
            timestamp = metadata_raw.get("timestamp")
            if isinstance(timestamp, datetime):
                timestamp = timestamp.isoformat()
            metadata = SessionMetadata(
                message_count=int(metadata_raw.get("message_count", 0)),
                errors=int(metadata_raw.get("errors", 0)),
                size_kb=int(metadata_raw.get("size_kb", 0)),
                whitelisted=bool(metadata_raw.get("whitelisted", False)),
                reason=str(metadata_raw.get("reason", "")),
                timestamp=timestamp,
            )
        elif metadata_raw is not None and isinstance(metadata_raw, SessionMetadata):
            # Already typed: pass through. Any other value is dropped, because
            # to_dict() would later fail calling .to_dict() on it.
            metadata = metadata_raw

        start_time = d.get("start_time", "")
        if isinstance(start_time, datetime):
            # isoformat() keeps the 'T' separator used elsewhere in the registry;
            # str() would produce a space-separated form.
            start_time = start_time.isoformat()

        return cls(
            session_id=session_id,
            path=str(d.get("path", "")),
            start_time=str(start_time),
            whitelisted=bool(d.get("whitelisted", False)),
            metadata=metadata,
        )
|
|
|
|
|
|
class LogRegistry:
|
|
"""
|
|
Manages a persistent registry of session logs using a TOML file.
|
|
Tracks session paths, start times, whitelisting status, and metadata.
|
|
"""
|
|
|
|
def __init__(self, registry_path: str) -> None:
|
|
"""
|
|
Initializes the LogRegistry with a path to the registry file.
|
|
|
|
Args:
|
|
registry_path (str): The file path to the TOML registry.
|
|
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
|
|
"""
|
|
self.registry_path = registry_path
|
|
self.data: dict[str, Session] = {}
|
|
self.load_registry()
|
|
|
|
@property
|
|
def sessions(self) -> dict[str, dict[str, Any]]:
|
|
"""Alias for compatibility with older code/tests."""
|
|
return self.data
|
|
|
|
def load_registry(self) -> None:
|
|
"""
|
|
Loads the registry data from the TOML file into memory.
|
|
Handles date/time conversions from TOML-native formats to strings for consistency.
|
|
"""
|
|
if os.path.exists(self.registry_path):
|
|
try:
|
|
with open(self.registry_path, 'rb') as f:
|
|
loaded_data = tomllib.load(f)
|
|
# Keep data as it is from TOML (strings or native datetimes)
|
|
# If we want to satisfy tests that expect strings, we ensure they are strings.
|
|
self.data = {}
|
|
for session_id, session_data in loaded_data.items():
|
|
new_session_data = session_data.copy()
|
|
# If tomllib parsed it as a datetime, convert it back to string for the tests
|
|
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
|
|
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
|
|
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
|
|
m = new_session_data['metadata']
|
|
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
|
|
m['timestamp'] = m['timestamp'].isoformat()
|
|
self.data[session_id] = Session.from_dict(session_id, new_session_data)
|
|
except Exception as e:
|
|
print(f"Error loading registry from {self.registry_path}: {e}")
|
|
self.data = {}
|
|
else:
|
|
self.data = {}
|
|
|
|
def save_registry(self) -> Result[bool]:
|
|
"""
|
|
Serializes and saves the current registry data to the TOML file.
|
|
Converts internal datetime objects to ISO format strings for compatibility.
|
|
[C: tests/test_logging_e2e.py:test_logging_e2e]
|
|
"""
|
|
try:
|
|
# Convert datetime objects to ISO format strings for TOML serialization
|
|
data_to_save: dict[str, Any] = {}
|
|
for session_id, session in self.data.items():
|
|
session_dict = session.to_dict()
|
|
filtered: dict[str, Any] = {}
|
|
for k, v in session_dict.items():
|
|
if v is None:
|
|
continue
|
|
if k == 'start_time' and isinstance(v, datetime):
|
|
filtered[k] = v.isoformat()
|
|
elif k == 'metadata' and isinstance(v, dict):
|
|
metadata_copy: dict[str, Any] = {}
|
|
for mk, mv in v.items():
|
|
if mv is None:
|
|
continue
|
|
if mk == 'timestamp' and isinstance(mv, datetime):
|
|
metadata_copy[mk] = mv.isoformat()
|
|
else:
|
|
metadata_copy[mk] = mv
|
|
filtered[k] = metadata_copy
|
|
else:
|
|
filtered[k] = v
|
|
data_to_save[session_id] = filtered
|
|
with open(self.registry_path, 'wb') as f:
|
|
tomli_w.dump(data_to_save, f)
|
|
return Result(data=True)
|
|
except OSError as e:
|
|
return Result(data=False, errors=[ErrorInfo(kind=ErrorKind.INTERNAL, message=str(e), source="log_registry.save_registry", original=e)])
|
|
|
|
def register_session(self, session_id: str, path: str, start_time: datetime | str) -> None:
|
|
"""
|
|
Registers a new session in the registry.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
path (str): File path to the session's log directory.
|
|
start_time (datetime|str): The timestamp when the session started.
|
|
[C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e]
|
|
"""
|
|
if session_id in self.data:
|
|
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
|
|
# Store start_time internally as a string to satisfy tests
|
|
if isinstance(start_time, datetime):
|
|
start_time_str = start_time.isoformat()
|
|
else:
|
|
start_time_str = start_time
|
|
self.data[session_id] = Session(
|
|
session_id=session_id,
|
|
path=path,
|
|
start_time=start_time_str,
|
|
whitelisted=False,
|
|
metadata=None,
|
|
)
|
|
self.save_registry()
|
|
|
|
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
|
|
"""
|
|
Updates metadata fields for an existing session.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
message_count (int): Total number of messages in the session.
|
|
errors (int): Number of errors identified in logs.
|
|
size_kb (int): Total size of the session logs in kilobytes.
|
|
whitelisted (bool): Whether the session should be protected from pruning.
|
|
reason (str): Explanation for the current whitelisting status.
|
|
[C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata]
|
|
"""
|
|
if session_id not in self.data:
|
|
print(f"Error: Session ID '{session_id}' not found for metadata update.")
|
|
return
|
|
existing = self.data[session_id]
|
|
new_metadata = SessionMetadata(
|
|
message_count=message_count,
|
|
errors=errors,
|
|
size_kb=size_kb,
|
|
whitelisted=whitelisted,
|
|
reason=reason,
|
|
timestamp=existing.metadata.timestamp if existing.metadata else None,
|
|
)
|
|
self.data[session_id] = Session(
|
|
session_id=existing.session_id,
|
|
path=existing.path,
|
|
start_time=existing.start_time,
|
|
whitelisted=whitelisted,
|
|
metadata=new_metadata,
|
|
)
|
|
self.save_registry() # Save after update
|
|
|
|
def set_session_start_time(self, session_id: str, start_time: datetime | str) -> None:
|
|
"""
|
|
Updates the start_time of an existing session.
|
|
|
|
Used by tests and maintenance tools to backdate a session for pruning
|
|
verification. Creates a new Session with the updated start_time while
|
|
preserving all other fields (Session is frozen).
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
start_time (datetime|str): The new start timestamp.
|
|
[C: tests/test_logging_e2e.py:test_logging_e2e]
|
|
"""
|
|
if session_id not in self.data:
|
|
print(f"Error: Session ID '{session_id}' not found for start_time update.")
|
|
return
|
|
if isinstance(start_time, datetime):
|
|
start_time_str: str = start_time.isoformat()
|
|
else:
|
|
start_time_str = start_time
|
|
existing = self.data[session_id]
|
|
self.data[session_id] = Session(
|
|
session_id=existing.session_id,
|
|
path=existing.path,
|
|
start_time=start_time_str,
|
|
whitelisted=existing.whitelisted,
|
|
metadata=existing.metadata,
|
|
)
|
|
self.save_registry()
|
|
|
|
def is_session_whitelisted(self, session_id: str) -> bool:
|
|
"""
|
|
Checks if a specific session is marked as whitelisted.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
|
|
Returns:
|
|
bool: True if whitelisted, False otherwise.
|
|
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
|
|
"""
|
|
session = self.data.get(session_id)
|
|
if session is None:
|
|
return False # Non-existent sessions are not whitelisted
|
|
return session.whitelisted
|
|
|
|
def update_auto_whitelist_status(self, session_id: str) -> None:
|
|
"""
|
|
Analyzes session logs and updates whitelisting status based on heuristics.
|
|
Sessions are automatically whitelisted if they contain error keywords,
|
|
have a high message count, or exceed a size threshold.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session to analyze.
|
|
[C: src/session_logger.py:close_session]
|
|
"""
|
|
if session_id not in self.data:
|
|
return
|
|
session_data = self.data[session_id]
|
|
session_path = session_data.path
|
|
if not session_path or not os.path.isdir(str(session_path)):
|
|
return
|
|
total_size_bytes = 0
|
|
message_count = 0
|
|
found_keywords = []
|
|
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
|
|
try:
|
|
for entry in os.scandir(str(session_path)):
|
|
if entry.is_file():
|
|
size = entry.stat().st_size
|
|
total_size_bytes += size
|
|
# Analyze comms.log for messages and keywords
|
|
if entry.name == "comms.log":
|
|
try:
|
|
with open(entry.path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
for line in f:
|
|
message_count += 1
|
|
for kw in keywords_to_check:
|
|
if kw in line and kw not in found_keywords:
|
|
found_keywords.append(kw)
|
|
except OSError as e:
|
|
import sys
|
|
sys.stderr.write(f"[LogRegistry] read comms.log entry failed: {e}\n")
|
|
except OSError as e:
|
|
import sys
|
|
sys.stderr.write(f"[LogRegistry] scan session_path failed: {e}\n")
|
|
size_kb = total_size_bytes / 1024
|
|
whitelisted = False
|
|
reason = ""
|
|
if found_keywords:
|
|
whitelisted = True
|
|
reason = f"Found keywords: {', '.join(found_keywords)}"
|
|
elif message_count > 10:
|
|
whitelisted = True
|
|
reason = f"High message count: {message_count}"
|
|
elif size_kb > 50:
|
|
whitelisted = True
|
|
reason = f"Large session size: {size_kb:.1f} KB"
|
|
self.update_session_metadata(
|
|
session_id,
|
|
message_count = message_count,
|
|
errors = len(found_keywords),
|
|
size_kb = int(size_kb),
|
|
whitelisted = whitelisted,
|
|
reason = reason
|
|
)
|
|
|
|
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]:
|
|
"""
|
|
Retrieves a list of sessions that are older than a specific cutoff time
|
|
and are not marked as whitelisted.
|
|
Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0).
|
|
|
|
Args:
|
|
cutoff_datetime (datetime): The threshold time for identifying old sessions.
|
|
|
|
Returns:
|
|
list: A list of dictionaries containing session details (id, path, start_time).
|
|
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
|
|
"""
|
|
old_sessions = []
|
|
for session_id, session in self.data.items():
|
|
# Check if session is older than cutoff and not whitelisted
|
|
start_time_raw = session.start_time
|
|
if isinstance(start_time_raw, str):
|
|
try:
|
|
start_time = datetime.fromisoformat(start_time_raw)
|
|
except ValueError:
|
|
start_time = None
|
|
else:
|
|
start_time = start_time_raw
|
|
is_whitelisted = session.whitelisted
|
|
|
|
# Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata
|
|
metadata = session.metadata
|
|
if metadata is None:
|
|
is_empty = True
|
|
else:
|
|
is_empty = (metadata.message_count == 0 or metadata.size_kb == 0)
|
|
|
|
if not is_whitelisted:
|
|
if is_empty or (start_time is not None and start_time < cutoff_datetime):
|
|
old_sessions.append({
|
|
'session_id': session_id,
|
|
'path': session.path,
|
|
'start_time': start_time_raw
|
|
})
|
|
return old_sessions
|