"""Persistent registry of session logs backed by a TOML file."""

import io
import os
import tomllib
from datetime import datetime

import tomli_w


class LogRegistry:
    """
    Manages a persistent registry of session logs using a TOML file.

    Tracks session paths, start times, whitelisting status, and metadata.
    The in-memory registry lives in ``self.data`` and maps each session ID to
    a dict with the keys ``path``, ``start_time`` (stored as an ISO 8601
    string), ``whitelisted`` and ``metadata``.
    """

    # Heuristics used by update_auto_whitelist_status().
    AUTO_WHITELIST_KEYWORDS = ('ERROR', 'WARNING', 'EXCEPTION')
    AUTO_WHITELIST_MESSAGE_THRESHOLD = 10
    AUTO_WHITELIST_SIZE_KB_THRESHOLD = 50

    def __init__(self, registry_path: str) -> None:
        """
        Initializes the LogRegistry and loads any existing registry file.

        Args:
            registry_path (str): The file path to the TOML registry.
        """
        self.registry_path = registry_path
        self.data: dict = {}
        self.load_registry()

    @staticmethod
    def _with_iso_timestamps(session_data: dict, *, drop_none: bool) -> dict:
        """
        Returns a copy of a session entry with datetime fields as ISO strings.

        Only the top-level ``start_time`` and the ``timestamp`` inside a
        ``metadata`` dict are converted; every other value is passed through
        untouched. The ``metadata`` dict is copied so the input is not mutated.

        Args:
            session_data (dict): A single session entry.
            drop_none (bool): If True, omit keys whose value is None (TOML has
                no null value, so they cannot be serialized).

        Returns:
            dict: The converted copy of the session entry.
        """
        converted = {}
        for key, value in session_data.items():
            if drop_none and value is None:
                continue
            if key == 'start_time' and isinstance(value, datetime):
                value = value.isoformat()
            elif key == 'metadata' and isinstance(value, dict):
                metadata = {}
                for meta_key, meta_value in value.items():
                    if drop_none and meta_value is None:
                        continue
                    if meta_key == 'timestamp' and isinstance(meta_value, datetime):
                        meta_value = meta_value.isoformat()
                    metadata[meta_key] = meta_value
                value = metadata
            converted[key] = value
        return converted

    def load_registry(self) -> None:
        """
        Loads the registry data from the TOML file into memory.

        Datetimes parsed natively by ``tomllib`` (``start_time`` and
        ``metadata.timestamp``) are converted to ISO strings so the in-memory
        representation is consistent. A missing file yields an empty registry.
        Any error while reading or parsing is printed and also yields an
        empty registry. Top-level entries that are not TOML tables are
        ignored with a warning instead of invalidating the whole registry.
        """
        self.data = {}
        if not os.path.exists(self.registry_path):
            return

        try:
            with open(self.registry_path, 'rb') as f:
                loaded_data = tomllib.load(f)
        except Exception as e:
            print(f"Error loading registry from {self.registry_path}: {e}")
            return

        for session_id, session_data in loaded_data.items():
            if not isinstance(session_data, dict):
                # Every other method assumes a dict per session.
                print(f"Warning: Ignoring non-table registry entry '{session_id}'.")
                continue
            self.data[session_id] = self._with_iso_timestamps(
                session_data, drop_none=False
            )

    def save_registry(self) -> None:
        """
        Serializes and saves the current registry data to the TOML file.

        None values are omitted and datetime objects in ``start_time`` and
        ``metadata.timestamp`` are written as ISO format strings. Errors are
        printed rather than raised.
        """
        try:
            data_to_save = {
                session_id: self._with_iso_timestamps(session_data, drop_none=True)
                for session_id, session_data in self.data.items()
            }
            # Serialize into memory first: opening the target with 'wb'
            # truncates it, so a serialization failure after opening would
            # destroy the registry that is already on disk.
            buffer = io.BytesIO()
            tomli_w.dump(data_to_save, buffer)
            with open(self.registry_path, 'wb') as f:
                f.write(buffer.getvalue())
        except Exception as e:
            print(f"Error saving registry to {self.registry_path}: {e}")

    def register_session(self, session_id: str, path: str,
                         start_time: "datetime | str") -> None:
        """
        Registers a new session in the registry and saves it.

        An existing entry with the same ID is overwritten after printing a
        warning.

        Args:
            session_id (str): Unique identifier for the session.
            path (str): File path to the session's log directory.
            start_time (datetime|str): The timestamp when the session started.
                A datetime is stored as its ISO format string.
        """
        if session_id in self.data:
            print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")

        if isinstance(start_time, datetime):
            start_time = start_time.isoformat()

        self.data[session_id] = {
            'path': path,
            'start_time': start_time,
            'whitelisted': False,
            'metadata': None,
        }
        self.save_registry()

    def update_session_metadata(self, session_id, message_count, errors,
                                size_kb, whitelisted, reason) -> None:
        """
        Updates metadata fields for an existing session and saves the registry.

        Prints an error and does nothing if the session is unknown.

        Args:
            session_id (str): Unique identifier for the session.
            message_count (int): Total number of messages in the session.
            errors (int): Number of errors identified in logs.
            size_kb (int): Total size of the session logs in kilobytes.
            whitelisted (bool): Whether the session should be protected from
                pruning. If not None, it is also copied to the session's
                top-level ``whitelisted`` flag.
            reason (str): Explanation for the current whitelisting status.
        """
        session = self.data.get(session_id)
        if session is None:
            print(f"Error: Session ID '{session_id}' not found for metadata update.")
            return

        metadata = session.get('metadata')
        if not isinstance(metadata, dict):
            # Missing, None, or malformed metadata: start from a clean dict.
            metadata = session['metadata'] = {}

        metadata.update({
            'message_count': message_count,
            'errors': errors,
            'size_kb': size_kb,
            'whitelisted': whitelisted,
            'reason': reason,
        })

        if whitelisted is not None:
            session['whitelisted'] = whitelisted

        self.save_registry()

    def is_session_whitelisted(self, session_id):
        """
        Checks if a specific session is marked as whitelisted.

        Args:
            session_id (str): Unique identifier for the session.

        Returns:
            bool: The session's top-level ``whitelisted`` flag; False when the
            session does not exist or the flag is not set.
        """
        session_data = self.data.get(session_id)
        if session_data is None:
            return False
        return session_data.get('whitelisted', False)

    def _scan_session_dir(self, session_path: str) -> tuple:
        """
        Collects size, line count and keyword hits for a session directory.

        Only regular files directly inside ``session_path`` are considered.
        Lines and keywords are taken from the file named ``comms.log``.
        Filesystem errors are reported and whatever was gathered up to that
        point is returned.

        Args:
            session_path (str): Directory holding the session's log files.

        Returns:
            tuple: ``(total_size_bytes, message_count, found_keywords)`` where
            ``found_keywords`` lists each matched keyword once, in order of
            first appearance.
        """
        total_size_bytes = 0
        message_count = 0
        found_keywords = []

        try:
            # The context manager closes the directory iterator even when
            # the scan is aborted by an error.
            with os.scandir(session_path) as entries:
                for entry in entries:
                    if not entry.is_file():
                        continue
                    total_size_bytes += entry.stat().st_size

                    if entry.name != "comms.log":
                        continue
                    try:
                        with open(entry.path, 'r', encoding='utf-8',
                                  errors='ignore') as f:
                            for line in f:
                                message_count += 1
                                for kw in self.AUTO_WHITELIST_KEYWORDS:
                                    if kw in line and kw not in found_keywords:
                                        found_keywords.append(kw)
                    except OSError as e:
                        print(f"Warning: Could not read {entry.path}: {e}")
        except OSError as e:
            print(f"Warning: Could not scan session directory {session_path}: {e}")

        return total_size_bytes, message_count, found_keywords

    def update_auto_whitelist_status(self, session_id: str) -> None:
        """
        Analyzes session logs and updates whitelisting status based on heuristics.

        A session is whitelisted if its ``comms.log`` contains any of
        ``AUTO_WHITELIST_KEYWORDS``, has more lines than
        ``AUTO_WHITELIST_MESSAGE_THRESHOLD``, or if the files in the session
        directory exceed ``AUTO_WHITELIST_SIZE_KB_THRESHOLD`` kilobytes. The
        result is stored via ``update_session_metadata``. Nothing happens if
        the session is unknown or its path is not an existing directory.

        Args:
            session_id (str): Unique identifier for the session to analyze.
        """
        if session_id not in self.data:
            return

        session_path = self.data[session_id].get('path')
        if not session_path or not os.path.isdir(session_path):
            return

        total_size_bytes, message_count, found_keywords = self._scan_session_dir(
            session_path
        )
        size_kb = total_size_bytes / 1024

        whitelisted = True
        if found_keywords:
            reason = f"Found keywords: {', '.join(found_keywords)}"
        elif message_count > self.AUTO_WHITELIST_MESSAGE_THRESHOLD:
            reason = f"High message count: {message_count}"
        elif size_kb > self.AUTO_WHITELIST_SIZE_KB_THRESHOLD:
            reason = f"Large session size: {size_kb:.1f} KB"
        else:
            whitelisted = False
            reason = ""

        self.update_session_metadata(
            session_id,
            message_count=message_count,
            errors=len(found_keywords),
            size_kb=int(size_kb),
            whitelisted=whitelisted,
            reason=reason,
        )

    @staticmethod
    def _parse_start_time(raw):
        """
        Converts a stored start time to a datetime.

        Args:
            raw: The stored value; expected to be an ISO 8601 string or a
                datetime.

        Returns:
            datetime|None: The parsed datetime, or None if the value is
            missing, not a valid ISO string, or of an unsupported type.
        """
        if isinstance(raw, datetime):
            return raw
        if isinstance(raw, str):
            try:
                return datetime.fromisoformat(raw)
            except ValueError:
                return None
        return None

    def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list:
        """
        Retrieves sessions older than a cutoff that are not whitelisted.

        Sessions whose start time is missing, unparseable, or cannot be
        compared with the cutoff (for example a timezone-aware start time
        against a naive cutoff) are never reported as old, so callers do not
        act on sessions whose age is unknown.

        Args:
            cutoff_datetime (datetime): The threshold time for identifying
                old sessions.

        Returns:
            list: A list of dictionaries with the keys ``session_id``,
            ``path`` and ``start_time`` (the value as stored).
        """
        old_sessions = []
        for session_id, session_data in self.data.items():
            if session_data.get('whitelisted', False):
                continue

            start_time_raw = session_data.get('start_time')
            start_time = self._parse_start_time(start_time_raw)
            if start_time is None:
                continue

            try:
                is_old = start_time < cutoff_datetime
            except TypeError as e:
                # Naive and timezone-aware datetimes cannot be ordered.
                print(f"Warning: Cannot compare start time of session "
                      f"'{session_id}' with cutoff: {e}")
                continue

            if is_old:
                old_sessions.append({
                    'session_id': session_id,
                    'path': session_data.get('path'),
                    'start_time': start_time_raw,
                })
        return old_sessions