feat(types): Complete strict static analysis and typing track
This commit is contained in:
@@ -3,6 +3,7 @@ import tomli_w
|
||||
import tomllib
|
||||
from datetime import datetime
|
||||
import os
|
||||
from typing import Any
|
||||
|
||||
class LogRegistry:
|
||||
"""
|
||||
@@ -18,7 +19,7 @@ class LogRegistry:
|
||||
registry_path (str): The file path to the TOML registry.
|
||||
"""
|
||||
self.registry_path = registry_path
|
||||
self.data = {}
|
||||
self.data: dict[str, dict[str, Any]] = {}
|
||||
self.load_registry()
|
||||
|
||||
def load_registry(self) -> None:
|
||||
@@ -56,16 +57,16 @@ class LogRegistry:
|
||||
"""
|
||||
try:
|
||||
# Convert datetime objects to ISO format strings for TOML serialization
|
||||
data_to_save = {}
|
||||
data_to_save: dict[str, Any] = {}
|
||||
for session_id, session_data in self.data.items():
|
||||
session_data_copy = {}
|
||||
session_data_copy: dict[str, Any] = {}
|
||||
for k, v in session_data.items():
|
||||
if v is None:
|
||||
continue
|
||||
if k == 'start_time' and isinstance(v, datetime):
|
||||
session_data_copy[k] = v.isoformat()
|
||||
elif k == 'metadata' and isinstance(v, dict):
|
||||
metadata_copy = {}
|
||||
metadata_copy: dict[str, Any] = {}
|
||||
for mk, mv in v.items():
|
||||
if mv is None:
|
||||
continue
|
||||
@@ -125,11 +126,13 @@ class LogRegistry:
|
||||
if self.data[session_id].get('metadata') is None:
|
||||
self.data[session_id]['metadata'] = {}
|
||||
# Update fields
|
||||
self.data[session_id]['metadata']['message_count'] = message_count
|
||||
self.data[session_id]['metadata']['errors'] = errors
|
||||
self.data[session_id]['metadata']['size_kb'] = size_kb
|
||||
self.data[session_id]['metadata']['whitelisted'] = whitelisted
|
||||
self.data[session_id]['metadata']['reason'] = reason
|
||||
metadata = self.data[session_id].get('metadata')
|
||||
if isinstance(metadata, dict):
|
||||
metadata['message_count'] = message_count
|
||||
metadata['errors'] = errors
|
||||
metadata['size_kb'] = size_kb
|
||||
metadata['whitelisted'] = whitelisted
|
||||
metadata['reason'] = reason
|
||||
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
|
||||
# Also update the top-level whitelisted flag if provided
|
||||
if whitelisted is not None:
|
||||
@@ -150,7 +153,7 @@ class LogRegistry:
|
||||
if session_data is None:
|
||||
return False # Non-existent sessions are not whitelisted
|
||||
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
|
||||
return session_data.get('whitelisted', False)
|
||||
return bool(session_data.get('whitelisted', False))
|
||||
|
||||
def update_auto_whitelist_status(self, session_id: str) -> None:
|
||||
"""
|
||||
@@ -165,14 +168,14 @@ class LogRegistry:
|
||||
return
|
||||
session_data = self.data[session_id]
|
||||
session_path = session_data.get('path')
|
||||
if not session_path or not os.path.isdir(session_path):
|
||||
if not session_path or not os.path.isdir(str(session_path)):
|
||||
return
|
||||
total_size_bytes = 0
|
||||
message_count = 0
|
||||
found_keywords = []
|
||||
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
|
||||
try:
|
||||
for entry in os.scandir(session_path):
|
||||
for entry in os.scandir(str(session_path)):
|
||||
if entry.is_file():
|
||||
size = entry.stat().st_size
|
||||
total_size_bytes += size
|
||||
@@ -210,7 +213,7 @@ class LogRegistry:
|
||||
reason=reason
|
||||
)
|
||||
|
||||
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict]:
|
||||
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]:
|
||||
"""
|
||||
Retrieves a list of sessions that are older than a specific cutoff time
|
||||
and are not marked as whitelisted.
|
||||
@@ -240,3 +243,4 @@ class LogRegistry:
|
||||
'start_time': start_time_raw
|
||||
})
|
||||
return old_sessions
|
||||
|
||||
|
||||
Reference in New Issue
Block a user