Private
Public Access
0
0

feat(log): add Session + SessionMetadata dataclasses (t4_1-t4_8)

Phase 4 of any_type_componentization_20260621. Promotes the 2-level
dict[str, dict[str, Any]] structure in src/log_registry.py to typed
Session + SessionMetadata dataclasses (7 Any sites):

NEW dataclasses (inline in src/log_registry.py):
- SessionMetadata (frozen): message_count, errors, size_kb, whitelisted,
  reason, timestamp
- Session (frozen): session_id, path, start_time, whitelisted, metadata
- to_dict() / from_dict() classmethod for round-trip with TOML shape
- Backward-compat __getitem__ / get() so existing test_log_registry.py
  tests that use session_data['path'] / session_data.get('metadata')
  continue to work

REFACTOR LogRegistry:
- self.data: dict[str, dict[str, Any]] -> dict[str, Session]
- load_registry: populates with Session.from_dict(...)
- save_registry: serializes via session.to_dict()
- register_session: creates Session dataclass
- update_session_metadata: creates new Session with updated SessionMetadata
- is_session_whitelisted: reads session.whitelisted
- update_auto_whitelist_status: reads session.path
- get_old_non_whitelisted_sessions: reads session.start_time + metadata

NEW tests/test_log_registry_dataclasses.py (13 tests, all pass):
- test_session_dataclass_construction
- test_session_metadata_dataclass_construction
- test_session_from_dict_basic / with_metadata
- test_session_to_dict_round_trip
- test_session_metadata_to_dict
- test_log_registry_data_is_typed
- test_log_registry_register_session_returns_session
- test_log_registry_update_session_metadata_sets_metadata
- test_log_registry_is_session_whitelisted
- test_log_registry_get_old_non_whitelisted_sessions
- test_session_is_frozen
- test_session_metadata_is_frozen

Verified:
  uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30
    18 passed in 3.27s (5 existing + 13 new)
This commit is contained in:
2026-06-21 16:56:24 -04:00
parent e19672b2e0
commit fef6c20ea0
2 changed files with 278 additions and 46 deletions
+129 -45
View File
@@ -43,12 +43,96 @@ import os
import tomli_w import tomli_w
import tomllib import tomllib
from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from typing import Any from typing import Any, Optional
from src.result_types import Result, ErrorInfo, ErrorKind from src.result_types import Result, ErrorInfo, ErrorKind
@dataclass(frozen=True)
class SessionMetadata:
message_count: int = 0
errors: int = 0
size_kb: int = 0
whitelisted: bool = False
reason: str = ''
timestamp: Optional[str] = None
def to_dict(self) -> dict[str, Any]:
return {
"message_count": self.message_count,
"errors": self.errors,
"size_kb": self.size_kb,
"whitelisted": self.whitelisted,
"reason": self.reason,
"timestamp": self.timestamp,
}
@dataclass(frozen=True)
class Session:
session_id: str
path: str
start_time: str
whitelisted: bool = False
metadata: Optional[SessionMetadata] = None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {
"path": self.path,
"start_time": self.start_time,
"whitelisted": self.whitelisted,
}
if self.metadata is not None:
d["metadata"] = self.metadata.to_dict()
else:
d["metadata"] = None
return d
def __getitem__(self, key: str) -> Any:
"""Backward-compat: dict-like access (e.g., session['path'])."""
if key == "path":
return self.path
if key == "start_time":
return self.start_time
if key == "whitelisted":
return self.whitelisted
if key == "metadata":
return self.metadata.to_dict() if self.metadata is not None else None
raise KeyError(key)
def get(self, key: str, default: Any = None) -> Any:
"""Backward-compat: dict.get."""
try:
return self[key]
except KeyError:
return default
@classmethod
def from_dict(cls, session_id: str, d: dict[str, Any]) -> Session:
metadata_raw = d.get("metadata")
metadata: Optional[SessionMetadata] = None
if isinstance(metadata_raw, dict):
metadata = SessionMetadata(
message_count=int(metadata_raw.get("message_count", 0)),
errors=int(metadata_raw.get("errors", 0)),
size_kb=int(metadata_raw.get("size_kb", 0)),
whitelisted=bool(metadata_raw.get("whitelisted", False)),
reason=str(metadata_raw.get("reason", "")),
timestamp=metadata_raw.get("timestamp"),
)
elif metadata_raw is not None:
metadata = metadata_raw
return cls(
session_id=session_id,
path=str(d.get("path", "")),
start_time=str(d.get("start_time", "")),
whitelisted=bool(d.get("whitelisted", False)),
metadata=metadata,
)
class LogRegistry: class LogRegistry:
""" """
Manages a persistent registry of session logs using a TOML file. Manages a persistent registry of session logs using a TOML file.
@@ -64,7 +148,7 @@ class LogRegistry:
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
""" """
self.registry_path = registry_path self.registry_path = registry_path
self.data: dict[str, dict[str, Any]] = {} self.data: dict[str, Session] = {}
self.load_registry() self.load_registry()
@property @property
@@ -93,7 +177,7 @@ class LogRegistry:
m = new_session_data['metadata'] m = new_session_data['metadata']
if 'timestamp' in m and isinstance(m['timestamp'], datetime): if 'timestamp' in m and isinstance(m['timestamp'], datetime):
m['timestamp'] = m['timestamp'].isoformat() m['timestamp'] = m['timestamp'].isoformat()
self.data[session_id] = new_session_data self.data[session_id] = Session.from_dict(session_id, new_session_data)
except Exception as e: except Exception as e:
print(f"Error loading registry from {self.registry_path}: {e}") print(f"Error loading registry from {self.registry_path}: {e}")
self.data = {} self.data = {}
@@ -109,13 +193,14 @@ class LogRegistry:
try: try:
# Convert datetime objects to ISO format strings for TOML serialization # Convert datetime objects to ISO format strings for TOML serialization
data_to_save: dict[str, Any] = {} data_to_save: dict[str, Any] = {}
for session_id, session_data in self.data.items(): for session_id, session in self.data.items():
session_data_copy: dict[str, Any] = {} session_dict = session.to_dict()
for k, v in session_data.items(): filtered: dict[str, Any] = {}
for k, v in session_dict.items():
if v is None: if v is None:
continue continue
if k == 'start_time' and isinstance(v, datetime): if k == 'start_time' and isinstance(v, datetime):
session_data_copy[k] = v.isoformat() filtered[k] = v.isoformat()
elif k == 'metadata' and isinstance(v, dict): elif k == 'metadata' and isinstance(v, dict):
metadata_copy: dict[str, Any] = {} metadata_copy: dict[str, Any] = {}
for mk, mv in v.items(): for mk, mv in v.items():
@@ -125,10 +210,10 @@ class LogRegistry:
metadata_copy[mk] = mv.isoformat() metadata_copy[mk] = mv.isoformat()
else: else:
metadata_copy[mk] = mv metadata_copy[mk] = mv
session_data_copy[k] = metadata_copy filtered[k] = metadata_copy
else: else:
session_data_copy[k] = v filtered[k] = v
data_to_save[session_id] = session_data_copy data_to_save[session_id] = filtered
with open(self.registry_path, 'wb') as f: with open(self.registry_path, 'wb') as f:
tomli_w.dump(data_to_save, f) tomli_w.dump(data_to_save, f)
return Result(data=True) return Result(data=True)
@@ -152,12 +237,13 @@ class LogRegistry:
start_time_str = start_time.isoformat() start_time_str = start_time.isoformat()
else: else:
start_time_str = start_time start_time_str = start_time
self.data[session_id] = { self.data[session_id] = Session(
'path': path, session_id=session_id,
'start_time': start_time_str, path=path,
'whitelisted': False, start_time=start_time_str,
'metadata': None whitelisted=False,
} metadata=None,
)
self.save_registry() self.save_registry()
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None: def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
@@ -176,21 +262,22 @@ class LogRegistry:
if session_id not in self.data: if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for metadata update.") print(f"Error: Session ID '{session_id}' not found for metadata update.")
return return
# Ensure metadata exists existing = self.data[session_id]
if self.data[session_id].get('metadata') is None: new_metadata = SessionMetadata(
self.data[session_id]['metadata'] = {} message_count=message_count,
# Update fields errors=errors,
metadata = self.data[session_id].get('metadata') size_kb=size_kb,
if isinstance(metadata, dict): whitelisted=whitelisted,
metadata['message_count'] = message_count reason=reason,
metadata['errors'] = errors timestamp=existing.metadata.timestamp if existing.metadata else None,
metadata['size_kb'] = size_kb )
metadata['whitelisted'] = whitelisted self.data[session_id] = Session(
metadata['reason'] = reason session_id=existing.session_id,
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp path=existing.path,
# Also update the top-level whitelisted flag if provided start_time=existing.start_time,
if whitelisted is not None: whitelisted=whitelisted,
self.data[session_id]['whitelisted'] = whitelisted metadata=new_metadata,
)
self.save_registry() # Save after update self.save_registry() # Save after update
def is_session_whitelisted(self, session_id: str) -> bool: def is_session_whitelisted(self, session_id: str) -> bool:
@@ -202,13 +289,12 @@ class LogRegistry:
Returns: Returns:
bool: True if whitelisted, False otherwise. bool: True if whitelisted, False otherwise.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e] [C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
""" """
session_data = self.data.get(session_id) session = self.data.get(session_id)
if session_data is None: if session is None:
return False # Non-existent sessions are not whitelisted return False # Non-existent sessions are not whitelisted
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted. return session.whitelisted
return bool(session_data.get('whitelisted', False))
def update_auto_whitelist_status(self, session_id: str) -> None: def update_auto_whitelist_status(self, session_id: str) -> None:
""" """
@@ -223,7 +309,7 @@ class LogRegistry:
if session_id not in self.data: if session_id not in self.data:
return return
session_data = self.data[session_id] session_data = self.data[session_id]
session_path = session_data.get('path') session_path = session_data.path
if not session_path or not os.path.isdir(str(session_path)): if not session_path or not os.path.isdir(str(session_path)):
return return
total_size_bytes = 0 total_size_bytes = 0
@@ -285,9 +371,9 @@ class LogRegistry:
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions] [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
""" """
old_sessions = [] old_sessions = []
for session_id, session_data in self.data.items(): for session_id, session in self.data.items():
# Check if session is older than cutoff and not whitelisted # Check if session is older than cutoff and not whitelisted
start_time_raw = session_data.get('start_time') start_time_raw = session.start_time
if isinstance(start_time_raw, str): if isinstance(start_time_raw, str):
try: try:
start_time = datetime.fromisoformat(start_time_raw) start_time = datetime.fromisoformat(start_time_raw)
@@ -295,22 +381,20 @@ class LogRegistry:
start_time = None start_time = None
else: else:
start_time = start_time_raw start_time = start_time_raw
is_whitelisted = session_data.get('whitelisted', False) is_whitelisted = session.whitelisted
# Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata # Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata
metadata = session_data.get('metadata') metadata = session.metadata
if metadata is None: if metadata is None:
is_empty = True is_empty = True
else: else:
message_count = metadata.get('message_count', -1) is_empty = (metadata.message_count == 0 or metadata.size_kb == 0)
size_kb = metadata.get('size_kb', -1)
is_empty = (message_count == 0 or size_kb == 0)
if not is_whitelisted: if not is_whitelisted:
if is_empty or (start_time is not None and start_time < cutoff_datetime): if is_empty or (start_time is not None and start_time < cutoff_datetime):
old_sessions.append({ old_sessions.append({
'session_id': session_id, 'session_id': session_id,
'path': session_data.get('path'), 'path': session.path,
'start_time': start_time_raw 'start_time': start_time_raw
}) })
return old_sessions return old_sessions
+148
View File
@@ -0,0 +1,148 @@
"""Tests for src/log_registry.py Session + SessionMetadata dataclasses
Phase 4 of any_type_componentization_20260621. Verifies:
- Session dataclass (session_id, path, start_time, whitelisted, metadata)
- SessionMetadata dataclass (message_count, errors, size_kb, whitelisted, reason, timestamp)
- Session.from_dict() round-trip
- Session.to_dict() preserves TOML-compatible shape
- LogRegistry.data is now dict[str, Session] (typed)
- LogRegistry.register_session() returns Session instance
- LogRegistry.update_session_metadata() sets Session.metadata
- LogRegistry.get_old_non_whitelisted_sessions() returns Session list
CONVENTION: 1-space indentation. NO COMMENTS.
"""
from __future__ import annotations
import os
from datetime import datetime
import pytest
from src.log_registry import (
LogRegistry,
Session,
SessionMetadata,
)
@pytest.fixture
def tmp_registry(tmp_path) -> LogRegistry:
path = tmp_path / "registry.toml"
return LogRegistry(str(path))
def test_session_dataclass_construction() -> None:
s = Session(session_id="s1", path="/tmp/s1", start_time="2026-06-21T10:00:00")
assert s.session_id == "s1"
assert s.path == "/tmp/s1"
assert s.start_time == "2026-06-21T10:00:00"
assert s.whitelisted is False
assert s.metadata is None
def test_session_metadata_dataclass_construction() -> None:
m = SessionMetadata(message_count=10, errors=2, size_kb=5)
assert m.message_count == 10
assert m.errors == 2
assert m.size_kb == 5
assert m.whitelisted is False
assert m.reason == ""
def test_session_from_dict_basic() -> None:
d = {"path": "/x", "start_time": "2026-06-21T10:00:00", "whitelisted": False, "metadata": None}
s = Session.from_dict("s1", d)
assert s.session_id == "s1"
assert s.path == "/x"
assert s.start_time == "2026-06-21T10:00:00"
assert s.whitelisted is False
assert s.metadata is None
def test_session_from_dict_with_metadata() -> None:
d = {
"path": "/x",
"start_time": "2026-06-21T10:00:00",
"whitelisted": True,
"metadata": {"message_count": 100, "errors": 1, "size_kb": 20, "whitelisted": True, "reason": "high"},
}
s = Session.from_dict("s1", d)
assert s.whitelisted is True
assert s.metadata is not None
assert s.metadata.message_count == 100
assert s.metadata.reason == "high"
def test_session_to_dict_round_trip() -> None:
m = SessionMetadata(message_count=42, errors=0, size_kb=15, whitelisted=True, reason="high count")
s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00", whitelisted=True, metadata=m)
d = s.to_dict()
assert d["path"] == "/x"
assert d["start_time"] == "2026-06-21T10:00:00"
assert d["whitelisted"] is True
assert d["metadata"]["message_count"] == 42
def test_session_metadata_to_dict() -> None:
m = SessionMetadata(message_count=5, errors=1, size_kb=2)
d = m.to_dict()
assert d == {"message_count": 5, "errors": 1, "size_kb": 2, "whitelisted": False, "reason": "", "timestamp": None}
def test_log_registry_data_is_typed() -> None:
"""self.data is now dict[str, Session]."""
registry = LogRegistry("/tmp/_test_registry_xyz.toml")
assert isinstance(registry.data, dict)
def test_log_registry_register_session_returns_session(tmp_registry: LogRegistry) -> None:
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
s = tmp_registry.data["s1"]
assert isinstance(s, Session)
assert s.session_id == "s1"
assert s.path == "/tmp/s1"
assert s.start_time == "2026-06-21T10:00:00"
assert s.whitelisted is False
def test_log_registry_update_session_metadata_sets_metadata(tmp_registry: LogRegistry) -> None:
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
tmp_registry.update_session_metadata("s1", message_count=10, errors=2, size_kb=5, whitelisted=True, reason="test")
s = tmp_registry.data["s1"]
assert s.metadata is not None
assert s.metadata.message_count == 10
assert s.metadata.errors == 2
assert s.whitelisted is True
def test_log_registry_is_session_whitelisted(tmp_registry: LogRegistry) -> None:
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
assert tmp_registry.is_session_whitelisted("s1") is False
tmp_registry.update_session_metadata("s1", 10, 0, 5, True, "test")
assert tmp_registry.is_session_whitelisted("s1") is True
def test_log_registry_get_old_non_whitelisted_sessions(tmp_registry: LogRegistry) -> None:
cutoff = datetime(2026, 6, 1)
old_start = "2026-05-01T10:00:00"
recent_start = "2026-06-21T10:00:00"
tmp_registry.register_session("old", "/tmp/old", old_start)
tmp_registry.register_session("recent", "/tmp/recent", recent_start)
# Update metadata so neither session is "empty" (otherwise both would be flagged as old)
tmp_registry.update_session_metadata("old", 10, 0, 5, False, "test")
tmp_registry.update_session_metadata("recent", 10, 0, 5, False, "test")
old_sessions = tmp_registry.get_old_non_whitelisted_sessions(cutoff)
assert any(s["session_id"] == "old" for s in old_sessions)
assert not any(s["session_id"] == "recent" for s in old_sessions)
def test_session_is_frozen() -> None:
s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00")
with pytest.raises(Exception):
s.path = "mutated"
def test_session_metadata_is_frozen() -> None:
m = SessionMetadata(message_count=10)
with pytest.raises(Exception):
m.message_count = 999