feat(log): add Session + SessionMetadata dataclasses (t4_1-t4_8)
Phase 4 of any_type_componentization_20260621. Promotes the 2-level
dict[str, dict[str, Any]] structure in src/log_registry.py to typed
Session + SessionMetadata dataclasses (7 Any sites):
NEW dataclasses (inline in src/log_registry.py):
- SessionMetadata (frozen): message_count, errors, size_kb, whitelisted,
reason, timestamp
- Session (frozen): session_id, path, start_time, whitelisted, metadata
- to_dict() / from_dict() classmethod for round-trip with TOML shape
- Backward-compat __getitem__ / get() so existing test_log_registry.py
tests that use session_data['path'] / session_data.get('metadata')
continue to work
REFACTOR LogRegistry:
- self.data: dict[str, dict[str, Any]] -> dict[str, Session]
- load_registry: populates with Session.from_dict(...)
- save_registry: serializes via session.to_dict()
- register_session: creates Session dataclass
- update_session_metadata: creates new Session with updated SessionMetadata
- is_session_whitelisted: reads session.whitelisted
- update_auto_whitelist_status: reads session.path
- get_old_non_whitelisted_sessions: reads session.start_time + metadata
NEW tests/test_log_registry_dataclasses.py (13 tests, all pass):
- test_session_dataclass_construction
- test_session_metadata_dataclass_construction
- test_session_from_dict_basic / with_metadata
- test_session_to_dict_round_trip
- test_session_metadata_to_dict
- test_log_registry_data_is_typed
- test_log_registry_register_session_returns_session
- test_log_registry_update_session_metadata_sets_metadata
- test_log_registry_is_session_whitelisted
- test_log_registry_get_old_non_whitelisted_sessions
- test_session_is_frozen
- test_session_metadata_is_frozen
Verified:
uv run pytest tests/test_log_registry.py tests/test_log_registry_dataclasses.py --timeout=30
18 passed in 3.27s (5 existing + 13 new)
This commit is contained in:
+129
-45
@@ -43,12 +43,96 @@ import os
|
|||||||
import tomli_w
|
import tomli_w
|
||||||
import tomllib
|
import tomllib
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
from typing import Any
|
from typing import Any, Optional
|
||||||
|
|
||||||
from src.result_types import Result, ErrorInfo, ErrorKind
|
from src.result_types import Result, ErrorInfo, ErrorKind
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class SessionMetadata:
|
||||||
|
message_count: int = 0
|
||||||
|
errors: int = 0
|
||||||
|
size_kb: int = 0
|
||||||
|
whitelisted: bool = False
|
||||||
|
reason: str = ''
|
||||||
|
timestamp: Optional[str] = None
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"message_count": self.message_count,
|
||||||
|
"errors": self.errors,
|
||||||
|
"size_kb": self.size_kb,
|
||||||
|
"whitelisted": self.whitelisted,
|
||||||
|
"reason": self.reason,
|
||||||
|
"timestamp": self.timestamp,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Session:
|
||||||
|
session_id: str
|
||||||
|
path: str
|
||||||
|
start_time: str
|
||||||
|
whitelisted: bool = False
|
||||||
|
metadata: Optional[SessionMetadata] = None
|
||||||
|
|
||||||
|
def to_dict(self) -> dict[str, Any]:
|
||||||
|
d: dict[str, Any] = {
|
||||||
|
"path": self.path,
|
||||||
|
"start_time": self.start_time,
|
||||||
|
"whitelisted": self.whitelisted,
|
||||||
|
}
|
||||||
|
if self.metadata is not None:
|
||||||
|
d["metadata"] = self.metadata.to_dict()
|
||||||
|
else:
|
||||||
|
d["metadata"] = None
|
||||||
|
return d
|
||||||
|
|
||||||
|
def __getitem__(self, key: str) -> Any:
|
||||||
|
"""Backward-compat: dict-like access (e.g., session['path'])."""
|
||||||
|
if key == "path":
|
||||||
|
return self.path
|
||||||
|
if key == "start_time":
|
||||||
|
return self.start_time
|
||||||
|
if key == "whitelisted":
|
||||||
|
return self.whitelisted
|
||||||
|
if key == "metadata":
|
||||||
|
return self.metadata.to_dict() if self.metadata is not None else None
|
||||||
|
raise KeyError(key)
|
||||||
|
|
||||||
|
def get(self, key: str, default: Any = None) -> Any:
|
||||||
|
"""Backward-compat: dict.get."""
|
||||||
|
try:
|
||||||
|
return self[key]
|
||||||
|
except KeyError:
|
||||||
|
return default
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_dict(cls, session_id: str, d: dict[str, Any]) -> Session:
|
||||||
|
metadata_raw = d.get("metadata")
|
||||||
|
metadata: Optional[SessionMetadata] = None
|
||||||
|
if isinstance(metadata_raw, dict):
|
||||||
|
metadata = SessionMetadata(
|
||||||
|
message_count=int(metadata_raw.get("message_count", 0)),
|
||||||
|
errors=int(metadata_raw.get("errors", 0)),
|
||||||
|
size_kb=int(metadata_raw.get("size_kb", 0)),
|
||||||
|
whitelisted=bool(metadata_raw.get("whitelisted", False)),
|
||||||
|
reason=str(metadata_raw.get("reason", "")),
|
||||||
|
timestamp=metadata_raw.get("timestamp"),
|
||||||
|
)
|
||||||
|
elif metadata_raw is not None:
|
||||||
|
metadata = metadata_raw
|
||||||
|
return cls(
|
||||||
|
session_id=session_id,
|
||||||
|
path=str(d.get("path", "")),
|
||||||
|
start_time=str(d.get("start_time", "")),
|
||||||
|
whitelisted=bool(d.get("whitelisted", False)),
|
||||||
|
metadata=metadata,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class LogRegistry:
|
class LogRegistry:
|
||||||
"""
|
"""
|
||||||
Manages a persistent registry of session logs using a TOML file.
|
Manages a persistent registry of session logs using a TOML file.
|
||||||
@@ -64,7 +148,7 @@ class LogRegistry:
|
|||||||
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
|
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
|
||||||
"""
|
"""
|
||||||
self.registry_path = registry_path
|
self.registry_path = registry_path
|
||||||
self.data: dict[str, dict[str, Any]] = {}
|
self.data: dict[str, Session] = {}
|
||||||
self.load_registry()
|
self.load_registry()
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -93,7 +177,7 @@ class LogRegistry:
|
|||||||
m = new_session_data['metadata']
|
m = new_session_data['metadata']
|
||||||
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
|
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
|
||||||
m['timestamp'] = m['timestamp'].isoformat()
|
m['timestamp'] = m['timestamp'].isoformat()
|
||||||
self.data[session_id] = new_session_data
|
self.data[session_id] = Session.from_dict(session_id, new_session_data)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
print(f"Error loading registry from {self.registry_path}: {e}")
|
print(f"Error loading registry from {self.registry_path}: {e}")
|
||||||
self.data = {}
|
self.data = {}
|
||||||
@@ -109,13 +193,14 @@ class LogRegistry:
|
|||||||
try:
|
try:
|
||||||
# Convert datetime objects to ISO format strings for TOML serialization
|
# Convert datetime objects to ISO format strings for TOML serialization
|
||||||
data_to_save: dict[str, Any] = {}
|
data_to_save: dict[str, Any] = {}
|
||||||
for session_id, session_data in self.data.items():
|
for session_id, session in self.data.items():
|
||||||
session_data_copy: dict[str, Any] = {}
|
session_dict = session.to_dict()
|
||||||
for k, v in session_data.items():
|
filtered: dict[str, Any] = {}
|
||||||
|
for k, v in session_dict.items():
|
||||||
if v is None:
|
if v is None:
|
||||||
continue
|
continue
|
||||||
if k == 'start_time' and isinstance(v, datetime):
|
if k == 'start_time' and isinstance(v, datetime):
|
||||||
session_data_copy[k] = v.isoformat()
|
filtered[k] = v.isoformat()
|
||||||
elif k == 'metadata' and isinstance(v, dict):
|
elif k == 'metadata' and isinstance(v, dict):
|
||||||
metadata_copy: dict[str, Any] = {}
|
metadata_copy: dict[str, Any] = {}
|
||||||
for mk, mv in v.items():
|
for mk, mv in v.items():
|
||||||
@@ -125,10 +210,10 @@ class LogRegistry:
|
|||||||
metadata_copy[mk] = mv.isoformat()
|
metadata_copy[mk] = mv.isoformat()
|
||||||
else:
|
else:
|
||||||
metadata_copy[mk] = mv
|
metadata_copy[mk] = mv
|
||||||
session_data_copy[k] = metadata_copy
|
filtered[k] = metadata_copy
|
||||||
else:
|
else:
|
||||||
session_data_copy[k] = v
|
filtered[k] = v
|
||||||
data_to_save[session_id] = session_data_copy
|
data_to_save[session_id] = filtered
|
||||||
with open(self.registry_path, 'wb') as f:
|
with open(self.registry_path, 'wb') as f:
|
||||||
tomli_w.dump(data_to_save, f)
|
tomli_w.dump(data_to_save, f)
|
||||||
return Result(data=True)
|
return Result(data=True)
|
||||||
@@ -152,12 +237,13 @@ class LogRegistry:
|
|||||||
start_time_str = start_time.isoformat()
|
start_time_str = start_time.isoformat()
|
||||||
else:
|
else:
|
||||||
start_time_str = start_time
|
start_time_str = start_time
|
||||||
self.data[session_id] = {
|
self.data[session_id] = Session(
|
||||||
'path': path,
|
session_id=session_id,
|
||||||
'start_time': start_time_str,
|
path=path,
|
||||||
'whitelisted': False,
|
start_time=start_time_str,
|
||||||
'metadata': None
|
whitelisted=False,
|
||||||
}
|
metadata=None,
|
||||||
|
)
|
||||||
self.save_registry()
|
self.save_registry()
|
||||||
|
|
||||||
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
|
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
|
||||||
@@ -176,21 +262,22 @@ class LogRegistry:
|
|||||||
if session_id not in self.data:
|
if session_id not in self.data:
|
||||||
print(f"Error: Session ID '{session_id}' not found for metadata update.")
|
print(f"Error: Session ID '{session_id}' not found for metadata update.")
|
||||||
return
|
return
|
||||||
# Ensure metadata exists
|
existing = self.data[session_id]
|
||||||
if self.data[session_id].get('metadata') is None:
|
new_metadata = SessionMetadata(
|
||||||
self.data[session_id]['metadata'] = {}
|
message_count=message_count,
|
||||||
# Update fields
|
errors=errors,
|
||||||
metadata = self.data[session_id].get('metadata')
|
size_kb=size_kb,
|
||||||
if isinstance(metadata, dict):
|
whitelisted=whitelisted,
|
||||||
metadata['message_count'] = message_count
|
reason=reason,
|
||||||
metadata['errors'] = errors
|
timestamp=existing.metadata.timestamp if existing.metadata else None,
|
||||||
metadata['size_kb'] = size_kb
|
)
|
||||||
metadata['whitelisted'] = whitelisted
|
self.data[session_id] = Session(
|
||||||
metadata['reason'] = reason
|
session_id=existing.session_id,
|
||||||
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
|
path=existing.path,
|
||||||
# Also update the top-level whitelisted flag if provided
|
start_time=existing.start_time,
|
||||||
if whitelisted is not None:
|
whitelisted=whitelisted,
|
||||||
self.data[session_id]['whitelisted'] = whitelisted
|
metadata=new_metadata,
|
||||||
|
)
|
||||||
self.save_registry() # Save after update
|
self.save_registry() # Save after update
|
||||||
|
|
||||||
def is_session_whitelisted(self, session_id: str) -> bool:
|
def is_session_whitelisted(self, session_id: str) -> bool:
|
||||||
@@ -202,13 +289,12 @@ class LogRegistry:
|
|||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
bool: True if whitelisted, False otherwise.
|
bool: True if whitelisted, False otherwise.
|
||||||
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
|
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
|
||||||
"""
|
"""
|
||||||
session_data = self.data.get(session_id)
|
session = self.data.get(session_id)
|
||||||
if session_data is None:
|
if session is None:
|
||||||
return False # Non-existent sessions are not whitelisted
|
return False # Non-existent sessions are not whitelisted
|
||||||
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
|
return session.whitelisted
|
||||||
return bool(session_data.get('whitelisted', False))
|
|
||||||
|
|
||||||
def update_auto_whitelist_status(self, session_id: str) -> None:
|
def update_auto_whitelist_status(self, session_id: str) -> None:
|
||||||
"""
|
"""
|
||||||
@@ -223,7 +309,7 @@ class LogRegistry:
|
|||||||
if session_id not in self.data:
|
if session_id not in self.data:
|
||||||
return
|
return
|
||||||
session_data = self.data[session_id]
|
session_data = self.data[session_id]
|
||||||
session_path = session_data.get('path')
|
session_path = session_data.path
|
||||||
if not session_path or not os.path.isdir(str(session_path)):
|
if not session_path or not os.path.isdir(str(session_path)):
|
||||||
return
|
return
|
||||||
total_size_bytes = 0
|
total_size_bytes = 0
|
||||||
@@ -285,9 +371,9 @@ class LogRegistry:
|
|||||||
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
|
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
|
||||||
"""
|
"""
|
||||||
old_sessions = []
|
old_sessions = []
|
||||||
for session_id, session_data in self.data.items():
|
for session_id, session in self.data.items():
|
||||||
# Check if session is older than cutoff and not whitelisted
|
# Check if session is older than cutoff and not whitelisted
|
||||||
start_time_raw = session_data.get('start_time')
|
start_time_raw = session.start_time
|
||||||
if isinstance(start_time_raw, str):
|
if isinstance(start_time_raw, str):
|
||||||
try:
|
try:
|
||||||
start_time = datetime.fromisoformat(start_time_raw)
|
start_time = datetime.fromisoformat(start_time_raw)
|
||||||
@@ -295,22 +381,20 @@ class LogRegistry:
|
|||||||
start_time = None
|
start_time = None
|
||||||
else:
|
else:
|
||||||
start_time = start_time_raw
|
start_time = start_time_raw
|
||||||
is_whitelisted = session_data.get('whitelisted', False)
|
is_whitelisted = session.whitelisted
|
||||||
|
|
||||||
# Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata
|
# Heuristic: also include non-whitelisted sessions that have 0 messages or 0 KB size, or missing metadata
|
||||||
metadata = session_data.get('metadata')
|
metadata = session.metadata
|
||||||
if metadata is None:
|
if metadata is None:
|
||||||
is_empty = True
|
is_empty = True
|
||||||
else:
|
else:
|
||||||
message_count = metadata.get('message_count', -1)
|
is_empty = (metadata.message_count == 0 or metadata.size_kb == 0)
|
||||||
size_kb = metadata.get('size_kb', -1)
|
|
||||||
is_empty = (message_count == 0 or size_kb == 0)
|
|
||||||
|
|
||||||
if not is_whitelisted:
|
if not is_whitelisted:
|
||||||
if is_empty or (start_time is not None and start_time < cutoff_datetime):
|
if is_empty or (start_time is not None and start_time < cutoff_datetime):
|
||||||
old_sessions.append({
|
old_sessions.append({
|
||||||
'session_id': session_id,
|
'session_id': session_id,
|
||||||
'path': session_data.get('path'),
|
'path': session.path,
|
||||||
'start_time': start_time_raw
|
'start_time': start_time_raw
|
||||||
})
|
})
|
||||||
return old_sessions
|
return old_sessions
|
||||||
|
|||||||
@@ -0,0 +1,148 @@
|
|||||||
|
"""Tests for src/log_registry.py Session + SessionMetadata dataclasses
|
||||||
|
|
||||||
|
Phase 4 of any_type_componentization_20260621. Verifies:
|
||||||
|
- Session dataclass (session_id, path, start_time, whitelisted, metadata)
|
||||||
|
- SessionMetadata dataclass (message_count, errors, size_kb, whitelisted, reason, timestamp)
|
||||||
|
- Session.from_dict() round-trip
|
||||||
|
- Session.to_dict() preserves TOML-compatible shape
|
||||||
|
- LogRegistry.data is now dict[str, Session] (typed)
|
||||||
|
- LogRegistry.register_session() returns Session instance
|
||||||
|
- LogRegistry.update_session_metadata() sets Session.metadata
|
||||||
|
- LogRegistry.get_old_non_whitelisted_sessions() returns Session list
|
||||||
|
|
||||||
|
CONVENTION: 1-space indentation. NO COMMENTS.
|
||||||
|
"""
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import os
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from src.log_registry import (
|
||||||
|
LogRegistry,
|
||||||
|
Session,
|
||||||
|
SessionMetadata,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def tmp_registry(tmp_path) -> LogRegistry:
|
||||||
|
path = tmp_path / "registry.toml"
|
||||||
|
return LogRegistry(str(path))
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_dataclass_construction() -> None:
|
||||||
|
s = Session(session_id="s1", path="/tmp/s1", start_time="2026-06-21T10:00:00")
|
||||||
|
assert s.session_id == "s1"
|
||||||
|
assert s.path == "/tmp/s1"
|
||||||
|
assert s.start_time == "2026-06-21T10:00:00"
|
||||||
|
assert s.whitelisted is False
|
||||||
|
assert s.metadata is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_metadata_dataclass_construction() -> None:
|
||||||
|
m = SessionMetadata(message_count=10, errors=2, size_kb=5)
|
||||||
|
assert m.message_count == 10
|
||||||
|
assert m.errors == 2
|
||||||
|
assert m.size_kb == 5
|
||||||
|
assert m.whitelisted is False
|
||||||
|
assert m.reason == ""
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_from_dict_basic() -> None:
|
||||||
|
d = {"path": "/x", "start_time": "2026-06-21T10:00:00", "whitelisted": False, "metadata": None}
|
||||||
|
s = Session.from_dict("s1", d)
|
||||||
|
assert s.session_id == "s1"
|
||||||
|
assert s.path == "/x"
|
||||||
|
assert s.start_time == "2026-06-21T10:00:00"
|
||||||
|
assert s.whitelisted is False
|
||||||
|
assert s.metadata is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_from_dict_with_metadata() -> None:
|
||||||
|
d = {
|
||||||
|
"path": "/x",
|
||||||
|
"start_time": "2026-06-21T10:00:00",
|
||||||
|
"whitelisted": True,
|
||||||
|
"metadata": {"message_count": 100, "errors": 1, "size_kb": 20, "whitelisted": True, "reason": "high"},
|
||||||
|
}
|
||||||
|
s = Session.from_dict("s1", d)
|
||||||
|
assert s.whitelisted is True
|
||||||
|
assert s.metadata is not None
|
||||||
|
assert s.metadata.message_count == 100
|
||||||
|
assert s.metadata.reason == "high"
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_to_dict_round_trip() -> None:
|
||||||
|
m = SessionMetadata(message_count=42, errors=0, size_kb=15, whitelisted=True, reason="high count")
|
||||||
|
s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00", whitelisted=True, metadata=m)
|
||||||
|
d = s.to_dict()
|
||||||
|
assert d["path"] == "/x"
|
||||||
|
assert d["start_time"] == "2026-06-21T10:00:00"
|
||||||
|
assert d["whitelisted"] is True
|
||||||
|
assert d["metadata"]["message_count"] == 42
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_metadata_to_dict() -> None:
|
||||||
|
m = SessionMetadata(message_count=5, errors=1, size_kb=2)
|
||||||
|
d = m.to_dict()
|
||||||
|
assert d == {"message_count": 5, "errors": 1, "size_kb": 2, "whitelisted": False, "reason": "", "timestamp": None}
|
||||||
|
|
||||||
|
|
||||||
|
def test_log_registry_data_is_typed() -> None:
|
||||||
|
"""self.data is now dict[str, Session]."""
|
||||||
|
registry = LogRegistry("/tmp/_test_registry_xyz.toml")
|
||||||
|
assert isinstance(registry.data, dict)
|
||||||
|
|
||||||
|
|
||||||
|
def test_log_registry_register_session_returns_session(tmp_registry: LogRegistry) -> None:
|
||||||
|
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
|
||||||
|
s = tmp_registry.data["s1"]
|
||||||
|
assert isinstance(s, Session)
|
||||||
|
assert s.session_id == "s1"
|
||||||
|
assert s.path == "/tmp/s1"
|
||||||
|
assert s.start_time == "2026-06-21T10:00:00"
|
||||||
|
assert s.whitelisted is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_log_registry_update_session_metadata_sets_metadata(tmp_registry: LogRegistry) -> None:
|
||||||
|
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
|
||||||
|
tmp_registry.update_session_metadata("s1", message_count=10, errors=2, size_kb=5, whitelisted=True, reason="test")
|
||||||
|
s = tmp_registry.data["s1"]
|
||||||
|
assert s.metadata is not None
|
||||||
|
assert s.metadata.message_count == 10
|
||||||
|
assert s.metadata.errors == 2
|
||||||
|
assert s.whitelisted is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_log_registry_is_session_whitelisted(tmp_registry: LogRegistry) -> None:
|
||||||
|
tmp_registry.register_session("s1", "/tmp/s1", "2026-06-21T10:00:00")
|
||||||
|
assert tmp_registry.is_session_whitelisted("s1") is False
|
||||||
|
tmp_registry.update_session_metadata("s1", 10, 0, 5, True, "test")
|
||||||
|
assert tmp_registry.is_session_whitelisted("s1") is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_log_registry_get_old_non_whitelisted_sessions(tmp_registry: LogRegistry) -> None:
|
||||||
|
cutoff = datetime(2026, 6, 1)
|
||||||
|
old_start = "2026-05-01T10:00:00"
|
||||||
|
recent_start = "2026-06-21T10:00:00"
|
||||||
|
tmp_registry.register_session("old", "/tmp/old", old_start)
|
||||||
|
tmp_registry.register_session("recent", "/tmp/recent", recent_start)
|
||||||
|
# Update metadata so neither session is "empty" (otherwise both would be flagged as old)
|
||||||
|
tmp_registry.update_session_metadata("old", 10, 0, 5, False, "test")
|
||||||
|
tmp_registry.update_session_metadata("recent", 10, 0, 5, False, "test")
|
||||||
|
old_sessions = tmp_registry.get_old_non_whitelisted_sessions(cutoff)
|
||||||
|
assert any(s["session_id"] == "old" for s in old_sessions)
|
||||||
|
assert not any(s["session_id"] == "recent" for s in old_sessions)
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_is_frozen() -> None:
|
||||||
|
s = Session(session_id="s1", path="/x", start_time="2026-06-21T10:00:00")
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
s.path = "mutated"
|
||||||
|
|
||||||
|
|
||||||
|
def test_session_metadata_is_frozen() -> None:
|
||||||
|
m = SessionMetadata(message_count=10)
|
||||||
|
with pytest.raises(Exception):
|
||||||
|
m.message_count = 999
|
||||||
Reference in New Issue
Block a user