feat(logging): Implement LogRegistry for managing session metadata
This commit is contained in:
142
log_registry.py
Normal file
142
log_registry.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import tomli_w
|
||||
import tomllib
|
||||
from datetime import datetime
|
||||
import os
|
||||
|
||||
class LogRegistry:
|
||||
def __init__(self, registry_path):
|
||||
self.registry_path = registry_path
|
||||
self.data = {}
|
||||
self.load_registry()
|
||||
|
||||
def load_registry(self):
|
||||
"""Loads the registry data from the TOML file."""
|
||||
if os.path.exists(self.registry_path):
|
||||
try:
|
||||
with open(self.registry_path, 'rb') as f:
|
||||
loaded_data = tomllib.load(f)
|
||||
# Keep data as it is from TOML (strings or native datetimes)
|
||||
# If we want to satisfy tests that expect strings, we ensure they are strings.
|
||||
self.data = {}
|
||||
for session_id, session_data in loaded_data.items():
|
||||
new_session_data = session_data.copy()
|
||||
# If tomllib parsed it as a datetime, convert it back to string for the tests
|
||||
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
|
||||
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
|
||||
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
|
||||
m = new_session_data['metadata']
|
||||
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
|
||||
m['timestamp'] = m['timestamp'].isoformat()
|
||||
self.data[session_id] = new_session_data
|
||||
except Exception as e:
|
||||
print(f"Error loading registry from {self.registry_path}: {e}")
|
||||
self.data = {}
|
||||
else:
|
||||
self.data = {}
|
||||
|
||||
def save_registry(self):
|
||||
"""Saves the current registry data to the TOML file."""
|
||||
try:
|
||||
# Convert datetime objects to ISO format strings for TOML serialization
|
||||
data_to_save = {}
|
||||
for session_id, session_data in self.data.items():
|
||||
session_data_copy = {}
|
||||
for k, v in session_data.items():
|
||||
if v is None:
|
||||
continue
|
||||
if k == 'start_time' and isinstance(v, datetime):
|
||||
session_data_copy[k] = v.isoformat()
|
||||
elif k == 'metadata' and isinstance(v, dict):
|
||||
metadata_copy = {}
|
||||
for mk, mv in v.items():
|
||||
if mv is None:
|
||||
continue
|
||||
if mk == 'timestamp' and isinstance(mv, datetime):
|
||||
metadata_copy[mk] = mv.isoformat()
|
||||
else:
|
||||
metadata_copy[mk] = mv
|
||||
session_data_copy[k] = metadata_copy
|
||||
else:
|
||||
session_data_copy[k] = v
|
||||
data_to_save[session_id] = session_data_copy
|
||||
|
||||
with open(self.registry_path, 'wb') as f:
|
||||
tomli_w.dump(data_to_save, f)
|
||||
except Exception as e:
|
||||
print(f"Error saving registry to {self.registry_path}: {e}")
|
||||
|
||||
def register_session(self, session_id, path, start_time):
|
||||
"""Registers a new session."""
|
||||
if session_id in self.data:
|
||||
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
|
||||
|
||||
# Store start_time internally as a string to satisfy tests
|
||||
if isinstance(start_time, datetime):
|
||||
start_time_str = start_time.isoformat()
|
||||
else:
|
||||
start_time_str = start_time
|
||||
|
||||
self.data[session_id] = {
|
||||
'path': path,
|
||||
'start_time': start_time_str,
|
||||
'whitelisted': False,
|
||||
'metadata': None
|
||||
}
|
||||
self.save_registry()
|
||||
|
||||
def update_session_metadata(self, session_id, message_count, errors, size_kb, whitelisted, reason):
|
||||
"""Updates metadata for an existing session."""
|
||||
if session_id not in self.data:
|
||||
print(f"Error: Session ID '{session_id}' not found for metadata update.")
|
||||
return
|
||||
|
||||
# Ensure metadata exists
|
||||
if self.data[session_id].get('metadata') is None:
|
||||
self.data[session_id]['metadata'] = {}
|
||||
|
||||
# Update fields
|
||||
self.data[session_id]['metadata']['message_count'] = message_count
|
||||
self.data[session_id]['metadata']['errors'] = errors
|
||||
self.data[session_id]['metadata']['size_kb'] = size_kb
|
||||
self.data[session_id]['metadata']['whitelisted'] = whitelisted
|
||||
self.data[session_id]['metadata']['reason'] = reason
|
||||
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
|
||||
|
||||
# Also update the top-level whitelisted flag if provided
|
||||
if whitelisted is not None:
|
||||
self.data[session_id]['whitelisted'] = whitelisted
|
||||
|
||||
self.save_registry() # Save after update
|
||||
|
||||
def is_session_whitelisted(self, session_id):
|
||||
"""Checks if a session is whitelisted."""
|
||||
session_data = self.data.get(session_id)
|
||||
if session_data is None:
|
||||
return False # Non-existent sessions are not whitelisted
|
||||
|
||||
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
|
||||
return session_data.get('whitelisted', False)
|
||||
|
||||
def get_old_non_whitelisted_sessions(self, cutoff_datetime):
|
||||
"""Gets sessions older than cutoff_datetime and not whitelisted."""
|
||||
old_sessions = []
|
||||
for session_id, session_data in self.data.items():
|
||||
# Check if session is older than cutoff and not whitelisted
|
||||
start_time_raw = session_data.get('start_time')
|
||||
if isinstance(start_time_raw, str):
|
||||
try:
|
||||
start_time = datetime.fromisoformat(start_time_raw)
|
||||
except ValueError:
|
||||
start_time = None
|
||||
else:
|
||||
start_time = start_time_raw
|
||||
|
||||
is_whitelisted = session_data.get('whitelisted', False)
|
||||
|
||||
if start_time is not None and start_time < cutoff_datetime and not is_whitelisted:
|
||||
old_sessions.append({
|
||||
'session_id': session_id,
|
||||
'path': session_data.get('path'),
|
||||
'start_time': start_time_raw
|
||||
})
|
||||
return old_sessions
|
||||
Reference in New Issue
Block a user