Files
manual_slop/log_registry.py

143 lines
6.3 KiB
Python

import tomli_w
import tomllib
from datetime import datetime
import os
class LogRegistry:
def __init__(self, registry_path):
self.registry_path = registry_path
self.data = {}
self.load_registry()
def load_registry(self):
"""Loads the registry data from the TOML file."""
if os.path.exists(self.registry_path):
try:
with open(self.registry_path, 'rb') as f:
loaded_data = tomllib.load(f)
# Keep data as it is from TOML (strings or native datetimes)
# If we want to satisfy tests that expect strings, we ensure they are strings.
self.data = {}
for session_id, session_data in loaded_data.items():
new_session_data = session_data.copy()
# If tomllib parsed it as a datetime, convert it back to string for the tests
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
m = new_session_data['metadata']
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
m['timestamp'] = m['timestamp'].isoformat()
self.data[session_id] = new_session_data
except Exception as e:
print(f"Error loading registry from {self.registry_path}: {e}")
self.data = {}
else:
self.data = {}
def save_registry(self):
"""Saves the current registry data to the TOML file."""
try:
# Convert datetime objects to ISO format strings for TOML serialization
data_to_save = {}
for session_id, session_data in self.data.items():
session_data_copy = {}
for k, v in session_data.items():
if v is None:
continue
if k == 'start_time' and isinstance(v, datetime):
session_data_copy[k] = v.isoformat()
elif k == 'metadata' and isinstance(v, dict):
metadata_copy = {}
for mk, mv in v.items():
if mv is None:
continue
if mk == 'timestamp' and isinstance(mv, datetime):
metadata_copy[mk] = mv.isoformat()
else:
metadata_copy[mk] = mv
session_data_copy[k] = metadata_copy
else:
session_data_copy[k] = v
data_to_save[session_id] = session_data_copy
with open(self.registry_path, 'wb') as f:
tomli_w.dump(data_to_save, f)
except Exception as e:
print(f"Error saving registry to {self.registry_path}: {e}")
def register_session(self, session_id, path, start_time):
"""Registers a new session."""
if session_id in self.data:
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
# Store start_time internally as a string to satisfy tests
if isinstance(start_time, datetime):
start_time_str = start_time.isoformat()
else:
start_time_str = start_time
self.data[session_id] = {
'path': path,
'start_time': start_time_str,
'whitelisted': False,
'metadata': None
}
self.save_registry()
def update_session_metadata(self, session_id, message_count, errors, size_kb, whitelisted, reason):
"""Updates metadata for an existing session."""
if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for metadata update.")
return
# Ensure metadata exists
if self.data[session_id].get('metadata') is None:
self.data[session_id]['metadata'] = {}
# Update fields
self.data[session_id]['metadata']['message_count'] = message_count
self.data[session_id]['metadata']['errors'] = errors
self.data[session_id]['metadata']['size_kb'] = size_kb
self.data[session_id]['metadata']['whitelisted'] = whitelisted
self.data[session_id]['metadata']['reason'] = reason
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
# Also update the top-level whitelisted flag if provided
if whitelisted is not None:
self.data[session_id]['whitelisted'] = whitelisted
self.save_registry() # Save after update
def is_session_whitelisted(self, session_id):
"""Checks if a session is whitelisted."""
session_data = self.data.get(session_id)
if session_data is None:
return False # Non-existent sessions are not whitelisted
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
return session_data.get('whitelisted', False)
def get_old_non_whitelisted_sessions(self, cutoff_datetime):
"""Gets sessions older than cutoff_datetime and not whitelisted."""
old_sessions = []
for session_id, session_data in self.data.items():
# Check if session is older than cutoff and not whitelisted
start_time_raw = session_data.get('start_time')
if isinstance(start_time_raw, str):
try:
start_time = datetime.fromisoformat(start_time_raw)
except ValueError:
start_time = None
else:
start_time = start_time_raw
is_whitelisted = session_data.get('whitelisted', False)
if start_time is not None and start_time < cutoff_datetime and not is_whitelisted:
old_sessions.append({
'session_id': session_id,
'path': session_data.get('path'),
'start_time': start_time_raw
})
return old_sessions