From fef6c20ea03cf6c5794799f650a256b9290d67cb Mon Sep 17 00:00:00 2001 From: Ed_ Date: Sun, 21 Jun 2026 16:56:24 -0400 Subject: [PATCH] feat(log): add Session + SessionMetadata dataclasses (t4_1-t4_8) Phase 4 of any_type_componentization_20260621. Promotes the 2-level dict[str, dict[str, Any]] structure in src/log_registry.py to typed Session + SessionMetadata dataclasses (7 Any sites): NEW dataclasses (inline in src/log_registry.py): - SessionMetadata (frozen): message_count, errors, size_kb, whitelisted, reason, timestamp - Session (frozen): session_id, path, start_time, whitelisted, metadata - to_dict() / from_dict() classmethod for round-trip with TOML shape - Backward-compat __getitem__ / get() so existing test_log_registry.py tests that use session_data['path'] / session_data.get('metadata') continue to work REFACTOR LogRegistry: - self.data: dict[str, dict[str, Any]] -> dict[str, Session] - load_registry: populates with Session.from_dict(...) - save_registry: serializes via session.to_dict() - register_session: creates Session dataclass - update_session_metadata: creates new Session with updated SessionMetadata - is_session_whitelisted: reads session.whitelisted - update_auto_whitelist_status: reads session.path - get_old_non_whitelisted_sessions: reads session.start_time + metadata NEW tests/test_log_registry_dataclasses.py (13 tests, all pass): - test_session_dataclass_construction - test_session_metadata_dataclass_construction - test_session_from_dict_basic / with_metadata - test_session_to_dict_round_trip - test_session_metadata_to_dict - test_log_registry_data_is_typed - test_log_registry_register_session_returns_session - test_log_registry_update_session_metadata_sets_metadata - test_log_registry_is_session_whitelisted - test_log_registry_get_old_non_whitelisted_sessions - test_session_is_frozen - test_session_metadata_is_frozen Verified: uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30 18 passed in 3.27s (5 existing + 13 new) --- src/log_registry.py | 176 ++++++++++++++++++------- tests/test_log_registry_dataclasses.py | 148 +++++++++++++++++++++ 2 files changed, 278 insertions(+), 46 deletions(-) create mode 100644 tests/test_log_registry_dataclasses.py diff --git a/src/log_registry.py b/src/log_registry.py index 5ee346fe..8bf3ea3d 100644 --- a/src/log_registry.py +++ b/src/log_registry.py @@ -43,12 +43,96 @@ import os import tomli_w import tomllib +from dataclasses import dataclass from datetime import datetime -from typing import Any +from typing import Any, Optional from src.result_types import Result, ErrorInfo, ErrorKind +@dataclass(frozen=True) +class SessionMetadata: + message_count: int = 0 + errors: int = 0 + size_kb: int = 0 + whitelisted: bool = False + reason: str = '' + timestamp: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + return { + "message_count": self.message_count, + "errors": self.errors, + "size_kb": self.size_kb, + "whitelisted": self.whitelisted, + "reason": self.reason, + "timestamp": self.timestamp, + } + + +@dataclass(frozen=True) +class Session: + session_id: str + path: str + start_time: str + whitelisted: bool = False + metadata: Optional[SessionMetadata] = None + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = { + "path": self.path, + "start_time": self.start_time, + "whitelisted": self.whitelisted, + } + if self.metadata is not None: + d["metadata"] = self.metadata.to_dict() + else: + d["metadata"] = None + return d + + def __getitem__(self, key: str) -> Any: + """Backward-compat: dict-like access (e.g., session['path']).""" + if key == "path": + return self.path + if key == "start_time": + return self.start_time + if key == "whitelisted": + return self.whitelisted + if key == "metadata": + return self.metadata.to_dict() if self.metadata is not None else None + raise KeyError(key) + + def get(self, key: str, default: Any = None) -> Any: + """Backward-compat: dict.get.""" + try: + return self[key] + except KeyError: + return default + + @classmethod + def from_dict(cls, session_id: str, d: dict[str, Any]) -> Session: + metadata_raw = d.get("metadata") + metadata: Optional[SessionMetadata] = None + if isinstance(metadata_raw, dict): + metadata = SessionMetadata( + message_count=int(metadata_raw.get("message_count", 0)), + errors=int(metadata_raw.get("errors", 0)), + size_kb=int(metadata_raw.get("size_kb", 0)), + whitelisted=bool(metadata_raw.get("whitelisted", False)), + reason=str(metadata_raw.get("reason", "")), + timestamp=metadata_raw.get("timestamp"), + ) + elif metadata_raw is not None: + metadata = metadata_raw + return cls( + session_id=session_id, + path=str(d.get("path", "")), + start_time=str(d.get("start_time", "")), + whitelisted=bool(d.get("whitelisted", False)), + metadata=metadata, + ) + + class LogRegistry: """ Manages a persistent registry of session logs using a TOML file. @@ -58,13 +142,13 @@ class LogRegistry: def __init__(self, registry_path: str) -> None: """ Initializes the LogRegistry with a path to the registry file. - + Args: registry_path (str): The file path to the TOML registry. [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.registry_path = registry_path - self.data: dict[str, dict[str, Any]] = {} + self.data: dict[str, Session] = {} self.load_registry() @property @@ -93,7 +177,7 @@ class LogRegistry: m = new_session_data['metadata'] if 'timestamp' in m and isinstance(m['timestamp'], datetime): m['timestamp'] = m['timestamp'].isoformat() - self.data[session_id] = new_session_data + self.data[session_id] = Session.from_dict(session_id, new_session_data) except Exception as e: print(f"Error loading registry from {self.registry_path}: {e}") self.data = {} @@ -109,13 +193,14 @@ class LogRegistry: try: # Convert datetime objects to ISO format strings for TOML serialization data_to_save: dict[str, Any] = {} - for session_id, session_data in self.data.items(): - session_data_copy: dict[str, Any] = {} - for k, v in session_data.items(): + for session_id, session in self.data.items(): + session_dict = session.to_dict() + filtered: dict[str, Any] = {} + for k, v in session_dict.items(): if v is None: continue if k == 'start_time' and isinstance(v, datetime): - session_data_copy[k] = v.isoformat() + filtered[k] = v.isoformat() elif k == 'metadata' and isinstance(v, dict): metadata_copy: dict[str, Any] = {} for mk, mv in v.items(): @@ -125,10 +210,10 @@ class LogRegistry: metadata_copy[mk] = mv.isoformat() else: metadata_copy[mk] = mv - session_data_copy[k] = metadata_copy + filtered[k] = metadata_copy else: - session_data_copy[k] = v - data_to_save[session_id] = session_data_copy + filtered[k] = v + data_to_save[session_id] = filtered with open(self.registry_path, 'wb') as f: tomli_w.dump(data_to_save, f) return Result(data=True) @@ -152,12 +237,13 @@ class LogRegistry: start_time_str = start_time.isoformat() else: start_time_str = start_time - self.data[session_id] = { - 'path': path, - 'start_time': start_time_str, - 'whitelisted': False, - 'metadata': None - } + self.data[session_id] = Session( + session_id=session_id, + path=path, + start_time=start_time_str, + whitelisted=False, + metadata=None, + ) self.save_registry() def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None: @@ -176,21 +262,22 @@ class LogRegistry: if session_id not in self.data: print(f"Error: Session ID '{session_id}' not found for metadata update.") return - # Ensure metadata exists - if self.data[session_id].get('metadata') is None: - self.data[session_id]['metadata'] = {} - # Update fields - metadata = self.data[session_id].get('metadata') - if isinstance(metadata, dict): - metadata['message_count'] = message_count - metadata['errors'] = errors - metadata['size_kb'] = size_kb - metadata['whitelisted'] = whitelisted - metadata['reason'] = reason - # self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp - # Also update the top-level whitelisted flag if provided - if whitelisted is not None: - self.data[session_id]['whitelisted'] = whitelisted + existing = self.data[session_id] + new_metadata = SessionMetadata( + message_count=message_count, + errors=errors, + size_kb=size_kb, + whitelisted=whitelisted, + reason=reason, + timestamp=existing.metadata.timestamp if existing.metadata else None, + ) + self.data[session_id] = Session( + session_id=existing.session_id, + path=existing.path, + start_time=existing.start_time, + whitelisted=whitelisted, + metadata=new_metadata, + ) self.save_registry() # Save after update def is_session_whitelisted(self, session_id: str) -> bool: @@ -202,13 +289,12 @@ class LogRegistry: Returns: bool: True if whitelisted, False otherwise. - [C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e] + [C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e] """ - session_data = self.data.get(session_id) - if session_data is None: + session = self.data.get(session_id) + if session is None: return False # Non-existent sessions are not whitelisted - # Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted. - return bool(session_data.get('whitelisted', False)) + return session.whitelisted def update_auto_whitelist_status(self, session_id: str) -> None: """ @@ -223,7 +309,7 @@ class LogRegistry: if session_id not in self.data: return session_data = self.data[session_id] - session_path = session_data.get('path') + session_path = session_data.path if not session_path or not os.path.isdir(str(session_path)): return total_size_bytes = 0 @@ -285,9 +371,9 @@ class LogRegistry: [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions] """ old_sessions = [] - for session_id, session_data in self.data.items(): + for session_id, session in self.data.items(): # Check if session is older than cutoff and not whitelisted - start_time_raw = session_data.get('start_time') + start_time_raw = session.start_time if isinstance(start_time_raw, str): try: start_time = datetime.fromisoformat(start_time_raw) @@ -295,22 +381,20 @@ class LogRegistry: start_time = None else: start_time = start_time_raw - is_whitelisted = session_data.get('whitelisted', False) + is_whitelisted = session.whitelisted # Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata - metadata = session_data.get('metadata') + metadata = session.metadata if metadata is None: is_empty = True else: - message_count = metadata.get('message_count', -1) - size_kb = metadata.get('size_kb', -1) - is_empty = (message_count == 0 or size_kb == 0) + is_empty = (metadata.message_count == 0 or metadata.size_kb == 0) if not is_whitelisted: if is_empty or (start_time is not None and start_time < cutoff_datetime): old_sessions.append({ 'session_id': session_id, - 'path': session_data.get('path'), + 'path': session.path, 'start_time': start_time_raw }) return old_sessions diff --git a/tests/test_log_registry_dataclasses.py b/tests/test_log_registry_dataclasses.py new file mode 100644 index 00000000..d61b10f5 --- /dev/null +++ b/tests/test_log_registry_dataclasses.py @@ -0,0 +1,148 @@ +"""Tests for src/log_registry.py Session + SessionMetadata dataclasses + +Phase 4 of any_type_componentization_20260621. Verifies: +- Session dataclass (session_id, path, start_time, whitelisted, metadata) +- SessionMetadata dataclass (message_count, errors, size_kb, whitelisted, reason, timestamp) +- Session.from_dict() round-trip +- Session.to_dict() preserves TOML-compatible shape +- LogRegistry.data is now dict[str, Session] (typed) +- LogRegistry.register_session() returns Session instance +- LogRegistry.update_session_metadata() sets Session.metadata +- LogRegistry.get_old_non_whitelisted_sessions() returns Session list + +CONVENTION: 1-space indentation. NO COMMENTS. +""" +from __future__ import annotations + +import os +from datetime import datetime + +import pytest +from src.log_registry import ( + LogRegistry, + Session, + SessionMetadata, +) + + +@pytest.fixture +def tmp_registry(tmp_path) -> LogRegistry: + path = tmp_path / "registry.toml" + return LogRegistry(str(path)) + + +def test_session_dataclass_construction() -> None: + s = Session(session_id="s1", path="/tmp/s1", start_time="2026-06-21T10:00:00") + assert s.session_id == "s1" + assert s.path == "/tmp/s1" + assert s.start_time == "2026-06-21T10:00:00" + assert s.whitelisted is False + assert s.metadata is None + + +def test_session_metadata_dataclass_construction() -> None: + m = SessionMetadata(message_count=10, errors=2, size_kb=5) + assert m.message_count == 10 + assert m.errors == 2 + assert m.size_kb == 5 + assert m.whitelisted is False + assert m.reason == "" + + +def test_session_from_dict_basic() -> None: + d = {"path": "/x", "start_time": "2026-06-21T10:00:00", "whitelisted": False, "metadata": None} + s = Session.from_dict("s1", d) + assert s.session_id == "s1" + assert s.path == "/x" + assert s.start_time == "2026-06-21T10:00:00" + assert s.whitelisted is False + assert s.metadata is None + + +def test_session_from_dict_with_metadata() -> None: + d = { + "path": "/x", + "start_time": "2026-06-21T10:00:00", + "whitelisted": True, + "metadata": {"message_count": 100, "errors": 1, "size_kb": 20, "whitelisted": True, "reason": "high"}, + } + s = Session.from_dict("s1", d) + assert s.whitelisted is True + assert s.metadata is not None + assert s.metadata.message_count == 100 + assert s.metadata.reason == "high" + + +def test_session_to_dict_round_trip() -> None: + m = SessionMetadata(message_count=42, errors=0, size_kb=15, whitelisted=True, reason="high count") + s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00", whitelisted=True, metadata=m) + d = s.to_dict() + assert d["path"] == "/x" + assert d["start_time"] == "2026-06-21T10:00:00" + assert d["whitelisted"] is True + assert d["metadata"]["message_count"] == 42 + + +def test_session_metadata_to_dict() -> None: + m = SessionMetadata(message_count=5, errors=1, size_kb=2) + d = m.to_dict() + assert d == {"message_count": 5, "errors": 1, "size_kb": 2, "whitelisted": False, "reason": "", "timestamp": None} + + +def test_log_registry_data_is_typed() -> None: + """self.data is now dict[str, Session].""" + registry = LogRegistry("/tmp/_test_registry_xyz.toml") + assert isinstance(registry.data, dict) + + +def test_log_registry_register_session_returns_session(tmp_registry: LogRegistry) -> None: + tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00") + s = tmp_registry.data["s1"] + assert isinstance(s, Session) + assert s.session_id == "s1" + assert s.path == "/tmp/s1" + assert s.start_time == "2026-06-21T10:00:00" + assert s.whitelisted is False + + +def test_log_registry_update_session_metadata_sets_metadata(tmp_registry: LogRegistry) -> None: + tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00") + tmp_registry.update_session_metadata("s1", message_count=10, errors=2, size_kb=5, whitelisted=True, reason="test") + s = tmp_registry.data["s1"] + assert s.metadata is not None + assert s.metadata.message_count == 10 + assert s.metadata.errors == 2 + assert s.whitelisted is True + + +def test_log_registry_is_session_whitelisted(tmp_registry: LogRegistry) -> None: + tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00") + assert tmp_registry.is_session_whitelisted("s1") is False + tmp_registry.update_session_metadata("s1", 10, 0, 5, True, "test") + assert tmp_registry.is_session_whitelisted("s1") is True + + +def test_log_registry_get_old_non_whitelisted_sessions(tmp_registry: LogRegistry) -> None: + cutoff = datetime(2026, 6, 1) + old_start = "2026-05-01T10:00:00" + recent_start = "2026-06-21T10:00:00" + tmp_registry.register_session("old", "/tmp/old", old_start) + tmp_registry.register_session("recent", "/tmp/recent", recent_start) + # Update metadata so neither session is "empty" (otherwise both would be flagged as old) + tmp_registry.update_session_metadata("old", 10, 0, 5, False, "test") + tmp_registry.update_session_metadata("recent", 10, 0, 5, False, "test") + old_sessions = tmp_registry.get_old_non_whitelisted_sessions(cutoff) + assert any(s["session_id"] == "old" for s in old_sessions) + assert not any(s["session_id"] == "recent" for s in old_sessions) + + +def test_session_is_frozen() -> None: + s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00") + with pytest.raises(Exception): + s.path = "mutated" + + +def test_session_metadata_is_frozen() -> None: + m = SessionMetadata(message_count=10) + with pytest.raises(Exception): + m.message_count = 999 \ No newline at end of file