feat(logging): Implement auto-whitelisting heuristics for log sessions
This commit is contained in:
@@ -117,6 +117,66 @@ class LogRegistry:
|
||||
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
|
||||
return session_data.get('whitelisted', False)
|
||||
|
||||
def update_auto_whitelist_status(self, session_id: str):
|
||||
"""
|
||||
Analyzes session logs and updates whitelisting status based on heuristics.
|
||||
"""
|
||||
if session_id not in self.data:
|
||||
return
|
||||
|
||||
session_data = self.data[session_id]
|
||||
session_path = session_data.get('path')
|
||||
if not session_path or not os.path.isdir(session_path):
|
||||
return
|
||||
|
||||
total_size_bytes = 0
|
||||
message_count = 0
|
||||
found_keywords = []
|
||||
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
|
||||
|
||||
try:
|
||||
for entry in os.scandir(session_path):
|
||||
if entry.is_file():
|
||||
size = entry.stat().st_size
|
||||
total_size_bytes += size
|
||||
|
||||
# Analyze comms.log for messages and keywords
|
||||
if entry.name == "comms.log":
|
||||
try:
|
||||
with open(entry.path, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
for line in f:
|
||||
message_count += 1
|
||||
for kw in keywords_to_check:
|
||||
if kw in line and kw not in found_keywords:
|
||||
found_keywords.append(kw)
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
size_kb = total_size_bytes / 1024
|
||||
whitelisted = False
|
||||
reason = ""
|
||||
|
||||
if found_keywords:
|
||||
whitelisted = True
|
||||
reason = f"Found keywords: {', '.join(found_keywords)}"
|
||||
elif message_count > 10:
|
||||
whitelisted = True
|
||||
reason = f"High message count: {message_count}"
|
||||
elif size_kb > 50:
|
||||
whitelisted = True
|
||||
reason = f"Large session size: {size_kb:.1f} KB"
|
||||
|
||||
self.update_session_metadata(
|
||||
session_id,
|
||||
message_count=message_count,
|
||||
errors=len(found_keywords),
|
||||
size_kb=int(size_kb),
|
||||
whitelisted=whitelisted,
|
||||
reason=reason
|
||||
)
|
||||
|
||||
def get_old_non_whitelisted_sessions(self, cutoff_datetime):
|
||||
"""Gets sessions older than cutoff_datetime and not whitelisted."""
|
||||
old_sessions = []
|
||||
|
||||
Reference in New Issue
Block a user