Private
Public Access
0
0

more organization

This commit is contained in:
2026-06-06 10:24:22 -04:00
parent 1c627bcc30
commit 7d555361f9
20 changed files with 630 additions and 725 deletions
+3 -3
View File
@@ -7,10 +7,10 @@ from pathlib import Path
@dataclass
class Bead:
id: str
title: str
id: str
title: str
description: str
status: str = "active"
status: str = "active"
class BeadsClient:
def __init__(self, working_dir: Path):
+3 -1
View File
@@ -1,10 +1,12 @@
# src/bg_shader.py
import time
import math
from typing import Optional
import numpy as np
from typing import Optional
from imgui_bundle import imgui, nanovg as nvg, hello_imgui
class BackgroundShader:
def __init__(self):
"""
+30 -39
View File
@@ -1,24 +1,26 @@
from __future__ import annotations
from imgui_bundle import imgui
from dataclasses import dataclass, field
from typing import Optional, Callable, List, Dict, Any
@dataclass
class Command:
id: str
title: str
category: str
shortcut: Optional[str] = None
description: str = ""
id: str
title: str
category: str
shortcut: Optional[str] = None
description: str = ""
enabled_when: Optional[str] = None
action: Optional[Callable] = None
action: Optional[Callable] = None
@dataclass
class ScoredCommand:
command: Command
score: float
score: float
class CommandRegistry:
@@ -70,13 +72,10 @@ def _is_subsequence(query: str, target: str) -> bool:
def _compute_score(query: str, target: str) -> float:
score = 0.0
if target.startswith(query):
score += 1.0
elif _starts_at_word_boundary(query, target):
score += 0.5
if _is_contiguous(query, target):
score += 0.3
gaps = _count_gaps(query, target)
if target.startswith(query): score += 1.0
elif _starts_at_word_boundary(query, target): score += 0.5
if _is_contiguous(query, target): score += 0.3
gaps = _count_gaps(query, target)
score -= 0.1 * gaps
return score
@@ -92,24 +91,23 @@ def _is_contiguous(query: str, target: str) -> bool:
def _count_gaps(query: str, target: str) -> int:
qi = 0
gaps = 0
qi = 0
gaps = 0
last_match = -1
for ti, ch in enumerate(target):
if qi < len(query) and ch == query[qi]:
if last_match >= 0 and ti - last_match > 1:
gaps += ti - last_match - 1
if last_match >= 0 and ti - last_match > 1: gaps += ti - last_match - 1
last_match = ti
qi += 1
qi += 1
return gaps
def _close_palette(app: Any) -> None:
"""Close the palette and reset all per-open state."""
app.show_command_palette = False
app._command_palette_query = ""
app._command_palette_selected = 0
app._command_palette_focused = False
app.show_command_palette = False
app._command_palette_query = ""
app._command_palette_selected = 0
app._command_palette_focused = False
app._command_palette_input_focused = False
@@ -128,19 +126,14 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None:
if not getattr(app, "show_command_palette", False):
return
from imgui_bundle import imgui
viewport = imgui.get_main_viewport()
center = viewport.get_center()
center = viewport.get_center()
imgui.set_next_window_pos((center.x - 300, center.y - 200), imgui.Cond_.always)
imgui.set_next_window_size((600, 400), imgui.Cond_.always)
if not hasattr(app, "_command_palette_query"):
app._command_palette_query = ""
if not hasattr(app, "_command_palette_selected"):
app._command_palette_selected = 0
if not hasattr(app, "_command_palette_focused"):
app._command_palette_focused = False
if not hasattr(app, "_command_palette_query"): app._command_palette_query = ""
if not hasattr(app, "_command_palette_selected"): app._command_palette_selected = 0
if not hasattr(app, "_command_palette_focused"): app._command_palette_focused = False
# Set focus on the window + input field ONCE per open.
if not app._command_palette_focused:
@@ -154,7 +147,7 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None:
expanded, opened = imgui.begin("Command Palette##manual_slop", True, imgui.WindowFlags_.no_collapse)
if not expanded or not opened:
app.show_command_palette = False
app.show_command_palette = False
app._command_palette_focused = False
imgui.end()
return
@@ -167,10 +160,8 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None:
# Process Up/Down/Enter BEFORE input_text so we see the keys before the
# input field consumes them for cursor movement / text editing.
results = fuzzy_match(app._command_palette_query, commands, top_n=20)
if results:
app._command_palette_selected = max(0, min(app._command_palette_selected, len(results) - 1))
else:
app._command_palette_selected = 0
if results: app._command_palette_selected = max(0, min(app._command_palette_selected, len(results) - 1))
else: app._command_palette_selected = 0
if imgui.is_key_pressed(imgui.Key.down_arrow):
if results:
@@ -188,7 +179,7 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None:
if imgui.begin_child("##results", (0, -1)):
for i, scored in enumerate(results):
is_selected = (i == app._command_palette_selected)
label = f"[{scored.command.category}] {scored.command.title}"
label = f"[{scored.command.category}] {scored.command.title}"
clicked, _ = imgui.selectable(label, is_selected)
if clicked:
app._command_palette_selected = i
+16 -24
View File
@@ -1,13 +1,19 @@
from __future__ import annotations
from typing import TYPE_CHECKING, Callable
import webbrowser
from pathlib import Path
from typing import TYPE_CHECKING, Callable
from src import models
from src import theme_2
from src.command_palette import CommandRegistry
from src.hot_reloader import HotReloader
if TYPE_CHECKING:
from src.gui_2 import App
registry = CommandRegistry()
@@ -38,14 +44,10 @@ def reset_session(app: "App") -> None:
"""Reset Session — Reset the AI session, clear comms and tool logs."""
from src import ai_client
ai_client.reset_session()
if hasattr(app, "_handle_reset_session"):
app._handle_reset_session()
if hasattr(app, "_comms_log"):
app._comms_log.clear()
if hasattr(app, "_tool_log"):
app._tool_log.clear()
if hasattr(app, "ai_response"):
app.ai_response = ""
if hasattr(app, "_handle_reset_session"): app._handle_reset_session()
if hasattr(app, "_comms_log"): app._comms_log.clear()
if hasattr(app, "_tool_log"): app._tool_log.clear()
if hasattr(app, "ai_response"): app.ai_response = ""
@registry.register
@@ -67,8 +69,8 @@ def generate_md_only(app: "App") -> None:
"""Generate MD Only — Run the AI to produce a markdown file without sending to the chat."""
if hasattr(app, "_do_generate"):
try:
md, path, *_ = app._do_generate()
app.last_md = md
md, path, *_ = app._do_generate()
app.last_md = md
app.last_md_path = path
if hasattr(app, "ai_status"):
app.ai_status = f"md written: {path.name}"
@@ -98,11 +100,8 @@ def save_project(app: "App") -> None:
@registry.register
def save_all(app: "App") -> None:
"""Save All — Flush to project, flush to config, save global config."""
from src import models
if hasattr(app, "_flush_to_project"):
app._flush_to_project()
if hasattr(app, "_flush_to_config"):
app._flush_to_config()
if hasattr(app, "_flush_to_project"): app._flush_to_project()
if hasattr(app, "_flush_to_config"): app._flush_to_config()
if hasattr(app, "config"):
try:
models.save_config(app.config)
@@ -229,7 +228,6 @@ def show_workspace_manager(app: "App") -> None:
@registry.register
def trigger_hot_reload(app: "App") -> None:
"""Hot Reload — Reload the GUI module to pick up code changes."""
from src.hot_reloader import HotReloader
HotReloader.reload("src.gui_2", app)
@@ -254,28 +252,24 @@ def redo(app: "App") -> None:
@registry.register
def switch_to_dark_theme(app: "App") -> None:
"""Switch to Dark Theme (10x Dark palette)."""
from src import theme_2
theme_2.apply("10x Dark")
@registry.register
def switch_to_light_theme(app: "App") -> None:
"""Switch to Light Theme (ImGui Light palette)."""
from src import theme_2
theme_2.apply("ImGui Light")
@registry.register
def switch_to_nerv_theme(app: "App") -> None:
"""Switch to NERV Theme (Tactical Console aesthetic)."""
from src import theme_2
theme_2.apply("NERV")
@registry.register
def cycle_theme(app: "App") -> None:
"""Cycle Theme — Switch to the next theme in the cycle (Dark → Light → NERV → Dark)."""
from src import theme_2
order = ["10x Dark", "ImGui Light", "NERV"]
current = theme_2.get_current_palette()
if current in order:
@@ -292,14 +286,12 @@ def cycle_theme(app: "App") -> None:
@registry.register
def show_documentation(app: "App") -> None:
"""Show Documentation — Open the project URL in the browser."""
import webbrowser
webbrowser.open("https://git.cozyair.dev/ed/manual_slop/")
@registry.register
def show_command_palette_help(app: "App") -> None:
"""Show Command Palette Help — Open the docs/Readme.md in the Text Viewer."""
from pathlib import Path
if hasattr(app, "readme_text"):
docs_readme = Path("docs/Readme.md")
if docs_readme.exists():
+12 -16
View File
@@ -44,16 +44,14 @@ from src import mma_prompts
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, Any]]:
"""
Tier 2 (Tech Lead) call.
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
[C: tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_failure, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_success, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_success, tests/test_orchestration_logic.py:test_generate_tickets]
Tier 2 (Tech Lead) call.
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
[C: tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_failure, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_success, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_success, tests/test_orchestration_logic.py:test_generate_tickets]
"""
# 1. Set Tier 2 Model (Tech Lead - Flash)
# 2. Construct Prompt
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = (
user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n"
f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
"Please generate the implementation tickets for this track."
@@ -68,8 +66,8 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str,
try:
# 3. Call Tier 2 Model
response = ai_client.send(
md_content="",
user_message=user_message
md_content = "",
user_message = user_message
)
# 4. Parse JSON Output
# Extract JSON array from markdown code blocks if present
@@ -97,15 +95,13 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str,
ai_client.set_current_tier(None)
from src.dag_engine import TrackDAG
from src.models import Ticket
from src.models import Ticket
def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Sorts a list of tickets based on their 'depends_on' field.
Raises ValueError if a circular dependency or missing internal dependency is detected.
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
Sorts a list of tickets based on their 'depends_on' field.
Raises ValueError if a circular dependency or missing internal dependency is detected.
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
"""
# 1. Convert to Ticket objects for TrackDAG
ticket_objs = []
@@ -123,7 +119,7 @@ def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]:
if __name__ == "__main__":
# Quick test if run directly
test_brief = "Implement a new feature."
test_brief = "Implement a new feature."
test_skeletons = "class NewFeature: pass"
tickets = generate_tickets(test_brief, test_skeletons)
print(json.dumps(tickets, indent=2))
print(json.dumps(tickets, indent=2))
+1 -1
View File
@@ -57,7 +57,7 @@ def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
for pattern, rates in MODEL_PRICING:
if re.search(pattern, model, re.IGNORECASE):
input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"]
input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"]
output_cost = (output_tokens / 1_000_000) * rates["output_per_mtok"]
return input_cost + output_cost
return 0.0
+12 -13
View File
@@ -45,7 +45,7 @@ class TrackDAG:
tickets: A list of Ticket instances defining the graph nodes and edges.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.tickets = tickets
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets}
def cascade_blocks(self) -> None:
@@ -64,7 +64,7 @@ class TrackDAG:
# Use a queue-based propagation (BFS) from all currently blocked tickets
queue = [t for t in self.tickets if t.status == 'blocked']
idx = 0
idx = 0
while idx < len(queue):
curr = queue[idx]
idx += 1
@@ -110,16 +110,14 @@ class TrackDAG:
if start_ticket.id in visited:
continue
stack = [(start_ticket.id, False)] # (id, is_backtracking)
path = set()
path = set()
while stack:
node_id, is_backtracking = stack.pop()
if is_backtracking:
path.remove(node_id)
continue
if node_id in path:
return True
if node_id in visited:
continue
if node_id in path: return True
if node_id in visited: continue
visited.add(node_id)
path.add(node_id)
stack.append((node_id, True))
@@ -140,7 +138,7 @@ class TrackDAG:
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
"""
with get_monitor().scope("dag_topological_sort"):
in_degree = {t.id: len(t.depends_on) for t in self.tickets}
in_degree = {t.id: len(t.depends_on) for t in self.tickets}
dependents = {t.id: [] for t in self.tickets}
for t in self.tickets:
for dep_id in t.depends_on:
@@ -148,11 +146,11 @@ class TrackDAG:
dependents[dep_id].append(t.id)
# Queue starts with nodes having no dependencies
queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
queue = [t.id for t in self.tickets if in_degree[t.id] == 0]
result = []
idx = 0
idx = 0
while idx < len(queue):
u = queue[idx]
u = queue[idx]
idx += 1
result.append(u)
for v_id in dependents.get(u, []):
@@ -178,7 +176,7 @@ class ExecutionEngine:
auto_queue: If True, ready tasks will automatically move to 'in_progress'.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.dag = dag
self.dag = dag
self.auto_queue = auto_queue
def tick(self) -> List[Ticket]:
@@ -215,4 +213,5 @@ class ExecutionEngine:
"""
ticket = self.dag.ticket_map.get(task_id)
if ticket:
ticket.status = status
ticket.status = status
+110 -114
View File
@@ -13,139 +13,135 @@ from src.models import ExternalEditorConfig, TextEditorConfig
class ExternalEditorLauncher:
def __init__(self, config: ExternalEditorConfig):
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.config = config
def __init__(self, config: ExternalEditorConfig):
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.config = config
def get_editor(self, editor_name: Optional[str] = None) -> Optional[TextEditorConfig]:
"""
[C: tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_by_name, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_returns_default, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_unknown_name]
"""
if editor_name:
return self.config.editors.get(editor_name)
return self.config.get_default()
def get_editor(self, editor_name: Optional[str] = None) -> Optional[TextEditorConfig]:
"""
[C: tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_by_name, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_returns_default, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_unknown_name]
"""
if editor_name:
return self.config.editors.get(editor_name)
return self.config.get_default()
def build_diff_command(
self, editor: TextEditorConfig, original_path: str, modified_path: str
) -> List[str]:
"""
[C: tests/test_external_editor.py:TestExternalEditorLauncher.test_build_diff_command, tests/test_external_editor_gui.py:test_verify_command_format, tests/test_external_editor_gui.py:test_verify_vscode_command_format]
"""
cmd = [editor.path] + editor.diff_args + [original_path, modified_path]
return cmd
def build_diff_command(self, editor: TextEditorConfig, original_path: str, modified_path: str) -> List[str]:
"""
[C: tests/test_external_editor.py:TestExternalEditorLauncher.test_build_diff_command, tests/test_external_editor_gui.py:test_verify_command_format, tests/test_external_editor_gui.py:test_verify_vscode_command_format]
"""
cmd = [editor.path] + editor.diff_args + [original_path, modified_path]
return cmd
def launch_diff(
self, editor_name: Optional[str], original_path: str, modified_path: str
) -> Optional[subprocess.Popen]:
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_file_not_found, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_missing_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_success]
"""
editor = self.get_editor(editor_name)
if not editor:
return None
cmd = self.build_diff_command(editor, original_path, modified_path)
try:
return subprocess.Popen(cmd)
except FileNotFoundError:
return None
def launch_diff(self, editor_name: Optional[str], original_path: str, modified_path: str) -> Optional[subprocess.Popen]:
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_file_not_found, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_missing_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_success]
"""
editor = self.get_editor(editor_name)
if not editor:
return None
cmd = self.build_diff_command(editor, original_path, modified_path)
try:
return subprocess.Popen(cmd)
except FileNotFoundError:
return None
def launch_editor(self, editor_name: Optional[str], file_path: str) -> Optional[subprocess.Popen]:
editor = self.get_editor(editor_name)
if not editor:
return None
try:
return subprocess.Popen([editor.path, file_path])
except FileNotFoundError:
return None
def launch_editor(self, editor_name: Optional[str], file_path: str) -> Optional[subprocess.Popen]:
editor = self.get_editor(editor_name)
if not editor:
return None
try:
return subprocess.Popen([editor.path, file_path])
except FileNotFoundError:
return None
_cached_vscode_config: Optional[TextEditorConfig] = None
def _find_vscode_in_registry() -> Optional[str]:
paths = []
reg_keys = [
r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*",
r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*",
r"HKLM\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\*",
]
for key in reg_keys:
try:
result = subprocess.run(
["powershell", "-Command", f"Get-ItemProperty -Path '{key}' -ErrorAction SilentlyContinue | Where-Object {{ $_.DisplayName -like '*Visual Studio Code*' }} | Select-Object -ExpandProperty InstallLocation"],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
line = line.strip()
if line and line != "":
exe_path = line.strip() + "\\Code.exe"
if os.path.exists(exe_path):
paths.append(exe_path)
except Exception:
pass
if paths:
return paths[0]
return None
paths = []
reg_keys = [
r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*",
r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*",
r"HKLM\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\*",
]
for key in reg_keys:
try:
result = subprocess.run(
["powershell", "-Command", f"Get-ItemProperty -Path '{key}' -ErrorAction SilentlyContinue | Where-Object {{ $_.DisplayName -like '*Visual Studio Code*' }} | Select-Object -ExpandProperty InstallLocation"],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
line = line.strip()
if line and line != "":
exe_path = line.strip() + "\\Code.exe"
if os.path.exists(exe_path):
paths.append(exe_path)
except Exception:
pass
if paths:
return paths[0]
return None
def _find_vscode_common_paths() -> Optional[str]:
candidates = [
r"C:\apps\Microsoft VS Code\Code.exe",
r"C:\Program Files\Microsoft VS Code\Code.exe",
r"C:\Program Files (x86)\Microsoft VS Code\Code.exe",
os.path.expanduser(r"~\AppData\Local\Programs\Microsoft VS Code\Code.exe"),
]
for path in candidates:
if os.path.exists(path):
return path
return None
candidates = [
r"C:\apps\Microsoft VS Code\Code.exe",
r"C:\Program Files\Microsoft VS Code\Code.exe",
r"C:\Program Files (x86)\Microsoft VS Code\Code.exe",
os.path.expanduser(r"~\AppData\Local\Programs\Microsoft VS Code\Code.exe"),
]
for path in candidates:
if os.path.exists(path):
return path
return None
def auto_detect_vscode() -> Optional[TextEditorConfig]:
global _cached_vscode_config
if _cached_vscode_config is not None:
return _cached_vscode_config
vscode_path = _find_vscode_in_registry() or _find_vscode_common_paths()
if vscode_path:
_cached_vscode_config = TextEditorConfig(
name="vscode",
path=vscode_path,
diff_args=["--new-window", "--diff"]
)
return _cached_vscode_config
global _cached_vscode_config
if _cached_vscode_config is not None:
return _cached_vscode_config
vscode_path = _find_vscode_in_registry() or _find_vscode_common_paths()
if vscode_path:
_cached_vscode_config = TextEditorConfig(
name="vscode",
path=vscode_path,
diff_args=["--new-window", "--diff"]
)
return _cached_vscode_config
def get_default_launcher() -> ExternalEditorLauncher:
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, src/gui_2.py:App._render_external_editor_panel]
"""
from src import models
config = models.load_config()
editors_config = config.get("tools", {}).get("text_editors", {})
default_editor = config.get("tools", {}).get("default_editor", {}).get("default_editor")
ext_config = ExternalEditorConfig.from_dict({
"editors": editors_config,
"default_editor": default_editor,
})
launcher = ExternalEditorLauncher(ext_config)
if not launcher.config.editors:
detected = auto_detect_vscode()
if detected:
launcher.config.editors["vscode"] = detected
launcher.config.default_editor = "vscode"
else:
vscode = launcher.config.editors.get("vscode")
if vscode and "--new-window" not in vscode.diff_args:
vscode.diff_args = ["--new-window", "--diff"]
return launcher
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, src/gui_2.py:App._render_external_editor_panel]
"""
from src import models
config = models.load_config()
editors_config = config.get("tools", {}).get("text_editors", {})
default_editor = config.get("tools", {}).get("default_editor", {}).get("default_editor")
ext_config = ExternalEditorConfig.from_dict({
"editors": editors_config,
"default_editor": default_editor,
})
launcher = ExternalEditorLauncher(ext_config)
if not launcher.config.editors:
detected = auto_detect_vscode()
if detected:
launcher.config.editors["vscode"] = detected
launcher.config.default_editor = "vscode"
else:
vscode = launcher.config.editors.get("vscode")
if vscode and "--new-window" not in vscode.diff_args:
vscode.diff_args = ["--new-window", "--diff"]
return launcher
def create_temp_modified_file(content: str) -> str:
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestHelperFunctions.test_create_temp_modified_file]
"""
with tempfile.NamedTemporaryFile(mode="w", suffix="_modified", delete=False, encoding="utf-8") as f:
f.write(content)
return f.name
"""
[C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestHelperFunctions.test_create_temp_modified_file]
"""
with tempfile.NamedTemporaryFile(mode="w", suffix="_modified", delete=False, encoding="utf-8") as f:
f.write(content)
return f.name
+78 -85
View File
@@ -49,33 +49,29 @@ _ast_cache: Dict[str, Tuple[float, tree_sitter.Tree]] = {}
class ASTParser:
"""
Parser for extracting AST-based views of source code.
Currently supports Python.
Parser for extracting AST-based views of source code.
Currently supports Python.
"""
#region: Core Operations
def __init__(self, language: str) -> None:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
if language not in ("python", "cpp", "c"):
raise ValueError(f"Language '{language}' not supported yet.")
self.language_name = language
# Load the tree-sitter language grammar
if language == "python":
self.language = tree_sitter.Language(tree_sitter_python.language())
elif language == "cpp":
self.language = tree_sitter.Language(tree_sitter_cpp.language())
elif language == "c":
self.language = tree_sitter.Language(tree_sitter_c.language())
if language == "python": self.language = tree_sitter.Language(tree_sitter_python.language())
elif language == "cpp": self.language = tree_sitter.Language(tree_sitter_cpp.language())
elif language == "c": self.language = tree_sitter.Language(tree_sitter_c.language())
self.parser = tree_sitter.Parser(self.language)
def parse(self, code: str) -> tree_sitter.Tree:
"""
Parse the given code and return the tree-sitter Tree.
[C: src/mcp_client.py:_search_file, src/mcp_client.py:derive_code_path, src/mcp_client.py:py_check_syntax, src/mcp_client.py:py_get_class_summary, src/mcp_client.py:py_get_definition, src/mcp_client.py:py_get_docstring, src/mcp_client.py:py_get_imports, src/mcp_client.py:py_get_signature, src/mcp_client.py:py_get_symbol_info, src/mcp_client.py:py_get_var_declaration, src/mcp_client.py:py_set_signature, src/mcp_client.py:py_set_var_declaration, src/mcp_client.py:py_update_definition, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/rag_engine.py:RAGEngine._chunk_code, src/summarize.py:_summarise_python, tests/test_ast_parser.py:test_ast_parser_parse, tests/test_tree_sitter_setup.py:test_tree_sitter_python_setup]
Parse the given code and return the tree-sitter Tree.
[C: src/mcp_client.py:_search_file, src/mcp_client.py:derive_code_path, src/mcp_client.py:py_check_syntax, src/mcp_client.py:py_get_class_summary, src/mcp_client.py:py_get_definition, src/mcp_client.py:py_get_docstring, src/mcp_client.py:py_get_imports, src/mcp_client.py:py_get_signature, src/mcp_client.py:py_get_symbol_info, src/mcp_client.py:py_get_var_declaration, src/mcp_client.py:py_set_signature, src/mcp_client.py:py_set_var_declaration, src/mcp_client.py:py_update_definition, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/rag_engine.py:RAGEngine._chunk_code, src/summarize.py:_summarise_python, tests/test_ast_parser.py:test_ast_parser_parse, tests/test_tree_sitter_setup.py:test_tree_sitter_python_setup]
"""
return self.parser.parse(bytes(code, "utf8"))
@@ -85,7 +81,7 @@ class ASTParser:
return self.parse(code)
try:
p = Path(path)
p = Path(path)
mtime = p.stat().st_mtime if p.exists() else 0.0
except Exception:
mtime = 0.0
@@ -185,17 +181,18 @@ class ASTParser:
if child.type in ("type_identifier", "identifier", "namespace_identifier", "qualified_identifier"):
return code_bytes[child.start_byte:child.end_byte].decode("utf8", errors="replace")
return ""
#endregion: Core Operations
#region: Skeleton & Curated Views
def get_skeleton(self, code: str, path: Optional[str] = None) -> str:
"""
Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
[C: src/mcp_client.py:py_get_skeleton, src/mcp_client.py:ts_c_get_skeleton, src/mcp_client.py:ts_cpp_get_skeleton, src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_skeleton_c, tests/test_ast_parser.py:test_ast_parser_get_skeleton_cpp, tests/test_ast_parser.py:test_ast_parser_get_skeleton_python, tests/test_context_pruner.py:test_ast_caching, tests/test_context_pruner.py:test_performance_large_file]
Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
[C: src/mcp_client.py:py_get_skeleton, src/mcp_client.py:ts_c_get_skeleton, src/mcp_client.py:ts_cpp_get_skeleton, src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_skeleton_c, tests/test_ast_parser.py:test_ast_parser_get_skeleton_cpp, tests/test_ast_parser.py:test_ast_parser_get_skeleton_python, tests/test_context_pruner.py:test_ast_caching, tests/test_context_pruner.py:test_performance_large_file]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
tree = self.get_cached_tree(path, code)
edits: List[Tuple[int, int, str]] = []
def is_docstring(node: tree_sitter.Node) -> bool:
@@ -206,7 +203,7 @@ class ASTParser:
def walk(node: tree_sitter.Node) -> None:
"""
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
"""
if node.type in ("function_definition", "method_definition"):
body = node.child_by_field_name("body")
@@ -218,7 +215,7 @@ class ASTParser:
break
if body and body.type in ("block", "compound_statement"):
indent = " " * body.start_point.column
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type not in ("comment", "{", "}"):
@@ -244,17 +241,17 @@ class ASTParser:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = initializer.start_byte if initializer else body.start_byte
end_byte = body.end_byte
end_byte = body.end_byte
# Try to preserve braces for C-style languages
if body.type == "compound_statement" and len(body.children) >= 2 and body.children[0].type == "{" and body.children[-1].type == "}":
if initializer:
start_byte = initializer.start_byte
end_byte = body.children[-1].start_byte
end_byte = body.children[-1].start_byte
edits.append((start_byte, end_byte, "{ ... "))
else:
start_byte = body.children[0].end_byte
end_byte = body.children[-1].start_byte
end_byte = body.children[-1].start_byte
edits.append((start_byte, end_byte, " ... "))
else:
edits.append((start_byte, end_byte, "..."))
@@ -275,15 +272,13 @@ class ASTParser:
return code_bytearray.decode("utf8")
def get_curated_view(self, code: str, path: Optional[str] = None) -> str:
"""
Returns a curated skeleton of a Python file.
Preserves function bodies if they have @core_logic decorator or # [HOT] comment.
Otherwise strips bodies but preserves docstrings.
[C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_curated_view]
Returns a curated skeleton of a Python file.
Preserves function bodies if they have @core_logic decorator or # [HOT] comment.
Otherwise strips bodies but preserves docstrings.
[C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_curated_view]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
tree = self.get_cached_tree(path, code)
edits: List[Tuple[int, int, str]] = []
def is_docstring(node: tree_sitter.Node) -> bool:
@@ -318,7 +313,7 @@ class ASTParser:
def walk(node: tree_sitter.Node) -> None:
"""
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
"""
if node.type == "function_definition":
body = node.child_by_field_name("body")
@@ -326,7 +321,7 @@ class ASTParser:
# Check if we should preserve it
preserve = has_core_logic_decorator(node) or has_hot_comment(node)
if not preserve:
indent = " " * body.start_point.column
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
@@ -334,12 +329,12 @@ class ASTParser:
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
for child in node.children:
walk(child)
@@ -350,16 +345,16 @@ class ASTParser:
for start, end, replacement in edits:
code_bytearray[start:end] = bytes(replacement, "utf8")
return code_bytearray.decode("utf8")
#endregion: Skeleton & Curated Views
#region: Targeted Views
def get_targeted_view(self, code: str, function_names: List[str], path: Optional[str] = None) -> str:
"""
Returns a targeted view of the code including only the specified functions
and their dependencies up to depth 2.
[C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_targeted_view, tests/test_context_pruner.py:test_class_targeted_extraction, tests/test_context_pruner.py:test_targeted_extraction]
Returns a targeted view of the code including only the specified functions
and their dependencies up to depth 2.
[C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_targeted_view, tests/test_context_pruner.py:test_class_targeted_extraction, tests/test_context_pruner.py:test_targeted_extraction]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
@@ -375,9 +370,9 @@ class ASTParser:
elif node.type == "class_definition":
name_node = node.child_by_field_name("name")
if name_node:
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace")
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace")
full_cname = f"{class_name}.{cname}" if class_name else cname
body = node.child_by_field_name("body")
body = node.child_by_field_name("body")
if body:
collect_functions(body, full_cname)
return
@@ -413,12 +408,12 @@ class ASTParser:
to_include.add(full_name)
current_layer = set(to_include)
all_found = set(to_include)
all_found = set(to_include)
for _ in range(2):
next_layer = set()
for name in current_layer:
if name in all_functions:
node = all_functions[name]
node = all_functions[name]
calls = get_calls(node)
for call in calls:
for func_name in all_functions:
@@ -440,14 +435,14 @@ class ASTParser:
def check_for_targeted(node, parent_class=None):
if node.type == "function_definition":
name_node = node.child_by_field_name("name")
fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
fullname = f"{parent_class}.{fname}" if parent_class else fname
fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
fullname = f"{parent_class}.{fname}" if parent_class else fname
return fullname in all_found
if node.type == "class_definition":
name_node = node.child_by_field_name("name")
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
name_node = node.child_by_field_name("name")
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
full_cname = f"{parent_class}.{cname}" if parent_class else cname
body = node.child_by_field_name("body")
body = node.child_by_field_name("body")
if body:
for child in body.children:
if check_for_targeted(child, full_cname):
@@ -461,12 +456,12 @@ class ASTParser:
def walk_edits(node, parent_class=None):
if node.type == "function_definition":
name_node = node.child_by_field_name("name")
fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
fullname = f"{parent_class}.{fname}" if parent_class else fname
fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
fullname = f"{parent_class}.{fname}" if parent_class else fname
if fullname in all_found:
body = node.child_by_field_name("body")
if body and body.type in ("block", "compound_statement"):
indent = " " * body.start_point.column
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
@@ -474,22 +469,22 @@ class ASTParser:
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
else:
edits.append((node.start_byte, node.end_byte, ""))
return
if node.type == "class_definition":
if check_for_targeted(node, parent_class):
name_node = node.child_by_field_name("name")
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
name_node = node.child_by_field_name("name")
cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else ""
full_cname = f"{parent_class}.{cname}" if parent_class else cname
body = node.child_by_field_name("body")
body = node.child_by_field_name("body")
if body:
for child in body.children:
walk_edits(child, full_cname)
@@ -517,15 +512,16 @@ class ASTParser:
result = code_bytearray.decode("utf8")
result = re.sub(r'\n\s*\n\s*\n+', '\n\n', result)
return result.strip() + "\n"
#endregion: Targeted Views
#region: Symbol Extraction
def get_definition(self, code: str, name: str, path: Optional[str] = None) -> str:
"""
Returns the full source code for a specific definition by name.
Supports 'ClassName::method' or 'method' for C++.
[C: src/mcp_client.py:trace, src/mcp_client.py:ts_c_get_definition, src/mcp_client.py:ts_cpp_get_definition, tests/test_ast_parser.py:test_ast_parser_get_definition_c, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp_template]
Returns the full source code for a specific definition by name.
Supports 'ClassName::method' or 'method' for C++.
[C: src/mcp_client.py:trace, src/mcp_client.py:ts_c_get_definition, src/mcp_client.py:ts_cpp_get_definition, tests/test_ast_parser.py:test_ast_parser_get_definition_c, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp_template]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
@@ -621,16 +617,13 @@ class ASTParser:
def get_signature(self, code: str, name: str, path: Optional[str] = None) -> str:
"""
Returns only the signature part of a function or method.
For C/C++, this is the code from the start of the definition until the block start '{'.
[C: src/mcp_client.py:ts_c_get_signature, src/mcp_client.py:ts_cpp_get_signature, tests/test_ast_parser.py:test_ast_parser_get_signature_c, tests/test_ast_parser.py:test_ast_parser_get_signature_cpp]
Returns only the signature part of a function or method.
For C/C++, this is the code from the start of the definition until the block start '{'.
[C: src/mcp_client.py:ts_c_get_signature, src/mcp_client.py:ts_cpp_get_signature, tests/test_ast_parser.py:test_ast_parser_get_signature_c, tests/test_ast_parser.py:test_ast_parser_get_signature_cpp]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
parts = re.split(r'::|\.', name)
tree = self.get_cached_tree(path, code)
parts = re.split(r'::|\.', name)
def walk(node: tree_sitter.Node, target_parts: List[str]) -> Optional[tree_sitter.Node]:
"""
@@ -638,7 +631,7 @@ class ASTParser:
"""
if not target_parts:
return None
target = target_parts[0]
target = target_parts[0]
best_match = None
for child in node.children:
@@ -649,7 +642,7 @@ class ASTParser:
if sub.type in ("class_specifier", "struct_specifier", "enum_specifier"):
check_node = sub
break
is_interesting = check_node.type in ("function_definition", "class_definition", "class_specifier", "struct_specifier", "enum_specifier", "enum_definition", "namespace_definition", "template_declaration", "field_declaration", "declaration")
if is_interesting:
node_name = self._get_name(check_node, code_bytes)
@@ -729,15 +722,15 @@ class ASTParser:
return code_bytes[found_node.start_byte:found_node.end_byte].decode("utf8", errors="replace").strip()
return f"ERROR: signature for '{name}' not found"
#endregion: Symbol Extraction
#region: Analysis & Updates
def get_code_outline(self, code: str, path: Optional[str] = None) -> str:
"""
Returns a hierarchical outline of the code (classes, structs, functions, methods).
[C: src/mcp_client.py:ts_c_get_code_outline, src/mcp_client.py:ts_cpp_get_code_outline, tests/test_ast_parser.py:test_ast_parser_get_code_outline_c, tests/test_ast_parser.py:test_ast_parser_get_code_outline_cpp]
Returns a hierarchical outline of the code (classes, structs, functions, methods).
[C: src/mcp_client.py:ts_c_get_code_outline, src/mcp_client.py:ts_cpp_get_code_outline, tests/test_ast_parser.py:test_ast_parser_get_code_outline_c, tests/test_ast_parser.py:test_ast_parser_get_code_outline_cpp]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
@@ -745,7 +738,7 @@ class ASTParser:
def walk(node: tree_sitter.Node, indent: int = 0) -> None:
"""
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
[C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python]
"""
ntype = node.type
label = ""
@@ -778,15 +771,12 @@ class ASTParser:
def update_definition(self, code: str, name: str, new_content: str, path: Optional[str] = None) -> str:
"""
Surgically replace the definition of a class or function by name.
[C: src/mcp_client.py:ts_c_update_definition, src/mcp_client.py:ts_cpp_update_definition, tests/test_ast_parser.py:test_ast_parser_update_definition_cpp]
Surgically replace the definition of a class or function by name.
[C: src/mcp_client.py:ts_c_update_definition, src/mcp_client.py:ts_cpp_update_definition, tests/test_ast_parser.py:test_ast_parser_update_definition_cpp]
"""
code_bytes = code.encode("utf8")
tree = self.get_cached_tree(path, code)
parts = re.split(r'::|\.', name)
tree = self.get_cached_tree(path, code)
parts = re.split(r'::|\.', name)
def walk(node: tree_sitter.Node, target_parts: List[str]) -> Optional[tree_sitter.Node]:
"""
@@ -794,7 +784,7 @@ class ASTParser:
"""
if not target_parts:
return None
target = target_parts[0]
target = target_parts[0]
best_match = None
for child in node.children:
@@ -876,12 +866,15 @@ class ASTParser:
code_bytearray[found_node.start_byte:found_node.end_byte] = bytes(new_content, "utf8")
return code_bytearray.decode("utf8")
return f"ERROR: definition '{name}' not found"
#endregion: Analysis & Updates
#region: Module Level Utilities
def reset_client() -> None:
pass
def get_file_id(path: Path) -> Optional[str]:
return None
#endregion: Module Level Utilities
+14 -14
View File
@@ -22,18 +22,18 @@ class FuzzyAnchor:
start_line and end_line are 1-based.
[C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations]
"""
lines = text.splitlines()
s_idx = max(0, start_line - 1)
e_idx = min(len(lines), end_line)
lines = text.splitlines()
s_idx = max(0, start_line - 1)
e_idx = min(len(lines), end_line)
slice_lines = lines[s_idx:e_idx]
slice_text = "\n".join(slice_lines)
slice_text = "\n".join(slice_lines)
return {
"start_line": start_line,
"end_line": end_line,
"start_line": start_line,
"end_line": end_line,
"start_context": cls.get_context(lines, s_idx, 3, 1),
"end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order
"content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest()
"end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order
"content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest()
}
@classmethod
@@ -47,13 +47,13 @@ class FuzzyAnchor:
e_idx = slice_data["end_line"]
if 0 <= s_idx < len(lines) and e_idx <= len(lines):
current_text = "\n".join(lines[s_idx:e_idx])
curr_hash = hashlib.md5(current_text.encode()).hexdigest()
curr_hash = hashlib.md5(current_text.encode()).hexdigest()
if curr_hash == slice_data["content_hash"]:
return (slice_data["start_line"], slice_data["end_line"])
# 2. Fuzzy match
start_ctx = slice_data["start_context"]
end_ctx = slice_data["end_context"]
end_ctx = slice_data["end_context"]
if not start_ctx or not end_ctx: return None
# Search for start_ctx
@@ -67,7 +67,7 @@ class FuzzyAnchor:
if match:
best_s = i
break
if best_s == -1: return None
# Search for end_ctx after start_ctx
@@ -83,8 +83,8 @@ class FuzzyAnchor:
if match:
best_e = i + 1
break
if best_e != -1:
return (best_s + 1, best_e)
return None
return None
+27 -33
View File
@@ -46,31 +46,25 @@ from src import session_logger
class GeminiCliAdapter:
"""
Adapter for the Gemini CLI that parses streaming JSON output.
Adapter for the Gemini CLI that parses streaming JSON output.
"""
def __init__(self, binary_path: str = "gemini"):
"""
Initializes the adapter with the path to the gemini CLI executable.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
Initializes the adapter with the path to the gemini CLI executable.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.binary_path = binary_path
self.session_id: Optional[str] = None
self.last_usage: Optional[dict[str, Any]] = None
self.session_id: Optional[str] = None
self.last_usage: Optional[dict[str, Any]] = None
self.last_latency: float = 0.0
def send(self, message: str, safety_settings: list[Any] | None = None, system_instruction: str | None = None,
model: str | None = None, stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]:
def send(self, message: str, safety_settings: list[Any] | None = None, system_instruction: str | None = None, model: str | None = None, stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]:
"""
Sends a message to the Gemini CLI and processes the streaming JSON output.
Uses non-blocking line-by-line reading to allow stream_callback.
[C: simulation/user_agent.py:UserSimAgent.generate_response, src/multi_agent_conductor.py:run_worker_lifecycle, src/orchestrator_pm.py:generate_tracks, tests/test_ai_cache_tracking.py:test_gemini_cache_tracking, tests/test_ai_client_cli.py:test_ai_client_send_gemini_cli, tests/test_api_events.py:test_send_emits_events_proper, tests/test_api_events.py:test_send_emits_tool_events, tests/test_deepseek_provider.py:test_deepseek_completion_logic, tests/test_deepseek_provider.py:test_deepseek_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoner_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoning_logic, tests/test_deepseek_provider.py:test_deepseek_streaming, tests/test_deepseek_provider.py:test_deepseek_tool_calling, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_full_flow_integration, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_captures_usage_metadata, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_handles_tool_use_events, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_parses_jsonl_output, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_starts_subprocess_with_correct_args, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_parses_tool_calls_from_streaming_json, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_starts_subprocess_with_model, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_context_bleed_prevention, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_loop_termination, tests/test_gemini_cli_integration.py:test_gemini_cli_full_integration, tests/test_gemini_cli_integration.py:test_gemini_cli_rejection_and_history, tests/test_gemini_cli_parity_regression.py:test_send_invokes_adapter_send, tests/test_gui2_mcp.py:test_mcp_tool_call_is_dispatched, tests/test_tier4_interceptor.py:test_ai_client_passes_qa_callback, tests/test_token_usage.py:test_token_usage_tracking, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
Sends a message to the Gemini CLI and processes the streaming JSON output.
Uses non-blocking line-by-line reading to allow stream_callback.
[C: simulation/user_agent.py:UserSimAgent.generate_response, src/multi_agent_conductor.py:run_worker_lifecycle, src/orchestrator_pm.py:generate_tracks, tests/test_ai_cache_tracking.py:test_gemini_cache_tracking, tests/test_ai_client_cli.py:test_ai_client_send_gemini_cli, tests/test_api_events.py:test_send_emits_events_proper, tests/test_api_events.py:test_send_emits_tool_events, tests/test_deepseek_provider.py:test_deepseek_completion_logic, tests/test_deepseek_provider.py:test_deepseek_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoner_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoning_logic, tests/test_deepseek_provider.py:test_deepseek_streaming, tests/test_deepseek_provider.py:test_deepseek_tool_calling, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_full_flow_integration, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_captures_usage_metadata, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_handles_tool_use_events, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_parses_jsonl_output, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_starts_subprocess_with_correct_args, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_parses_tool_calls_from_streaming_json, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_starts_subprocess_with_model, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_context_bleed_prevention, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_loop_termination, tests/test_gemini_cli_integration.py:test_gemini_cli_full_integration, tests/test_gemini_cli_integration.py:test_gemini_cli_rejection_and_history, tests/test_gemini_cli_parity_regression.py:test_send_invokes_adapter_send, tests/test_gui2_mcp.py:test_mcp_tool_call_is_dispatched, tests/test_tier4_interceptor.py:test_ai_client_passes_qa_callback, tests/test_token_usage.py:test_token_usage_tracking, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast]
"""
start_time = time.time()
start_time = time.time()
command_parts = [self.binary_path]
if model:
command_parts.extend(['-m', f'"{model}"'])
@@ -85,8 +79,8 @@ class GeminiCliAdapter:
prompt_text = f"{system_instruction}\n\n{message}"
accumulated_text = ""
tool_calls = []
stdout_content = []
tool_calls = []
stdout_content = []
env = os.environ.copy()
env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"
@@ -115,13 +109,13 @@ class GeminiCliAdapter:
process = subprocess.Popen(
cmd_list,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
shell=False,
env=env
stdin = subprocess.PIPE,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
text = True,
encoding = "utf-8",
shell = False,
env = env
)
# Use communicate to avoid pipe deadlocks with large input/output.
@@ -142,7 +136,7 @@ class GeminiCliAdapter:
if not line: continue
stdout_content.append(line)
try:
data = json.loads(line)
data = json.loads(line)
msg_type = data.get("type")
if msg_type == "init":
if "session_id" in data:
@@ -163,9 +157,9 @@ class GeminiCliAdapter:
self.session_id = data.get("session_id")
elif msg_type == "tool_use":
tc = {
"name": data.get("tool_name", data.get("name")),
"name": data.get("tool_name", data.get("name")),
"args": data.get("parameters", data.get("args", {})),
"id": data.get("tool_id", data.get("id"))
"id": data.get("tool_id", data.get("id"))
}
if tc["name"]:
tool_calls.append(tc)
@@ -180,11 +174,11 @@ class GeminiCliAdapter:
raise Exception(f"Gemini CLI failed with exit {process.returncode}\nStderr: {stderr_final}")
session_logger.open_session()
session_logger.log_cli_call(
command=command,
stdin_content=prompt_text,
stdout_content="\n".join(stdout_content),
stderr_content=stderr_final,
latency=current_latency
command = command,
stdin_content = prompt_text,
stdout_content = "\n".join(stdout_content),
stderr_content = stderr_final,
latency = current_latency
)
self.last_latency = current_latency
+61 -81
View File
@@ -7,38 +7,38 @@ from dataclasses import dataclass, field
@dataclass
class UISnapshot:
"""Capture of restorable UI state."""
ai_input: str
project_system_prompt: str
global_system_prompt: str
base_system_prompt: str
ai_input: str
project_system_prompt: str
global_system_prompt: str
base_system_prompt: str
use_default_base_prompt: bool
temperature: float
top_p: float
max_tokens: int
auto_add_history: bool
disc_entries: list[dict]
files: list[dict]
context_files: list[dict]
screenshots: list[str]
temperature: float
top_p: float
max_tokens: int
auto_add_history: bool
disc_entries: list[dict]
files: list[dict]
context_files: list[dict]
screenshots: list[str]
def to_dict(self) -> dict:
"""
[C: src/models.py:ContextPreset.to_dict, src/models.py:ExternalEditorConfig.to_dict, src/models.py:MCPConfiguration.to_dict, src/models.py:RAGConfig.to_dict, src/models.py:ToolPreset.to_dict, src/models.py:Track.to_dict, src/models.py:TrackState.to_dict, src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
[C: src/models.py:ContextPreset.to_dict, src/models.py:ExternalEditorConfig.to_dict, src/models.py:MCPConfiguration.to_dict, src/models.py:RAGConfig.to_dict, src/models.py:ToolPreset.to_dict, src/models.py:Track.to_dict, src/models.py:TrackState.to_dict, src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return {
"ai_input": self.ai_input,
"project_system_prompt": self.project_system_prompt,
"global_system_prompt": self.global_system_prompt,
"base_system_prompt": self.base_system_prompt,
"ai_input": self.ai_input,
"project_system_prompt": self.project_system_prompt,
"global_system_prompt": self.global_system_prompt,
"base_system_prompt": self.base_system_prompt,
"use_default_base_prompt": self.use_default_base_prompt,
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"auto_add_history": self.auto_add_history,
"disc_entries": self.disc_entries,
"files": self.files,
"context_files": self.context_files,
"screenshots": self.screenshots
"temperature": self.temperature,
"top_p": self.top_p,
"max_tokens": self.max_tokens,
"auto_add_history": self.auto_add_history,
"disc_entries": self.disc_entries,
"files": self.files,
"context_files": self.context_files,
"screenshots": self.screenshots
}
@classmethod
@@ -47,31 +47,31 @@ class UISnapshot:
[C: src/models.py:ContextPreset.from_dict, src/models.py:ExternalEditorConfig.from_dict, src/models.py:MCPConfiguration.from_dict, src/models.py:RAGConfig.from_dict, src/models.py:ToolPreset.from_dict, src/models.py:Track.from_dict, src/models.py:TrackState.from_dict, src/models.py:load_mcp_config, src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return cls(
ai_input=data.get("ai_input", ""),
project_system_prompt=data.get("project_system_prompt", ""),
global_system_prompt=data.get("global_system_prompt", ""),
base_system_prompt=data.get("base_system_prompt", ""),
use_default_base_prompt=data.get("use_default_base_prompt", True),
temperature=data.get("temperature", 0.0),
top_p=data.get("top_p", 1.0),
max_tokens=data.get("max_tokens", 4096),
auto_add_history=data.get("auto_add_history", False),
disc_entries=data.get("disc_entries", []),
files=data.get("files", []),
context_files=data.get("context_files", []),
screenshots=data.get("screenshots", [])
ai_input = data.get("ai_input", ""),
project_system_prompt = data.get("project_system_prompt", ""),
global_system_prompt = data.get("global_system_prompt", ""),
base_system_prompt = data.get("base_system_prompt", ""),
use_default_base_prompt = data.get("use_default_base_prompt", True),
temperature = data.get("temperature", 0.0),
top_p = data.get("top_p", 1.0),
max_tokens = data.get("max_tokens", 4096),
auto_add_history = data.get("auto_add_history", False),
disc_entries = data.get("disc_entries", []),
files = data.get("files", []),
context_files = data.get("context_files", []),
screenshots = data.get("screenshots", [])
)
@dataclass
class HistoryEntry:
state: typing.Any
state: typing.Any
description: str
timestamp: float = field(default_factory=lambda: time.time())
timestamp: float = field(default_factory=lambda: time.time())
class HistoryManager:
def __init__(self, max_capacity: int = 100):
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.max_capacity = max_capacity
self._undo_stack: typing.List[HistoryEntry] = []
@@ -79,11 +79,9 @@ class HistoryManager:
def push(self, state: typing.Any, description: str) -> None:
"""
Pushes a new state to the undo stack and clears the redo stack.
If the undo stack exceeds max_capacity, the oldest state is removed.
[C: tests/test_history.py:test_jump_to_undo, tests/test_history.py:test_max_capacity, tests/test_history.py:test_push_state, tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo]
Pushes a new state to the undo stack and clears the redo stack.
If the undo stack exceeds max_capacity, the oldest state is removed.
[C: tests/test_history.py:test_jump_to_undo, tests/test_history.py:test_max_capacity, tests/test_history.py:test_push_state, tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo]
"""
entry = HistoryEntry(state=state, description=description)
self._undo_stack.append(entry)
@@ -93,47 +91,35 @@ class HistoryManager:
def undo(self, current_state: typing.Any, current_description: str = "Current State") -> typing.Optional[HistoryEntry]:
"""
Undoes the last action by moving the current_state to the redo stack
and returning the top of the undo stack.
[C: tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo, tests/test_history_manager.py:TestHistoryManager.test_undo_no_history_returns_none]
Undoes the last action by moving the current_state to the redo stack
and returning the top of the undo stack.
[C: tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo, tests/test_history_manager.py:TestHistoryManager.test_undo_no_history_returns_none]
"""
if not self._undo_stack:
return None
if not self._undo_stack: return None
redo_entry = HistoryEntry(state=current_state, description=current_description)
self._redo_stack.append(redo_entry)
return self._undo_stack.pop()
def redo(self, current_state: typing.Any, current_description: str = "Current State") -> typing.Optional[HistoryEntry]:
"""
Redoes the last undone action by moving the current_state to the undo stack
and returning the top of the redo stack.
[C: tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_redo_no_history_returns_none, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo]
Redoes the last undone action by moving the current_state to the undo stack
and returning the top of the redo stack.
[C: tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_redo_no_history_returns_none, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo]
"""
if not self._redo_stack:
return None
if not self._redo_stack: return None
undo_entry = HistoryEntry(state=current_state, description=current_description)
self._undo_stack.append(undo_entry)
return self._redo_stack.pop()
@property
def can_undo(self) -> bool:
return len(self._undo_stack) > 0
def can_undo(self) -> bool: return len(self._undo_stack) > 0
@property
def can_redo(self) -> bool:
return len(self._redo_stack) > 0
def can_redo(self) -> bool: return len(self._redo_stack) > 0
def get_history(self) -> typing.List[typing.Dict[str, typing.Any]]:
"""
Returns a list of descriptions and timestamps for the undo stack.
[C: tests/test_history.py:test_initial_state, tests/test_history.py:test_push_state, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions]
Returns a list of descriptions and timestamps for the undo stack.
[C: tests/test_history.py:test_initial_state, tests/test_history.py:test_push_state, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions]
"""
return [
{"description": e.description, "timestamp": e.timestamp}
@@ -142,20 +128,14 @@ class HistoryManager:
def jump_to_undo(self, index: int, current_state: typing.Any, current_description: str = "Before Jump") -> typing.Optional[HistoryEntry]:
"""
Jumps to a specific state in the undo stack by moving subsequent states
and the current_state to the redo stack.
[C: tests/test_history.py:test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo]
Jumps to a specific state in the undo stack by moving subsequent states
and the current_state to the redo stack.
[C: tests/test_history.py:test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo]
"""
if index < 0 or index >= len(self._undo_stack):
return None
if index < 0 or index >= len(self._undo_stack): return None
# Move current state to redo
self._redo_stack.append(HistoryEntry(state=current_state, description=current_description))
# Move states between index and top of undo to redo
while len(self._undo_stack) > index + 1:
self._redo_stack.append(self._undo_stack.pop())
return self._undo_stack.pop()
return self._undo_stack.pop()
+49 -50
View File
@@ -1,6 +1,8 @@
from __future__ import annotations
import copy
import importlib
import sys
import traceback
from dataclasses import dataclass, field
@@ -9,62 +11,59 @@ from typing import Any
@dataclass
class HotModule:
name: str
file_path: str
state_keys: list[str] = field(default_factory=list)
delegation_targets: list[str] = field(default_factory=list)
name: str
file_path: str
state_keys: list[str] = field(default_factory=list)
delegation_targets: list[str] = field(default_factory=list)
class HotReloader:
HOT_MODULES: dict[str, HotModule] = {}
last_error: str | None = None
is_error_state: bool = False
HOT_MODULES: dict[str, HotModule] = {}
last_error: str | None = None
is_error_state: bool = False
@classmethod
def register(cls, module: HotModule) -> None:
if module.name in cls.HOT_MODULES:
raise ValueError(f"Module {module.name} already registered")
cls.HOT_MODULES[module.name] = module
@classmethod
def register(cls, module: HotModule) -> None:
if module.name in cls.HOT_MODULES:
raise ValueError(f"Module {module.name} already registered")
cls.HOT_MODULES[module.name] = module
@classmethod
def capture_state(cls, app: Any, state_keys: list[str]) -> dict[str, Any]:
return {key: copy.deepcopy(getattr(app, key, None)) for key in state_keys if hasattr(app, key)}
@classmethod
def capture_state(cls, app: Any, state_keys: list[str]) -> dict[str, Any]:
return {key: copy.deepcopy(getattr(app, key, None)) for key in state_keys if hasattr(app, key)}
@classmethod
def restore_state(cls, app: Any, state: dict[str, Any]) -> None:
for key, value in state.items():
setattr(app, key, value)
@classmethod
def restore_state(cls, app: Any, state: dict[str, Any]) -> None:
for key, value in state.items():
setattr(app, key, value)
@classmethod
def reload(cls, module_name: str, app: Any) -> bool:
if module_name not in cls.HOT_MODULES:
cls.last_error = f"Module {module_name} not registered"
cls.is_error_state = True
return False
@classmethod
def reload(cls, module_name: str, app: Any) -> bool:
if module_name not in cls.HOT_MODULES:
cls.last_error = f"Module {module_name} not registered"
cls.is_error_state = True
return False
hm = cls.HOT_MODULES[module_name]
state = cls.capture_state(app, hm.state_keys)
hm = cls.HOT_MODULES[module_name]
state = cls.capture_state(app, hm.state_keys)
try:
import importlib
import sys
if module_name in sys.modules:
old_module = sys.modules[module_name]
importlib.reload(old_module)
else:
importlib.import_module(module_name)
cls.last_error = None
cls.is_error_state = False
return True
except Exception:
cls.restore_state(app, state)
cls.last_error = traceback.format_exc()
cls.is_error_state = True
return False
try:
if module_name in sys.modules:
old_module = sys.modules[module_name]
importlib.reload(old_module)
else:
importlib.import_module(module_name)
cls.last_error = None
cls.is_error_state = False
return True
except Exception:
cls.restore_state(app, state)
cls.last_error = traceback.format_exc()
cls.is_error_state = True
return False
@classmethod
def reload_all(cls, app: Any) -> bool:
success = True
for name in cls.HOT_MODULES:
if not cls.reload(name, app):
success = False
return success
@classmethod
def reload_all(cls, app: Any) -> bool:
success = True
for name in cls.HOT_MODULES:
if not cls.reload(name, app): success = False
return success
+18 -18
View File
@@ -65,7 +65,7 @@ class _ScopeMenu:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._label = label
self._label = label
self._active = False
def __enter__(self):
self._active = imgui.begin_menu(self._label)
@@ -109,7 +109,7 @@ class _ScopePopup:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._id = id_str
self._id = id_str
self._active = False
def __enter__(self):
self._active = imgui.begin_popup(self._id)
@@ -125,10 +125,10 @@ class _ScopePopupModal:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._name = name
self._name = name
self._visible = visible
self._flags = flags
self._active = False
self._flags = flags
self._active = False
def __enter__(self):
self._active, self._visible = imgui.begin_popup_modal(self._name, self._visible, self._flags)
return self._active, self._visible
@@ -172,10 +172,10 @@ class _ScopeTable:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._name = name
self._name = name
self._columns = columns
self._flags = flags
self._active = False
self._flags = flags
self._active = False
def __enter__(self):
self._active = imgui.begin_table(self._name, self._columns, self._flags)
return self._active
@@ -190,8 +190,8 @@ class _ScopeTabBar:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._id = id_str
self._flags = flags
self._id = id_str
self._flags = flags
self._active = False
def __enter__(self):
self._active = imgui.begin_tab_bar(self._id, self._flags)
@@ -207,10 +207,10 @@ class _ScopeTabItem:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._label = label
self._flags = flags
self._label = label
self._flags = flags
self._expanded = False
self._open = None
self._open = None
def __enter__(self):
self._expanded, self._open = imgui.begin_tab_item(self._label, flags=self._flags)
return self._expanded, self._open
@@ -246,8 +246,8 @@ class _ScopeTreeNodeEx:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._label = label
self._flags = flags
self._label = label
self._flags = flags
self._opened = False
def __enter__(self):
self._opened = imgui.tree_node_ex(self._label, self._flags)
@@ -263,10 +263,10 @@ class _ScopeWindow:
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self._name = name
self._name = name
self._visible = visible
self._flags = flags
self._result = None
self._flags = flags
self._result = None
def __enter__(self):
self._result = imgui.begin(self._name, self._visible, self._flags)
return self._result
+20 -26
View File
@@ -10,40 +10,34 @@ from src.log_registry import LogRegistry
class LogPruner:
"""
Handles the automated deletion of old and insignificant session logs.
Ensures that only whitelisted or significant sessions (based on size/content)
are preserved long-term.
Handles the automated deletion of old and insignificant session logs.
Ensures that only whitelisted or significant sessions (based on size/content)
are preserved long-term.
"""
def __init__(self, log_registry: LogRegistry, logs_dir: str) -> None:
"""
Initializes the LogPruner.
Args:
log_registry: An instance of LogRegistry to check session data.
logs_dir: The path to the directory containing session sub-directories.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
Initializes the LogPruner.
Args:
log_registry: An instance of LogRegistry to check session data.
logs_dir: The path to the directory containing session sub-directories.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.log_registry = log_registry
self.logs_dir = logs_dir
self.logs_dir = logs_dir
def prune(self, max_age_days: int = 1, min_size_kb: int = 2) -> None:
"""
Prunes old and small session directories from the logs directory.
Deletes session directories that meet the following criteria:
1. The session start time is older than max_age_days.
2. The session name is NOT in the whitelist provided by the LogRegistry.
3. The total size of all files within the session directory is less than min_size_kb.
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_logging_e2e.py:test_logging_e2e]
Prunes old and small session directories from the logs directory.
Deletes session directories that meet the following criteria:
1. The session start time is older than max_age_days.
2. The session name is NOT in the whitelist provided by the LogRegistry.
3. The total size of all files within the session directory is less than min_size_kb.
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_logging_e2e.py:test_logging_e2e]
"""
now = datetime.now()
now = datetime.now()
cutoff_time = now - timedelta(days=max_age_days)
# Ensure the base logs directory exists.
if not os.path.isdir(self.logs_dir):
@@ -56,7 +50,7 @@ class LogPruner:
# Prune sessions if their size is less than threshold
for session_info in old_sessions_to_check:
session_id = session_info['session_id']
session_id = session_info['session_id']
session_path = session_info['path']
if not session_path:
continue
@@ -128,4 +122,4 @@ class LogPruner:
except OSError as e:
sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n")
self.log_registry.save_registry()
self.log_registry.save_registry()
+77 -95
View File
@@ -49,21 +49,17 @@ from typing import Any
class LogRegistry:
"""
Manages a persistent registry of session logs using a TOML file.
Tracks session paths, start times, whitelisting status, and metadata.
Manages a persistent registry of session logs using a TOML file.
Tracks session paths, start times, whitelisting status, and metadata.
"""
def __init__(self, registry_path: str) -> None:
"""
Initializes the LogRegistry with a path to the registry file.
Initializes the LogRegistry with a path to the registry file.
Args:
registry_path (str): The file path to the TOML registry.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
Args:
registry_path (str): The file path to the TOML registry.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.registry_path = registry_path
self.data: dict[str, dict[str, Any]] = {}
@@ -76,10 +72,8 @@ class LogRegistry:
def load_registry(self) -> None:
"""
Loads the registry data from the TOML file into memory.
Handles date/time conversions from TOML-native formats to strings for consistency.
Loads the registry data from the TOML file into memory.
Handles date/time conversions from TOML-native formats to strings for consistency.
"""
if os.path.exists(self.registry_path):
try:
@@ -106,11 +100,9 @@ class LogRegistry:
def save_registry(self) -> None:
"""
Serializes and saves the current registry data to the TOML file.
Converts internal datetime objects to ISO format strings for compatibility.
[C: tests/test_logging_e2e.py:test_logging_e2e]
Serializes and saves the current registry data to the TOML file.
Converts internal datetime objects to ISO format strings for compatibility.
[C: tests/test_logging_e2e.py:test_logging_e2e]
"""
try:
# Convert datetime objects to ISO format strings for TOML serialization
@@ -130,7 +122,7 @@ class LogRegistry:
if mk == 'timestamp' and isinstance(mv, datetime):
metadata_copy[mk] = mv.isoformat()
else:
metadata_copy[mk] = mv
metadata_copy[mk] = mv
session_data_copy[k] = metadata_copy
else:
session_data_copy[k] = v
@@ -142,15 +134,13 @@ class LogRegistry:
def register_session(self, session_id: str, path: str, start_time: datetime | str) -> None:
"""
Registers a new session in the registry.
Args:
session_id (str): Unique identifier for the session.
path (str): File path to the session's log directory.
start_time (datetime|str): The timestamp when the session started.
[C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e]
Registers a new session in the registry.
Args:
session_id (str): Unique identifier for the session.
path (str): File path to the session's log directory.
start_time (datetime|str): The timestamp when the session started.
[C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e]
"""
if session_id in self.data:
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
@@ -160,27 +150,25 @@ class LogRegistry:
else:
start_time_str = start_time
self.data[session_id] = {
'path': path,
'start_time': start_time_str,
'path': path,
'start_time': start_time_str,
'whitelisted': False,
'metadata': None
'metadata': None
}
self.save_registry()
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
"""
Updates metadata fields for an existing session.
Args:
session_id (str): Unique identifier for the session.
message_count (int): Total number of messages in the session.
errors (int): Number of errors identified in logs.
size_kb (int): Total size of the session logs in kilobytes.
whitelisted (bool): Whether the session should be protected from pruning.
reason (str): Explanation for the current whitelisting status.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata]
Updates metadata fields for an existing session.
Args:
session_id (str): Unique identifier for the session.
message_count (int): Total number of messages in the session.
errors (int): Number of errors identified in logs.
size_kb (int): Total size of the session logs in kilobytes.
whitelisted (bool): Whether the session should be protected from pruning.
reason (str): Explanation for the current whitelisting status.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata]
"""
if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for metadata update.")
@@ -192,10 +180,10 @@ class LogRegistry:
metadata = self.data[session_id].get('metadata')
if isinstance(metadata, dict):
metadata['message_count'] = message_count
metadata['errors'] = errors
metadata['size_kb'] = size_kb
metadata['whitelisted'] = whitelisted
metadata['reason'] = reason
metadata['errors'] = errors
metadata['size_kb'] = size_kb
metadata['whitelisted'] = whitelisted
metadata['reason'] = reason
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
# Also update the top-level whitelisted flag if provided
if whitelisted is not None:
@@ -204,16 +192,14 @@ class LogRegistry:
def is_session_whitelisted(self, session_id: str) -> bool:
"""
Checks if a specific session is marked as whitelisted.
Args:
session_id (str): Unique identifier for the session.
Returns:
bool: True if whitelisted, False otherwise.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
Checks if a specific session is marked as whitelisted.
Args:
session_id (str): Unique identifier for the session.
Returns:
bool: True if whitelisted, False otherwise.
[C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e]
"""
session_data = self.data.get(session_id)
if session_data is None:
@@ -223,15 +209,13 @@ class LogRegistry:
def update_auto_whitelist_status(self, session_id: str) -> None:
"""
Analyzes session logs and updates whitelisting status based on heuristics.
Sessions are automatically whitelisted if they contain error keywords,
have a high message count, or exceed a size threshold.
Args:
session_id (str): Unique identifier for the session to analyze.
[C: src/session_logger.py:close_session]
Analyzes session logs and updates whitelisting status based on heuristics.
Sessions are automatically whitelisted if they contain error keywords,
have a high message count, or exceed a size threshold.
Args:
session_id (str): Unique identifier for the session to analyze.
[C: src/session_logger.py:close_session]
"""
if session_id not in self.data:
return
@@ -239,9 +223,9 @@ class LogRegistry:
session_path = session_data.get('path')
if not session_path or not os.path.isdir(str(session_path)):
return
total_size_bytes = 0
message_count = 0
found_keywords = []
total_size_bytes = 0
message_count = 0
found_keywords = []
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
try:
for entry in os.scandir(str(session_path)):
@@ -261,41 +245,39 @@ class LogRegistry:
pass
except Exception:
pass
size_kb = total_size_bytes / 1024
size_kb = total_size_bytes / 1024
whitelisted = False
reason = ""
reason = ""
if found_keywords:
whitelisted = True
reason = f"Found keywords: {', '.join(found_keywords)}"
reason = f"Found keywords: {', '.join(found_keywords)}"
elif message_count > 10:
whitelisted = True
reason = f"High message count: {message_count}"
reason = f"High message count: {message_count}"
elif size_kb > 50:
whitelisted = True
reason = f"Large session size: {size_kb:.1f} KB"
reason = f"Large session size: {size_kb:.1f} KB"
self.update_session_metadata(
session_id,
message_count=message_count,
errors=len(found_keywords),
size_kb=int(size_kb),
whitelisted=whitelisted,
reason=reason
message_count = message_count,
errors = len(found_keywords),
size_kb = int(size_kb),
whitelisted = whitelisted,
reason = reason
)
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]:
"""
Retrieves a list of sessions that are older than a specific cutoff time
and are not marked as whitelisted.
Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0).
Args:
cutoff_datetime (datetime): The threshold time for identifying old sessions.
Returns:
list: A list of dictionaries containing session details (id, path, start_time).
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
Retrieves a list of sessions that are older than a specific cutoff time
and are not marked as whitelisted.
Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0).
Args:
cutoff_datetime (datetime): The threshold time for identifying old sessions.
Returns:
list: A list of dictionaries containing session details (id, path, start_time).
[C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions]
"""
old_sessions = []
for session_id, session_data in self.data.items():
@@ -316,14 +298,14 @@ class LogRegistry:
is_empty = True
else:
message_count = metadata.get('message_count', -1)
size_kb = metadata.get('size_kb', -1)
is_empty = (message_count == 0 or size_kb == 0)
size_kb = metadata.get('size_kb', -1)
is_empty = (message_count == 0 or size_kb == 0)
if not is_whitelisted:
if is_empty or (start_time is not None and start_time < cutoff_datetime):
old_sessions.append({
'session_id': session_id,
'path': session_data.get('path'),
'path': session_data.get('path'),
'start_time': start_time_raw
})
return old_sessions
+46 -56
View File
@@ -10,6 +10,10 @@ from imgui_bundle import imgui_md, imgui, immapp, imgui_color_text_edit as ed
from pathlib import Path
from typing import Optional, Dict, Callable
from src import theme_2
from src.markdown_table import parse_tables, render_table
def _get_language_id(name: str):
"""Get a language identifier for ImGuiColorTextEdit.
@@ -41,28 +45,24 @@ def _set_editor_language(editor, lang_obj) -> None:
1.92.801+: editor.set_language(obj). 1.92.5: editor.set_language_definition(obj).
No-op when lang_obj is None (used to skip the call for unknown languages).
"""
if lang_obj is None:
return
if hasattr(editor, "set_language"):
editor.set_language(lang_obj)
elif hasattr(editor, "set_language_definition"):
editor.set_language_definition(lang_obj)
if lang_obj is None: return
if hasattr(editor, "set_language"): editor.set_language(lang_obj)
elif hasattr(editor, "set_language_definition"): editor.set_language_definition(lang_obj)
class MarkdownRenderer:
"""
Hybrid Markdown renderer that uses imgui_md for text/headers
and ImGuiColorTextEdit for syntax-highlighted code blocks.
Hybrid Markdown renderer that uses imgui_md for text/headers
and ImGuiColorTextEdit for syntax-highlighted code blocks.
"""
def __init__(self):
"""
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.options = imgui_md.MarkdownOptions()
# Base path for fonts (Inter family)
self.options.font_options.font_base_path = "fonts/Inter"
self.options.font_options.regular_size = 18.0
self.options.font_options.regular_size = 18.0
# Configure callbacks
self.options.callbacks.on_open_link = self._on_open_link
@@ -80,22 +80,21 @@ class MarkdownRenderer:
# Apply the current theme's syntax palette on construction so new
# editors we create pick up the right colors. The renderer is re-created
# when the theme changes (see theme_2 module-load behavior).
from src import theme_2
palette_id = theme_2.get_syntax_palette_for_theme(theme_2.get_current_palette())
theme_2.apply_syntax_palette(palette_id)
# Language mapping for ImGuiColorTextEdit
self._lang_map = {
"python": _get_language_id("python"),
"py": _get_language_id("python"),
"json": _get_language_id("json"),
"cpp": _get_language_id("cpp"),
"c++": _get_language_id("cpp"),
"c": _get_language_id("c"),
"lua": _get_language_id("lua"),
"sql": _get_language_id("sql"),
"cs": _get_language_id("cs"),
"c#": _get_language_id("cs"),
"py": _get_language_id("python"),
"json": _get_language_id("json"),
"cpp": _get_language_id("cpp"),
"c++": _get_language_id("cpp"),
"c": _get_language_id("c"),
"lua": _get_language_id("lua"),
"sql": _get_language_id("sql"),
"cs": _get_language_id("cs"),
"c#": _get_language_id("cs"),
}
def _on_open_link(self, url: str) -> None:
@@ -119,28 +118,24 @@ class MarkdownRenderer:
def render(self, text: str, context_id: str = "default") -> None:
"""
Render Markdown text with code block interception and GFM table substitution.
[C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
Render Markdown text with code block interception and GFM table substitution.
[C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
"""
if not text:
return
from src.markdown_table import parse_tables, render_table
if not text: return
text = self._normalize_bullet_delimiters(text)
text = self._normalize_nested_list_endings(text)
text = self._normalize_list_continuations(text)
blocks = parse_tables(text)
lines = text.splitlines(keepends=True)
if not lines:
return
if not lines: return
table_at_line: dict[int, int] = {b.span[0]: i for i, b in enumerate(blocks)}
table_end: dict[int, int] = {b.span[0]: b.span[1] for i, b in enumerate(blocks)}
table_at_line: dict[int, int] = {b.span[0]: i for i, b in enumerate(blocks)}
table_end: dict[int, int] = {b.span[0]: b.span[1] for i, b in enumerate(blocks)}
md_buf: list[str] = []
md_buf: list[str] = []
code_buf: list[str] = []
block_idx = 0
in_fence = False
block_idx = 0
in_fence = False
fence_marker = ""
def flush_md() -> None:
@@ -161,11 +156,11 @@ class MarkdownRenderer:
i = 0
while i < len(lines):
line = lines[i]
line = lines[i]
stripped = line.lstrip().rstrip("\r\n")
if stripped.startswith("```"):
if not in_fence:
in_fence = True
in_fence = True
fence_marker = stripped[:3]
flush_md()
code_buf.append(line)
@@ -213,7 +208,6 @@ class MarkdownRenderer:
(we cannot subclass the C++ imgui_md class).
[C: src/markdown_helper.py:MarkdownRenderer.render]
"""
import re
return re.sub(r"(?m)^([ \t]*)\*[ \t]+", r"\1- ", text)
def _normalize_nested_list_endings(self, text: str) -> str:
@@ -226,7 +220,6 @@ class MarkdownRenderer:
paragraph break. Cannot fix the upstream C++ from Python.
[C: src/markdown_helper.py:MarkdownRenderer.render]
"""
import re
lines = text.split("\n")
out: list[str] = []
for i, line in enumerate(lines):
@@ -261,17 +254,16 @@ class MarkdownRenderer:
a single list item. Acceptable for our use case.
[C: src.markdown_helper:MarkdownRenderer.render]
"""
import re
lines = text.split("\n")
out: list[str] = []
prev_was_list = False
prev_indent = 0
prev_indent = 0
for i, line in enumerate(lines):
if line.strip():
out.append(line)
if re.match(r"^\s*[-*+\d]\s+", line):
prev_was_list = True
prev_indent = len(line) - len(line.lstrip())
prev_indent = len(line) - len(line.lstrip())
else:
prev_was_list = False
continue
@@ -285,11 +277,10 @@ class MarkdownRenderer:
out.append(line)
prev_was_list = False
continue
next_line = lines[j]
curr_indent = len(next_line) - len(next_line.lstrip())
next_line = lines[j]
curr_indent = len(next_line) - len(next_line.lstrip())
is_next_list = bool(re.match(r"^\s*[-*+\d]", next_line))
if curr_indent > prev_indent and not is_next_list:
continue
if curr_indent > prev_indent and not is_next_list: continue
out.append(line)
prev_was_list = False
return "\n".join(out)
@@ -300,7 +291,7 @@ class MarkdownRenderer:
def _render_code_block(self, block: str, context_id: str, block_idx: int) -> None:
"""Render a code block using TextEditor for syntax highlighting."""
lines = block.strip('`').split('\n')
lines = block.strip('`').split('\n')
lang_tag = lines[0].strip().lower() if lines else ""
# Heuristic to separate lang tag from code
@@ -333,10 +324,10 @@ class MarkdownRenderer:
if p_id is not None:
editor.set_palette(p_id)
self._editor_cache[cache_key] = editor
self._editor_cache[cache_key] = editor
self._editor_lang_cache[cache_key] = None
editor = self._editor_cache[cache_key]
editor = self._editor_cache[cache_key]
current_lang = self._editor_lang_cache[cache_key]
# Sync text and language. None means "no language set" (skip the call).
@@ -354,10 +345,10 @@ class MarkdownRenderer:
self._editor_lang_cache[cache_key] = lang_tag
# Dynamic height calculation
line_count = code.count('\n') + 1
line_count = code.count('\n') + 1
line_height = imgui.get_text_line_height()
height = (line_count * line_height) + 20
height = min(max(height, 40), 500)
height = (line_count * line_height) + 20
height = min(max(height, 40), 500)
editor.render(f"##code_{context_id}_{block_idx}", a_size=imgui.ImVec2(0, height))
@@ -386,13 +377,12 @@ _renderer: Optional[MarkdownRenderer] = None
def get_renderer() -> MarkdownRenderer:
global _renderer
if _renderer is None:
_renderer = MarkdownRenderer()
if _renderer is None: _renderer = MarkdownRenderer()
return _renderer
def render(text: str, context_id: str = "default") -> None:
"""
[C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
[C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render]
"""
get_renderer().render(text, context_id)
@@ -400,4 +390,4 @@ def render_unindented(text: str) -> None:
get_renderer().render_unindented(text)
def render_code(code: str, lang: str = "", context_id: str = "default", block_idx: int = 0) -> None:
get_renderer().render_code(code, lang, context_id, block_idx)
get_renderer().render_code(code, lang, context_id, block_idx)
+5 -5
View File
@@ -30,8 +30,8 @@ class TableBlock:
[C: src/markdown_helper.py:MarkdownRenderer.render]
"""
headers: list[str]
rows: list[list[str]]
span: tuple[int, int]
rows: list[list[str]]
span: tuple[int, int]
def _split_row(line: str) -> list[str]:
line = line.strip()
@@ -45,7 +45,7 @@ def _is_table_at(lines: list[str], i: int) -> bool:
return bool(_TABLE_SEPARATOR.match(lines[i + 1]))
def parse_tables(text: str) -> list[TableBlock]:
lines = text.splitlines()
lines = text.splitlines()
in_fence = False
blocks: list[TableBlock] = []
i = 0
@@ -53,14 +53,14 @@ def parse_tables(text: str) -> list[TableBlock]:
line = lines[i]
if line.strip().startswith("```"):
in_fence = not in_fence
i += 1
i += 1
continue
if in_fence:
i += 1
continue
if _is_table_at(lines, i):
headers = _split_row(lines[i])
j = i + 2
j = i + 2
rows: list[list[str]] = []
while j < len(lines) and "|" in lines[j] and not _TABLE_SEPARATOR.match(lines[j]):
rows.append(_split_row(lines[j]))
+8 -10
View File
@@ -97,8 +97,8 @@ MUTATING_TOOLS: frozenset[str] = frozenset({
# Set by configure() before the AI send loop starts.
# allowed_paths : set of resolved absolute Path objects (files or dirs)
# base_dirs : set of resolved absolute Path dirs that act as roots
_allowed_paths: set[Path] = set()
_base_dirs: set[Path] = set()
_allowed_paths: set[Path] = set()
_base_dirs: set[Path] = set()
_primary_base_dir: Path | None = None
# Injected by gui_2.py - returns a dict of performance metrics
@@ -106,14 +106,12 @@ perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None
def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | None = None) -> None:
"""
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
[C: tests/conftest.py:reset_ai_client, tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration]
Build the allowlist from aggregate file_items.
Called by ai_client before each send so the list reflects the current project.
file_items : list of dicts from aggregate.build_file_items()
extra_base_dirs : additional directory roots to allow traversal of
[C: tests/conftest.py:reset_ai_client, tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration]
"""
global _allowed_paths, _base_dirs, _primary_base_dir
_allowed_paths = set()
+40 -41
View File
@@ -41,6 +41,7 @@ from __future__ import annotations
import datetime
import json
import os
import sys
import tomllib
from dataclasses import dataclass, field
@@ -48,7 +49,8 @@ from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel
from src.paths import get_config_path
from src.dag_engine import TrackDAG
from src.paths import get_config_path
#region: Constants
@@ -164,8 +166,6 @@ def load_config() -> dict[str, Any]:
return tomllib.load(f)
def save_config(config: dict[str, Any]) -> None:
import tomli_w
import sys
config = _clean_nones(config)
sys.stderr.write(f"[DEBUG] Saving config. Theme: {config.get('theme')}\n")
sys.stderr.flush()
@@ -345,7 +345,6 @@ class Track:
"""
[C: tests/test_mma_models.py:test_track_get_executable_tickets, tests/test_mma_models.py:test_track_get_executable_tickets_complex]
"""
from src.dag_engine import TrackDAG
dag = TrackDAG(self.tickets)
return dag.get_ready_tasks()
@@ -926,8 +925,8 @@ class MCPServerConfig:
"""
res = {'auto_start': self.auto_start}
if self.command: res['command'] = self.command
if self.args: res['args'] = self.args
if self.url: res['url'] = self.url
if self.args: res['args'] = self.args
if self.url: res['url'] = self.url
return res
@classmethod
@@ -936,11 +935,11 @@ class MCPServerConfig:
[C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return cls(
name=name,
command=data.get('command'),
args=data.get('args', []),
url=data.get('url'),
auto_start=data.get('auto_start', False),
name = name,
command = data.get('command'),
args = data.get('args', []),
url = data.get('url'),
auto_start = data.get('auto_start', False),
)
@dataclass
@@ -958,30 +957,30 @@ class MCPConfiguration:
"""
[C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
raw_servers = data.get('mcpServers', {})
raw_servers = data.get('mcpServers', {})
parsed_servers = {name: MCPServerConfig.from_dict(name, cfg) for name, cfg in raw_servers.items()}
return cls(mcpServers=parsed_servers)
@dataclass
class VectorStoreConfig:
provider: str
url: Optional[str] = None
api_key: Optional[str] = None
provider: str
url: Optional[str] = None
api_key: Optional[str] = None
collection_name: str = 'manual_slop'
mcp_server: Optional[str] = None
mcp_tool: Optional[str] = None
mcp_server: Optional[str] = None
mcp_tool: Optional[str] = None
def to_dict(self) -> Dict[str, Any]:
"""
[C: src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return {
"provider": self.provider,
"url": self.url,
"api_key": self.api_key,
"provider": self.provider,
"url": self.url,
"api_key": self.api_key,
"collection_name": self.collection_name,
"mcp_server": self.mcp_server,
"mcp_tool": self.mcp_tool,
"mcp_server": self.mcp_server,
"mcp_tool": self.mcp_tool,
}
@classmethod
@@ -990,32 +989,32 @@ class VectorStoreConfig:
[C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return cls(
provider=data["provider"],
url=data.get("url"),
api_key=data.get("api_key"),
collection_name=data.get("collection_name", "manual_slop"),
mcp_server=data.get("mcp_server"),
mcp_tool=data.get("mcp_tool"),
provider = data["provider"],
url = data.get("url"),
api_key = data.get("api_key"),
collection_name = data.get("collection_name", "manual_slop"),
mcp_server = data.get("mcp_server"),
mcp_tool = data.get("mcp_tool"),
)
@dataclass
class RAGConfig:
enabled: bool = False
vector_store: VectorStoreConfig = field(default_factory=lambda: VectorStoreConfig(provider='mock'))
enabled: bool = False
vector_store: VectorStoreConfig = field(default_factory=lambda: VectorStoreConfig(provider='mock'))
embedding_provider: str = 'gemini'
chunk_size: int = 1000
chunk_overlap: int = 200
chunk_size: int = 1000
chunk_overlap: int = 200
def to_dict(self) -> Dict[str, Any]:
"""
[C: src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return {
"enabled": self.enabled,
"vector_store": self.vector_store.to_dict(),
"enabled": self.enabled,
"vector_store": self.vector_store.to_dict(),
"embedding_provider": self.embedding_provider,
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
"chunk_size": self.chunk_size,
"chunk_overlap": self.chunk_overlap,
}
@classmethod
@@ -1024,11 +1023,11 @@ class RAGConfig:
[C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags]
"""
return cls(
enabled=data.get("enabled", False),
vector_store=VectorStoreConfig.from_dict(data.get("vector_store", {"provider": "mock"})),
embedding_provider=data.get("embedding_provider", "gemini"),
chunk_size=data.get("chunk_size", 1000),
chunk_overlap=data.get("chunk_overlap", 200),
enabled = data.get("enabled", False),
vector_store = VectorStoreConfig.from_dict(data.get("vector_store", {"provider": "mock"})),
embedding_provider = data.get("embedding_provider", "gemini"),
chunk_size = data.get("chunk_size", 1000),
chunk_overlap = data.get("chunk_overlap", 200),
)
def load_mcp_config(path: str) -> MCPConfiguration: