3816a54d27
Phase 4 of any_type_componentization_20260621. Promotes the 2-level
dict[str, dict[str, Any]] structure in src/log_registry.py to typed
Session + SessionMetadata dataclasses (7 Any sites):
NEW dataclasses (inline in src/log_registry.py):
- SessionMetadata (frozen): message_count, errors, size_kb, whitelisted,
reason, timestamp
- Session (frozen): session_id, path, start_time, whitelisted, metadata
- to_dict() / from_dict() classmethod for round-trip with TOML shape
- Backward-compat __getitem__ / get() so existing test_log_registry.py
tests that use session_data['path'] / session_data.get('metadata')
continue to work
REFACTOR LogRegistry:
- self.data: dict[str, dict[str, Any]] -> dict[str, Session]
- load_registry: populates with Session.from_dict(...)
- save_registry: serializes via session.to_dict()
- register_session: creates Session dataclass
- update_session_metadata: creates new Session with updated SessionMetadata
- is_session_whitelisted: reads session.whitelisted
- update_auto_whitelist_status: reads session.path
- get_old_non_whitelisted_sessions: reads session.start_time + metadata
NEW tests/test_log_registry_dataclasses.py (13 tests, all pass):
- test_session_dataclass_construction
- test_session_metadata_dataclass_construction
- test_session_from_dict_basic / test_session_from_dict_with_metadata
- test_session_to_dict_round_trip
- test_session_metadata_to_dict
- test_log_registry_data_is_typed
- test_log_registry_register_session_returns_session
- test_log_registry_update_session_metadata_sets_metadata
- test_log_registry_is_session_whitelisted
- test_log_registry_get_old_non_whitelisted_sessions
- test_session_is_frozen
- test_session_metadata_is_frozen
Verified:
uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30
18 passed in 3.27s (5 existing + 13 new)
148 lines
5.2 KiB
Python
148 lines
5.2 KiB
Python
"""Tests for src/log_registry.py Session + SessionMetadata dataclasses
|
|
|
|
Phase 4 of any_type_componentization_20260621. Verifies:
|
|
- Session dataclass (session_id, path, start_time, whitelisted, metadata)
|
|
- SessionMetadata dataclass (message_count, errors, size_kb, whitelisted, reason, timestamp)
|
|
- Session.from_dict() round-trip
|
|
- Session.to_dict() preserves TOML-compatible shape
|
|
- LogRegistry.data is now dict[str, Session] (typed)
|
|
- LogRegistry.register_session() returns Session instance
|
|
- LogRegistry.update_session_metadata() sets Session.metadata
|
|
- LogRegistry.get_old_non_whitelisted_sessions() returns Session list
|
|
|
|
CONVENTION: 1-space indentation. NO COMMENTS.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from datetime import datetime
|
|
|
|
import pytest
|
|
from src.log_registry import (
|
|
LogRegistry,
|
|
Session,
|
|
SessionMetadata,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def tmp_registry(tmp_path) -> LogRegistry:
 """Provide a LogRegistry pointed at registry.toml inside pytest's tmp_path directory."""
 registry_file = str(tmp_path / "registry.toml")
 return LogRegistry(registry_file)
|
|
|
|
|
|
def test_session_dataclass_construction() -> None:
 """A Session built from session_id, path and start_time exposes them and defaults the rest."""
 session = Session(session_id="s1", path="/tmp/s1", start_time="2026-06-21T10:00:00")
 identity = (session.session_id, session.path, session.start_time)
 assert identity == ("s1", "/tmp/s1", "2026-06-21T10:00:00")
 assert session.whitelisted is False
 assert session.metadata is None
|
|
|
|
|
|
def test_session_metadata_dataclass_construction() -> None:
 """SessionMetadata keeps the supplied counters and defaults whitelisted/reason."""
 meta = SessionMetadata(message_count=10, errors=2, size_kb=5)
 counters = (meta.message_count, meta.errors, meta.size_kb)
 assert counters == (10, 2, 5)
 assert meta.whitelisted is False
 assert meta.reason == ""
|
|
|
|
|
|
def test_session_from_dict_basic() -> None:
 """Session.from_dict maps a flat dict with no metadata onto the Session fields."""
 raw = {
  "path": "/x",
  "start_time": "2026-06-21T10:00:00",
  "whitelisted": False,
  "metadata": None,
 }
 session = Session.from_dict("s1", raw)
 identity = (session.session_id, session.path, session.start_time)
 assert identity == ("s1", "/x", "2026-06-21T10:00:00")
 assert session.whitelisted is False
 assert session.metadata is None
|
|
|
|
|
|
def test_session_from_dict_with_metadata() -> None:
 """Session.from_dict turns a nested metadata dict into an object with typed attributes."""
 raw_metadata = {"message_count": 100, "errors": 1, "size_kb": 20, "whitelisted": True, "reason": "high"}
 raw = {
  "path": "/x",
  "start_time": "2026-06-21T10:00:00",
  "whitelisted": True,
  "metadata": raw_metadata,
 }
 session = Session.from_dict("s1", raw)
 assert session.whitelisted is True
 meta = session.metadata
 assert meta is not None
 assert meta.message_count == 100
 assert meta.reason == "high"
|
|
|
|
|
|
def test_session_to_dict_round_trip() -> None:
 """Session.to_dict emits the expected keys and from_dict rebuilds matching fields from them.

 The first half pins the serialized shape; the second half feeds that dict
 back through Session.from_dict so the test exercises the round trip its
 name promises.
 """
 m = SessionMetadata(message_count=42, errors=0, size_kb=15, whitelisted=True, reason="high count")
 s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00", whitelisted=True, metadata=m)
 d = s.to_dict()
 assert d["path"] == "/x"
 assert d["start_time"] == "2026-06-21T10:00:00"
 assert d["whitelisted"] is True
 assert d["metadata"]["message_count"] == 42
 restored = Session.from_dict("s1", d)
 assert restored.session_id == s.session_id
 assert restored.path == s.path
 assert restored.start_time == s.start_time
 assert restored.whitelisted is True
 assert restored.metadata is not None
 assert restored.metadata.message_count == 42
 assert restored.metadata.reason == "high count"
|
|
|
|
|
|
def test_session_metadata_to_dict() -> None:
 """SessionMetadata.to_dict returns all six keys, including defaulted ones."""
 expected = {
  "message_count": 5,
  "errors": 1,
  "size_kb": 2,
  "whitelisted": False,
  "reason": "",
  "timestamp": None,
 }
 assert SessionMetadata(message_count=5, errors=1, size_kb=2).to_dict() == expected
|
|
|
|
|
|
def test_log_registry_data_is_typed(tmp_registry: LogRegistry) -> None:
 """self.data is now dict[str, Session].

 Uses the tmp_registry fixture instead of a fixed /tmp path so the test
 never touches a shared file, then registers one session so the key and
 value types are actually checked rather than vacuously true on an empty dict.
 """
 assert isinstance(tmp_registry.data, dict)
 tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
 assert tmp_registry.data
 assert all(isinstance(key, str) for key in tmp_registry.data)
 assert all(isinstance(value, Session) for value in tmp_registry.data.values())
|
|
|
|
|
|
def test_log_registry_register_session_returns_session(tmp_registry: LogRegistry) -> None:
 """register_session stores a Session under its id with the supplied fields."""
 tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
 stored = tmp_registry.data["s1"]
 assert isinstance(stored, Session)
 identity = (stored.session_id, stored.path, stored.start_time)
 assert identity == ("s1", "/tmp/s1", "2026-06-21T10:00:00")
 assert stored.whitelisted is False
|
|
|
|
|
|
def test_log_registry_update_session_metadata_sets_metadata(tmp_registry: LogRegistry) -> None:
 """update_session_metadata attaches metadata and sets the session's whitelisted flag."""
 tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
 tmp_registry.update_session_metadata("s1", message_count=10, errors=2, size_kb=5, whitelisted=True, reason="test")
 updated = tmp_registry.data["s1"]
 meta = updated.metadata
 assert meta is not None
 assert meta.message_count == 10
 assert meta.errors == 2
 assert updated.whitelisted is True
|
|
|
|
|
|
def test_log_registry_is_session_whitelisted(tmp_registry: LogRegistry) -> None:
 """is_session_whitelisted is False after registration and True after a whitelisting update."""
 session_id = "s1"
 tmp_registry.register_session(session_id, "/tmp/s1", "2026-06-21T10:00:00")
 before = tmp_registry.is_session_whitelisted(session_id)
 assert before is False
 tmp_registry.update_session_metadata(session_id, 10, 0, 5, True, "test")
 after = tmp_registry.is_session_whitelisted(session_id)
 assert after is True
|
|
|
|
|
|
def test_log_registry_get_old_non_whitelisted_sessions(tmp_registry: LogRegistry) -> None:
 """Only the session that started before the cutoff is reported as old.

 Both sessions get metadata so neither is "empty"; otherwise both would
 be flagged as old.
 """
 cutoff = datetime(2026, 6, 1)
 starts = {"old": "2026-05-01T10:00:00", "recent": "2026-06-21T10:00:00"}
 for name, start in starts.items():
  tmp_registry.register_session(name, f"/tmp/{name}", start)
 for name in starts:
  tmp_registry.update_session_metadata(name, 10, 0, 5, False, "test")
 reported_ids = [entry["session_id"] for entry in tmp_registry.get_old_non_whitelisted_sessions(cutoff)]
 assert "old" in reported_ids
 assert "recent" not in reported_ids
|
|
|
|
|
|
def test_session_is_frozen() -> None:
 """Assigning to a field of a Session must be rejected.

 Expects AttributeError, the base class of dataclasses.FrozenInstanceError,
 rather than a blanket Exception, so an unrelated failure cannot make the
 test pass.
 """
 s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00")
 with pytest.raises(AttributeError):
  s.path = "mutated"
|
|
|
|
|
|
def test_session_metadata_is_frozen() -> None:
 """Assigning to a field of a SessionMetadata must be rejected.

 Expects AttributeError, the base class of dataclasses.FrozenInstanceError,
 rather than a blanket Exception, so an unrelated failure cannot make the
 test pass.
 """
 m = SessionMetadata(message_count=10)
 with pytest.raises(AttributeError):
  m.message_count = 999