# Listing metadata: 258 lines, 10 KiB, Python.
import os
import tomllib
from datetime import datetime
from typing import Any

import tomli_w


class LogRegistry:
|
|
"""
|
|
Manages a persistent registry of session logs using a TOML file.
|
|
Tracks session paths, start times, whitelisting status, and metadata.
|
|
"""
|
|
def __init__(self, registry_path):
|
|
"""
|
|
Initializes the LogRegistry with a path to the registry file.
|
|
|
|
Args:
|
|
registry_path (str): The file path to the TOML registry.
|
|
"""
|
|
self.registry_path = registry_path
|
|
self.data = {}
|
|
self.load_registry()
|
|
|
|
def load_registry(self):
|
|
"""
|
|
Loads the registry data from the TOML file into memory.
|
|
Handles date/time conversions from TOML-native formats to strings for consistency.
|
|
"""
|
|
if os.path.exists(self.registry_path):
|
|
try:
|
|
with open(self.registry_path, 'rb') as f:
|
|
loaded_data = tomllib.load(f)
|
|
# Keep data as it is from TOML (strings or native datetimes)
|
|
# If we want to satisfy tests that expect strings, we ensure they are strings.
|
|
self.data = {}
|
|
for session_id, session_data in loaded_data.items():
|
|
new_session_data = session_data.copy()
|
|
# If tomllib parsed it as a datetime, convert it back to string for the tests
|
|
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
|
|
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
|
|
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
|
|
m = new_session_data['metadata']
|
|
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
|
|
m['timestamp'] = m['timestamp'].isoformat()
|
|
self.data[session_id] = new_session_data
|
|
except Exception as e:
|
|
print(f"Error loading registry from {self.registry_path}: {e}")
|
|
self.data = {}
|
|
else:
|
|
self.data = {}
|
|
|
|
def save_registry(self):
|
|
"""
|
|
Serializes and saves the current registry data to the TOML file.
|
|
Converts internal datetime objects to ISO format strings for compatibility.
|
|
"""
|
|
try:
|
|
# Convert datetime objects to ISO format strings for TOML serialization
|
|
data_to_save = {}
|
|
for session_id, session_data in self.data.items():
|
|
session_data_copy = {}
|
|
for k, v in session_data.items():
|
|
if v is None:
|
|
continue
|
|
if k == 'start_time' and isinstance(v, datetime):
|
|
session_data_copy[k] = v.isoformat()
|
|
elif k == 'metadata' and isinstance(v, dict):
|
|
metadata_copy = {}
|
|
for mk, mv in v.items():
|
|
if mv is None:
|
|
continue
|
|
if mk == 'timestamp' and isinstance(mv, datetime):
|
|
metadata_copy[mk] = mv.isoformat()
|
|
else:
|
|
metadata_copy[mk] = mv
|
|
session_data_copy[k] = metadata_copy
|
|
else:
|
|
session_data_copy[k] = v
|
|
data_to_save[session_id] = session_data_copy
|
|
|
|
with open(self.registry_path, 'wb') as f:
|
|
tomli_w.dump(data_to_save, f)
|
|
except Exception as e:
|
|
print(f"Error saving registry to {self.registry_path}: {e}")
|
|
|
|
def register_session(self, session_id, path, start_time):
|
|
"""
|
|
Registers a new session in the registry.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
path (str): File path to the session's log directory.
|
|
start_time (datetime|str): The timestamp when the session started.
|
|
"""
|
|
if session_id in self.data:
|
|
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
|
|
|
|
# Store start_time internally as a string to satisfy tests
|
|
if isinstance(start_time, datetime):
|
|
start_time_str = start_time.isoformat()
|
|
else:
|
|
start_time_str = start_time
|
|
|
|
self.data[session_id] = {
|
|
'path': path,
|
|
'start_time': start_time_str,
|
|
'whitelisted': False,
|
|
'metadata': None
|
|
}
|
|
self.save_registry()
|
|
|
|
def update_session_metadata(self, session_id, message_count, errors, size_kb, whitelisted, reason):
|
|
"""
|
|
Updates metadata fields for an existing session.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
message_count (int): Total number of messages in the session.
|
|
errors (int): Number of errors identified in logs.
|
|
size_kb (int): Total size of the session logs in kilobytes.
|
|
whitelisted (bool): Whether the session should be protected from pruning.
|
|
reason (str): Explanation for the current whitelisting status.
|
|
"""
|
|
if session_id not in self.data:
|
|
print(f"Error: Session ID '{session_id}' not found for metadata update.")
|
|
return
|
|
|
|
# Ensure metadata exists
|
|
if self.data[session_id].get('metadata') is None:
|
|
self.data[session_id]['metadata'] = {}
|
|
|
|
# Update fields
|
|
self.data[session_id]['metadata']['message_count'] = message_count
|
|
self.data[session_id]['metadata']['errors'] = errors
|
|
self.data[session_id]['metadata']['size_kb'] = size_kb
|
|
self.data[session_id]['metadata']['whitelisted'] = whitelisted
|
|
self.data[session_id]['metadata']['reason'] = reason
|
|
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
|
|
|
|
# Also update the top-level whitelisted flag if provided
|
|
if whitelisted is not None:
|
|
self.data[session_id]['whitelisted'] = whitelisted
|
|
|
|
self.save_registry() # Save after update
|
|
|
|
def is_session_whitelisted(self, session_id):
|
|
"""
|
|
Checks if a specific session is marked as whitelisted.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session.
|
|
|
|
Returns:
|
|
bool: True if whitelisted, False otherwise.
|
|
"""
|
|
session_data = self.data.get(session_id)
|
|
if session_data is None:
|
|
return False # Non-existent sessions are not whitelisted
|
|
|
|
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
|
|
return session_data.get('whitelisted', False)
|
|
|
|
def update_auto_whitelist_status(self, session_id: str):
|
|
"""
|
|
Analyzes session logs and updates whitelisting status based on heuristics.
|
|
Sessions are automatically whitelisted if they contain error keywords,
|
|
have a high message count, or exceed a size threshold.
|
|
|
|
Args:
|
|
session_id (str): Unique identifier for the session to analyze.
|
|
"""
|
|
if session_id not in self.data:
|
|
return
|
|
|
|
session_data = self.data[session_id]
|
|
session_path = session_data.get('path')
|
|
if not session_path or not os.path.isdir(session_path):
|
|
return
|
|
|
|
total_size_bytes = 0
|
|
message_count = 0
|
|
found_keywords = []
|
|
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
|
|
|
|
try:
|
|
for entry in os.scandir(session_path):
|
|
if entry.is_file():
|
|
size = entry.stat().st_size
|
|
total_size_bytes += size
|
|
|
|
# Analyze comms.log for messages and keywords
|
|
if entry.name == "comms.log":
|
|
try:
|
|
with open(entry.path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
for line in f:
|
|
message_count += 1
|
|
for kw in keywords_to_check:
|
|
if kw in line and kw not in found_keywords:
|
|
found_keywords.append(kw)
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
size_kb = total_size_bytes / 1024
|
|
whitelisted = False
|
|
reason = ""
|
|
|
|
if found_keywords:
|
|
whitelisted = True
|
|
reason = f"Found keywords: {', '.join(found_keywords)}"
|
|
elif message_count > 10:
|
|
whitelisted = True
|
|
reason = f"High message count: {message_count}"
|
|
elif size_kb > 50:
|
|
whitelisted = True
|
|
reason = f"Large session size: {size_kb:.1f} KB"
|
|
|
|
self.update_session_metadata(
|
|
session_id,
|
|
message_count=message_count,
|
|
errors=len(found_keywords),
|
|
size_kb=int(size_kb),
|
|
whitelisted=whitelisted,
|
|
reason=reason
|
|
)
|
|
|
|
def get_old_non_whitelisted_sessions(self, cutoff_datetime):
|
|
"""
|
|
Retrieves a list of sessions that are older than a specific cutoff time
|
|
and are not marked as whitelisted.
|
|
|
|
Args:
|
|
cutoff_datetime (datetime): The threshold time for identifying old sessions.
|
|
|
|
Returns:
|
|
list: A list of dictionaries containing session details (id, path, start_time).
|
|
"""
|
|
old_sessions = []
|
|
for session_id, session_data in self.data.items():
|
|
# Check if session is older than cutoff and not whitelisted
|
|
start_time_raw = session_data.get('start_time')
|
|
if isinstance(start_time_raw, str):
|
|
try:
|
|
start_time = datetime.fromisoformat(start_time_raw)
|
|
except ValueError:
|
|
start_time = None
|
|
else:
|
|
start_time = start_time_raw
|
|
|
|
is_whitelisted = session_data.get('whitelisted', False)
|
|
|
|
if start_time is not None and start_time < cutoff_datetime and not is_whitelisted:
|
|
old_sessions.append({
|
|
'session_id': session_id,
|
|
'path': session_data.get('path'),
|
|
'start_time': start_time_raw
|
|
})
|
|
return old_sessions
|