diff --git a/src/beads_client.py b/src/beads_client.py index 73a87688..6a263468 100644 --- a/src/beads_client.py +++ b/src/beads_client.py @@ -7,10 +7,10 @@ from pathlib import Path @dataclass class Bead: - id: str - title: str + id: str + title: str description: str - status: str = "active" + status: str = "active" class BeadsClient: def __init__(self, working_dir: Path): diff --git a/src/bg_shader.py b/src/bg_shader.py index a693277f..d2d88e06 100644 --- a/src/bg_shader.py +++ b/src/bg_shader.py @@ -1,10 +1,12 @@ # src/bg_shader.py import time import math -from typing import Optional import numpy as np + +from typing import Optional from imgui_bundle import imgui, nanovg as nvg, hello_imgui + class BackgroundShader: def __init__(self): """ diff --git a/src/command_palette.py b/src/command_palette.py index 7da55d6c..d1204e3b 100644 --- a/src/command_palette.py +++ b/src/command_palette.py @@ -1,24 +1,26 @@ from __future__ import annotations +from imgui_bundle import imgui + from dataclasses import dataclass, field from typing import Optional, Callable, List, Dict, Any + @dataclass class Command: - id: str - title: str - category: str - shortcut: Optional[str] = None - description: str = "" + id: str + title: str + category: str + shortcut: Optional[str] = None + description: str = "" enabled_when: Optional[str] = None - action: Optional[Callable] = None - + action: Optional[Callable] = None @dataclass class ScoredCommand: command: Command - score: float + score: float class CommandRegistry: @@ -70,13 +72,10 @@ def _is_subsequence(query: str, target: str) -> bool: def _compute_score(query: str, target: str) -> float: score = 0.0 - if target.startswith(query): - score += 1.0 - elif _starts_at_word_boundary(query, target): - score += 0.5 - if _is_contiguous(query, target): - score += 0.3 - gaps = _count_gaps(query, target) + if target.startswith(query): score += 1.0 + elif _starts_at_word_boundary(query, target): score += 0.5 + if _is_contiguous(query, target): score += 0.3 + gaps = _count_gaps(query, target) score -= 0.1 * gaps return score @@ -92,24 +91,23 @@ def _is_contiguous(query: str, target: str) -> bool: def _count_gaps(query: str, target: str) -> int: - qi = 0 - gaps = 0 + qi = 0 + gaps = 0 last_match = -1 for ti, ch in enumerate(target): if qi < len(query) and ch == query[qi]: - if last_match >= 0 and ti - last_match > 1: - gaps += ti - last_match - 1 + if last_match >= 0 and ti - last_match > 1: gaps += ti - last_match - 1 last_match = ti - qi += 1 + qi += 1 return gaps def _close_palette(app: Any) -> None: """Close the palette and reset all per-open state.""" - app.show_command_palette = False - app._command_palette_query = "" - app._command_palette_selected = 0 - app._command_palette_focused = False + app.show_command_palette = False + app._command_palette_query = "" + app._command_palette_selected = 0 + app._command_palette_focused = False app._command_palette_input_focused = False @@ -128,19 +126,14 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None: if not getattr(app, "show_command_palette", False): return - from imgui_bundle import imgui - viewport = imgui.get_main_viewport() - center = viewport.get_center() + center = viewport.get_center() imgui.set_next_window_pos((center.x - 300, center.y - 200), imgui.Cond_.always) imgui.set_next_window_size((600, 400), imgui.Cond_.always) - if not hasattr(app, "_command_palette_query"): - app._command_palette_query = "" - if not hasattr(app, "_command_palette_selected"): - app._command_palette_selected = 0 - if not hasattr(app, "_command_palette_focused"): - app._command_palette_focused = False + if not hasattr(app, "_command_palette_query"): app._command_palette_query = "" + if not hasattr(app, "_command_palette_selected"): app._command_palette_selected = 0 + if not hasattr(app, "_command_palette_focused"): app._command_palette_focused = False # Set focus on the window + input field ONCE per open. if not app._command_palette_focused: @@ -154,7 +147,7 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None: expanded, opened = imgui.begin("Command Palette##manual_slop", True, imgui.WindowFlags_.no_collapse) if not expanded or not opened: - app.show_command_palette = False + app.show_command_palette = False app._command_palette_focused = False imgui.end() return @@ -167,10 +160,8 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None: # Process Up/Down/Enter BEFORE input_text so we see the keys before the # input field consumes them for cursor movement / text editing. results = fuzzy_match(app._command_palette_query, commands, top_n=20) - if results: - app._command_palette_selected = max(0, min(app._command_palette_selected, len(results) - 1)) - else: - app._command_palette_selected = 0 + if results: app._command_palette_selected = max(0, min(app._command_palette_selected, len(results) - 1)) + else: app._command_palette_selected = 0 if imgui.is_key_pressed(imgui.Key.down_arrow): if results: @@ -188,7 +179,7 @@ def render_palette_modal(app: Any, commands: List[Command]) -> None: if imgui.begin_child("##results", (0, -1)): for i, scored in enumerate(results): is_selected = (i == app._command_palette_selected) - label = f"[{scored.command.category}] {scored.command.title}" + label = f"[{scored.command.category}] {scored.command.title}" clicked, _ = imgui.selectable(label, is_selected) if clicked: app._command_palette_selected = i diff --git a/src/commands.py b/src/commands.py index 812a500b..56419259 100644 --- a/src/commands.py +++ b/src/commands.py @@ -1,13 +1,19 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Callable +import webbrowser + +from pathlib import Path +from typing import TYPE_CHECKING, Callable + +from src import models +from src import theme_2 from src.command_palette import CommandRegistry +from src.hot_reloader import HotReloader if TYPE_CHECKING: from src.gui_2 import App - registry = CommandRegistry() @@ -38,14 +44,10 @@ def reset_session(app: "App") -> None: """Reset Session — Reset the AI session, clear comms and tool logs.""" from src import ai_client ai_client.reset_session() - if hasattr(app, "_handle_reset_session"): - app._handle_reset_session() - if hasattr(app, "_comms_log"): - app._comms_log.clear() - if hasattr(app, "_tool_log"): - app._tool_log.clear() - if hasattr(app, "ai_response"): - app.ai_response = "" + if hasattr(app, "_handle_reset_session"): app._handle_reset_session() + if hasattr(app, "_comms_log"): app._comms_log.clear() + if hasattr(app, "_tool_log"): app._tool_log.clear() + if hasattr(app, "ai_response"): app.ai_response = "" @registry.register @@ -67,8 +69,8 @@ def generate_md_only(app: "App") -> None: """Generate MD Only — Run the AI to produce a markdown file without sending to the chat.""" if hasattr(app, "_do_generate"): try: - md, path, *_ = app._do_generate() - app.last_md = md + md, path, *_ = app._do_generate() + app.last_md = md app.last_md_path = path if hasattr(app, "ai_status"): app.ai_status = f"md written: {path.name}" @@ -98,11 +100,8 @@ def save_project(app: "App") -> None: @registry.register def save_all(app: "App") -> None: """Save All — Flush to project, flush to config, save global config.""" - from src import models - if hasattr(app, "_flush_to_project"): - app._flush_to_project() - if hasattr(app, "_flush_to_config"): - app._flush_to_config() + if hasattr(app, "_flush_to_project"): app._flush_to_project() + if hasattr(app, "_flush_to_config"): app._flush_to_config() if hasattr(app, "config"): try: models.save_config(app.config) @@ -229,7 +228,6 @@ def show_workspace_manager(app: "App") -> None: @registry.register def trigger_hot_reload(app: "App") -> None: """Hot Reload — Reload the GUI module to pick up code changes.""" - from src.hot_reloader import HotReloader HotReloader.reload("src.gui_2", app) @@ -254,28 +252,24 @@ def redo(app: "App") -> None: @registry.register def switch_to_dark_theme(app: "App") -> None: """Switch to Dark Theme (10x Dark palette).""" - from src import theme_2 theme_2.apply("10x Dark") @registry.register def switch_to_light_theme(app: "App") -> None: """Switch to Light Theme (ImGui Light palette).""" - from src import theme_2 theme_2.apply("ImGui Light") @registry.register def switch_to_nerv_theme(app: "App") -> None: """Switch to NERV Theme (Tactical Console aesthetic).""" - from src import theme_2 theme_2.apply("NERV") @registry.register def cycle_theme(app: "App") -> None: """Cycle Theme — Switch to the next theme in the cycle (Dark → Light → NERV → Dark).""" - from src import theme_2 order = ["10x Dark", "ImGui Light", "NERV"] current = theme_2.get_current_palette() if current in order: @@ -292,14 +286,12 @@ def cycle_theme(app: "App") -> None: @registry.register def show_documentation(app: "App") -> None: """Show Documentation — Open the project URL in the browser.""" - import webbrowser webbrowser.open("https://git.cozyair.dev/ed/manual_slop/") @registry.register def show_command_palette_help(app: "App") -> None: """Show Command Palette Help — Open the docs/Readme.md in the Text Viewer.""" - from pathlib import Path if hasattr(app, "readme_text"): docs_readme = Path("docs/Readme.md") if docs_readme.exists(): diff --git a/src/conductor_tech_lead.py b/src/conductor_tech_lead.py index 2beb66db..ddd3b366 100644 --- a/src/conductor_tech_lead.py +++ b/src/conductor_tech_lead.py @@ -44,16 +44,14 @@ from src import mma_prompts def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, Any]]: """ - - - Tier 2 (Tech Lead) call. - Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets. - [C: tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_failure, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_success, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_success, tests/test_orchestration_logic.py:test_generate_tickets] + Tier 2 (Tech Lead) call. + Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets. + [C: tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_failure, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_retry_success, tests/test_conductor_tech_lead.py:TestConductorTechLead.test_generate_tickets_success, tests/test_orchestration_logic.py:test_generate_tickets] """ # 1. Set Tier 2 Model (Tech Lead - Flash) # 2. Construct Prompt system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning") - user_message = ( + user_message = ( f"### TRACK BRIEF:\n{track_brief}\n\n" f"### MODULE SKELETONS:\n{module_skeletons}\n\n" "Please generate the implementation tickets for this track." @@ -68,8 +66,8 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, try: # 3. Call Tier 2 Model response = ai_client.send( - md_content="", - user_message=user_message + md_content = "", + user_message = user_message ) # 4. Parse JSON Output # Extract JSON array from markdown code blocks if present @@ -97,15 +95,13 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, ai_client.set_current_tier(None) from src.dag_engine import TrackDAG -from src.models import Ticket +from src.models import Ticket def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]: """ - - - Sorts a list of tickets based on their 'depends_on' field. - Raises ValueError if a circular dependency or missing internal dependency is detected. - [C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance] + Sorts a list of tickets based on their 'depends_on' field. + Raises ValueError if a circular dependency or missing internal dependency is detected. + [C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance] """ # 1. Convert to Ticket objects for TrackDAG ticket_objs = [] @@ -123,7 +119,7 @@ def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]: if __name__ == "__main__": # Quick test if run directly - test_brief = "Implement a new feature." + test_brief = "Implement a new feature." test_skeletons = "class NewFeature: pass" tickets = generate_tickets(test_brief, test_skeletons) - print(json.dumps(tickets, indent=2)) \ No newline at end of file + print(json.dumps(tickets, indent=2)) diff --git a/src/cost_tracker.py b/src/cost_tracker.py index 7c9d616d..60b9b15f 100644 --- a/src/cost_tracker.py +++ b/src/cost_tracker.py @@ -57,7 +57,7 @@ def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float: for pattern, rates in MODEL_PRICING: if re.search(pattern, model, re.IGNORECASE): - input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"] + input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"] output_cost = (output_tokens / 1_000_000) * rates["output_per_mtok"] return input_cost + output_cost return 0.0 diff --git a/src/dag_engine.py b/src/dag_engine.py index 09057a74..0eabac1e 100644 --- a/src/dag_engine.py +++ b/src/dag_engine.py @@ -45,7 +45,7 @@ class TrackDAG: tickets: A list of Ticket instances defining the graph nodes and edges. [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self.tickets = tickets + self.tickets = tickets self.ticket_map = {t.id: t for t in tickets} def cascade_blocks(self) -> None: @@ -64,7 +64,7 @@ class TrackDAG: # Use a queue-based propagation (BFS) from all currently blocked tickets queue = [t for t in self.tickets if t.status == 'blocked'] - idx = 0 + idx = 0 while idx < len(queue): curr = queue[idx] idx += 1 @@ -110,16 +110,14 @@ class TrackDAG: if start_ticket.id in visited: continue stack = [(start_ticket.id, False)] # (id, is_backtracking) - path = set() + path = set() while stack: node_id, is_backtracking = stack.pop() if is_backtracking: path.remove(node_id) continue - if node_id in path: - return True - if node_id in visited: - continue + if node_id in path: return True + if node_id in visited: continue visited.add(node_id) path.add(node_id) stack.append((node_id, True)) @@ -140,7 +138,7 @@ class TrackDAG: [C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance] """ with get_monitor().scope("dag_topological_sort"): - in_degree = {t.id: len(t.depends_on) for t in self.tickets} + in_degree = {t.id: len(t.depends_on) for t in self.tickets} dependents = {t.id: [] for t in self.tickets} for t in self.tickets: for dep_id in t.depends_on: @@ -148,11 +146,11 @@ class TrackDAG: dependents[dep_id].append(t.id) # Queue starts with nodes having no dependencies - queue = [t.id for t in self.tickets if in_degree[t.id] == 0] + queue = [t.id for t in self.tickets if in_degree[t.id] == 0] result = [] - idx = 0 + idx = 0 while idx < len(queue): - u = queue[idx] + u = queue[idx] idx += 1 result.append(u) for v_id in dependents.get(u, []): @@ -178,7 +176,7 @@ class ExecutionEngine: auto_queue: If True, ready tasks will automatically move to 'in_progress'. [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self.dag = dag + self.dag = dag self.auto_queue = auto_queue def tick(self) -> List[Ticket]: @@ -215,4 +213,5 @@ class ExecutionEngine: """ ticket = self.dag.ticket_map.get(task_id) if ticket: - ticket.status = status \ No newline at end of file + ticket.status = status + \ No newline at end of file diff --git a/src/external_editor.py b/src/external_editor.py index 817a6182..2ad28f77 100644 --- a/src/external_editor.py +++ b/src/external_editor.py @@ -13,139 +13,135 @@ from src.models import ExternalEditorConfig, TextEditorConfig class ExternalEditorLauncher: - def __init__(self, config: ExternalEditorConfig): - """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] - """ - self.config = config + def __init__(self, config: ExternalEditorConfig): + """ + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + """ + self.config = config - def get_editor(self, editor_name: Optional[str] = None) -> Optional[TextEditorConfig]: - """ - [C: tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_by_name, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_returns_default, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_unknown_name] - """ - if editor_name: - return self.config.editors.get(editor_name) - return self.config.get_default() + def get_editor(self, editor_name: Optional[str] = None) -> Optional[TextEditorConfig]: + """ + [C: tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_by_name, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_returns_default, tests/test_external_editor.py:TestExternalEditorLauncher.test_get_editor_unknown_name] + """ + if editor_name: + return self.config.editors.get(editor_name) + return self.config.get_default() - def build_diff_command( - self, editor: TextEditorConfig, original_path: str, modified_path: str - ) -> List[str]: - """ - [C: tests/test_external_editor.py:TestExternalEditorLauncher.test_build_diff_command, tests/test_external_editor_gui.py:test_verify_command_format, tests/test_external_editor_gui.py:test_verify_vscode_command_format] - """ - cmd = [editor.path] + editor.diff_args + [original_path, modified_path] - return cmd + def build_diff_command(self, editor: TextEditorConfig, original_path: str, modified_path: str) -> List[str]: + """ + [C: tests/test_external_editor.py:TestExternalEditorLauncher.test_build_diff_command, tests/test_external_editor_gui.py:test_verify_command_format, tests/test_external_editor_gui.py:test_verify_vscode_command_format] + """ + cmd = [editor.path] + editor.diff_args + [original_path, modified_path] + return cmd - def launch_diff( - self, editor_name: Optional[str], original_path: str, modified_path: str - ) -> Optional[subprocess.Popen]: - """ - [C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_file_not_found, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_missing_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_success] - """ - editor = self.get_editor(editor_name) - if not editor: - return None - cmd = self.build_diff_command(editor, original_path, modified_path) - try: - return subprocess.Popen(cmd) - except FileNotFoundError: - return None + def launch_diff(self, editor_name: Optional[str], original_path: str, modified_path: str) -> Optional[subprocess.Popen]: + """ + [C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_file_not_found, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_missing_editor, tests/test_external_editor.py:TestExternalEditorLauncher.test_launch_diff_success] + """ + editor = self.get_editor(editor_name) + if not editor: + return None + cmd = self.build_diff_command(editor, original_path, modified_path) + try: + return subprocess.Popen(cmd) + except FileNotFoundError: + return None - def launch_editor(self, editor_name: Optional[str], file_path: str) -> Optional[subprocess.Popen]: - editor = self.get_editor(editor_name) - if not editor: - return None - try: - return subprocess.Popen([editor.path, file_path]) - except FileNotFoundError: - return None + def launch_editor(self, editor_name: Optional[str], file_path: str) -> Optional[subprocess.Popen]: + editor = self.get_editor(editor_name) + if not editor: + return None + try: + return subprocess.Popen([editor.path, file_path]) + except FileNotFoundError: + return None _cached_vscode_config: Optional[TextEditorConfig] = None def _find_vscode_in_registry() -> Optional[str]: - paths = [] - reg_keys = [ - r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*", - r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*", - r"HKLM\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\*", - ] - for key in reg_keys: - try: - result = subprocess.run( - ["powershell", "-Command", f"Get-ItemProperty -Path '{key}' -ErrorAction SilentlyContinue | Where-Object {{ $_.DisplayName -like '*Visual Studio Code*' }} | Select-Object -ExpandProperty InstallLocation"], - capture_output=True, text=True, timeout=5 - ) - for line in result.stdout.strip().split('\n'): - line = line.strip() - if line and line != "": - exe_path = line.strip() + "\\Code.exe" - if os.path.exists(exe_path): - paths.append(exe_path) - except Exception: - pass - if paths: - return paths[0] - return None + paths = [] + reg_keys = [ + r"HKLM\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*", + r"HKCU\SOFTWARE\Microsoft\Windows\CurrentVersion\Uninstall\*", + r"HKLM\SOFTWARE\WOW6432Node\Microsoft\Windows\CurrentVersion\Uninstall\*", + ] + for key in reg_keys: + try: + result = subprocess.run( + ["powershell", "-Command", f"Get-ItemProperty -Path '{key}' -ErrorAction SilentlyContinue | Where-Object {{ $_.DisplayName -like '*Visual Studio Code*' }} | Select-Object -ExpandProperty InstallLocation"], + capture_output=True, text=True, timeout=5 + ) + for line in result.stdout.strip().split('\n'): + line = line.strip() + if line and line != "": + exe_path = line.strip() + "\\Code.exe" + if os.path.exists(exe_path): + paths.append(exe_path) + except Exception: + pass + if paths: + return paths[0] + return None def _find_vscode_common_paths() -> Optional[str]: - candidates = [ - r"C:\apps\Microsoft VS Code\Code.exe", - r"C:\Program Files\Microsoft VS Code\Code.exe", - r"C:\Program Files (x86)\Microsoft VS Code\Code.exe", - os.path.expanduser(r"~\AppData\Local\Programs\Microsoft VS Code\Code.exe"), - ] - for path in candidates: - if os.path.exists(path): - return path - return None + candidates = [ + r"C:\apps\Microsoft VS Code\Code.exe", + r"C:\Program Files\Microsoft VS Code\Code.exe", + r"C:\Program Files (x86)\Microsoft VS Code\Code.exe", + os.path.expanduser(r"~\AppData\Local\Programs\Microsoft VS Code\Code.exe"), + ] + for path in candidates: + if os.path.exists(path): + return path + return None def auto_detect_vscode() -> Optional[TextEditorConfig]: - global _cached_vscode_config - if _cached_vscode_config is not None: - return _cached_vscode_config - vscode_path = _find_vscode_in_registry() or _find_vscode_common_paths() - if vscode_path: - _cached_vscode_config = TextEditorConfig( - name="vscode", - path=vscode_path, - diff_args=["--new-window", "--diff"] - ) - return _cached_vscode_config + global _cached_vscode_config + if _cached_vscode_config is not None: + return _cached_vscode_config + vscode_path = _find_vscode_in_registry() or _find_vscode_common_paths() + if vscode_path: + _cached_vscode_config = TextEditorConfig( + name="vscode", + path=vscode_path, + diff_args=["--new-window", "--diff"] + ) + return _cached_vscode_config def get_default_launcher() -> ExternalEditorLauncher: - """ - [C: src/gui_2.py:App._open_patch_in_external_editor, src/gui_2.py:App._render_external_editor_panel] - """ - from src import models - config = models.load_config() - editors_config = config.get("tools", {}).get("text_editors", {}) - default_editor = config.get("tools", {}).get("default_editor", {}).get("default_editor") - ext_config = ExternalEditorConfig.from_dict({ - "editors": editors_config, - "default_editor": default_editor, - }) - launcher = ExternalEditorLauncher(ext_config) - if not launcher.config.editors: - detected = auto_detect_vscode() - if detected: - launcher.config.editors["vscode"] = detected - launcher.config.default_editor = "vscode" - else: - vscode = launcher.config.editors.get("vscode") - if vscode and "--new-window" not in vscode.diff_args: - vscode.diff_args = ["--new-window", "--diff"] - return launcher + """ + [C: src/gui_2.py:App._open_patch_in_external_editor, src/gui_2.py:App._render_external_editor_panel] + """ + from src import models + config = models.load_config() + editors_config = config.get("tools", {}).get("text_editors", {}) + default_editor = config.get("tools", {}).get("default_editor", {}).get("default_editor") + ext_config = ExternalEditorConfig.from_dict({ + "editors": editors_config, + "default_editor": default_editor, + }) + launcher = ExternalEditorLauncher(ext_config) + if not launcher.config.editors: + detected = auto_detect_vscode() + if detected: + launcher.config.editors["vscode"] = detected + launcher.config.default_editor = "vscode" + else: + vscode = launcher.config.editors.get("vscode") + if vscode and "--new-window" not in vscode.diff_args: + vscode.diff_args = ["--new-window", "--diff"] + return launcher def create_temp_modified_file(content: str) -> str: - """ - [C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestHelperFunctions.test_create_temp_modified_file] - """ - with tempfile.NamedTemporaryFile(mode="w", suffix="_modified", delete=False, encoding="utf-8") as f: - f.write(content) - return f.name \ No newline at end of file + """ + [C: src/gui_2.py:App._open_patch_in_external_editor, tests/test_external_editor.py:TestHelperFunctions.test_create_temp_modified_file] + """ + with tempfile.NamedTemporaryFile(mode="w", suffix="_modified", delete=False, encoding="utf-8") as f: + f.write(content) + return f.name diff --git a/src/file_cache.py b/src/file_cache.py index 1489c80a..cb2a42ea 100644 --- a/src/file_cache.py +++ b/src/file_cache.py @@ -49,33 +49,29 @@ _ast_cache: Dict[str, Tuple[float, tree_sitter.Tree]] = {} class ASTParser: """ - - - Parser for extracting AST-based views of source code. - Currently supports Python. + Parser for extracting AST-based views of source code. + Currently supports Python. """ + #region: Core Operations + def __init__(self, language: str) -> None: """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ if language not in ("python", "cpp", "c"): raise ValueError(f"Language '{language}' not supported yet.") self.language_name = language # Load the tree-sitter language grammar - if language == "python": - self.language = tree_sitter.Language(tree_sitter_python.language()) - elif language == "cpp": - self.language = tree_sitter.Language(tree_sitter_cpp.language()) - elif language == "c": - self.language = tree_sitter.Language(tree_sitter_c.language()) + if language == "python": self.language = tree_sitter.Language(tree_sitter_python.language()) + elif language == "cpp": self.language = tree_sitter.Language(tree_sitter_cpp.language()) + elif language == "c": self.language = tree_sitter.Language(tree_sitter_c.language()) self.parser = tree_sitter.Parser(self.language) def parse(self, code: str) -> tree_sitter.Tree: """ - - Parse the given code and return the tree-sitter Tree. - [C: src/mcp_client.py:_search_file, src/mcp_client.py:derive_code_path, src/mcp_client.py:py_check_syntax, src/mcp_client.py:py_get_class_summary, src/mcp_client.py:py_get_definition, src/mcp_client.py:py_get_docstring, src/mcp_client.py:py_get_imports, src/mcp_client.py:py_get_signature, src/mcp_client.py:py_get_symbol_info, src/mcp_client.py:py_get_var_declaration, src/mcp_client.py:py_set_signature, src/mcp_client.py:py_set_var_declaration, src/mcp_client.py:py_update_definition, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/rag_engine.py:RAGEngine._chunk_code, src/summarize.py:_summarise_python, tests/test_ast_parser.py:test_ast_parser_parse, tests/test_tree_sitter_setup.py:test_tree_sitter_python_setup] + Parse the given code and return the tree-sitter Tree. + [C: src/mcp_client.py:_search_file, src/mcp_client.py:derive_code_path, src/mcp_client.py:py_check_syntax, src/mcp_client.py:py_get_class_summary, src/mcp_client.py:py_get_definition, src/mcp_client.py:py_get_docstring, src/mcp_client.py:py_get_imports, src/mcp_client.py:py_get_signature, src/mcp_client.py:py_get_symbol_info, src/mcp_client.py:py_get_var_declaration, src/mcp_client.py:py_set_signature, src/mcp_client.py:py_set_var_declaration, src/mcp_client.py:py_update_definition, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/rag_engine.py:RAGEngine._chunk_code, src/summarize.py:_summarise_python, tests/test_ast_parser.py:test_ast_parser_parse, tests/test_tree_sitter_setup.py:test_tree_sitter_python_setup] """ return self.parser.parse(bytes(code, "utf8")) @@ -85,7 +81,7 @@ class ASTParser: return self.parse(code) try: - p = Path(path) + p = Path(path) mtime = p.stat().st_mtime if p.exists() else 0.0 except Exception: mtime = 0.0 @@ -185,17 +181,18 @@ class ASTParser: if child.type in ("type_identifier", "identifier", "namespace_identifier", "qualified_identifier"): return code_bytes[child.start_byte:child.end_byte].decode("utf8", errors="replace") return "" + #endregion: Core Operations + #region: Skeleton & Curated Views + def get_skeleton(self, code: str, path: Optional[str] = None) -> str: """ - - - Returns a skeleton of a Python file (preserving docstrings, stripping function bodies). - [C: src/mcp_client.py:py_get_skeleton, src/mcp_client.py:ts_c_get_skeleton, src/mcp_client.py:ts_cpp_get_skeleton, src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_skeleton_c, tests/test_ast_parser.py:test_ast_parser_get_skeleton_cpp, tests/test_ast_parser.py:test_ast_parser_get_skeleton_python, tests/test_context_pruner.py:test_ast_caching, tests/test_context_pruner.py:test_performance_large_file] + Returns a skeleton of a Python file (preserving docstrings, stripping function bodies). + [C: src/mcp_client.py:py_get_skeleton, src/mcp_client.py:ts_c_get_skeleton, src/mcp_client.py:ts_cpp_get_skeleton, src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_skeleton_c, tests/test_ast_parser.py:test_ast_parser_get_skeleton_cpp, tests/test_ast_parser.py:test_ast_parser_get_skeleton_python, tests/test_context_pruner.py:test_ast_caching, tests/test_context_pruner.py:test_performance_large_file] """ code_bytes = code.encode("utf8") - tree = self.get_cached_tree(path, code) + tree = self.get_cached_tree(path, code) edits: List[Tuple[int, int, str]] = [] def is_docstring(node: tree_sitter.Node) -> bool: @@ -206,7 +203,7 @@ class ASTParser: def walk(node: tree_sitter.Node) -> None: """ - [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] + [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] """ if node.type in ("function_definition", "method_definition"): body = node.child_by_field_name("body") @@ -218,7 +215,7 @@ class ASTParser: break if body and body.type in ("block", "compound_statement"): - indent = " " * body.start_point.column + indent = " " * body.start_point.column first_stmt = None for child in body.children: if child.type not in ("comment", "{", "}"): @@ -244,17 +241,17 @@ class ASTParser: edits.append((start_byte, end_byte, f"\n{indent}...")) else: start_byte = initializer.start_byte if initializer else body.start_byte - end_byte = body.end_byte + end_byte = body.end_byte # Try to preserve braces for C-style languages if body.type == "compound_statement" and len(body.children) >= 2 and body.children[0].type == "{" and body.children[-1].type == "}": if initializer: start_byte = initializer.start_byte - end_byte = body.children[-1].start_byte + end_byte = body.children[-1].start_byte edits.append((start_byte, end_byte, "{ ... ")) else: start_byte = body.children[0].end_byte - end_byte = body.children[-1].start_byte + end_byte = body.children[-1].start_byte edits.append((start_byte, end_byte, " ... ")) else: edits.append((start_byte, end_byte, "...")) @@ -275,15 +272,13 @@ class ASTParser: return code_bytearray.decode("utf8") def get_curated_view(self, code: str, path: Optional[str] = None) -> str: """ - - - Returns a curated skeleton of a Python file. - Preserves function bodies if they have @core_logic decorator or # [HOT] comment. - Otherwise strips bodies but preserves docstrings. - [C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_curated_view] + Returns a curated skeleton of a Python file. + Preserves function bodies if they have @core_logic decorator or # [HOT] comment. + Otherwise strips bodies but preserves docstrings. + [C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_curated_view] """ code_bytes = code.encode("utf8") - tree = self.get_cached_tree(path, code) + tree = self.get_cached_tree(path, code) edits: List[Tuple[int, int, str]] = [] def is_docstring(node: tree_sitter.Node) -> bool: @@ -318,7 +313,7 @@ class ASTParser: def walk(node: tree_sitter.Node) -> None: """ - [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] + [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] """ if node.type == "function_definition": body = node.child_by_field_name("body") @@ -326,7 +321,7 @@ class ASTParser: # Check if we should preserve it preserve = has_core_logic_decorator(node) or has_hot_comment(node) if not preserve: - indent = " " * body.start_point.column + indent = " " * body.start_point.column first_stmt = None for child in body.children: if child.type != "comment": @@ -334,12 +329,12 @@ class ASTParser: break if first_stmt and is_docstring(first_stmt): start_byte = first_stmt.end_byte - end_byte = body.end_byte + end_byte = body.end_byte if end_byte > start_byte: edits.append((start_byte, end_byte, f"\n{indent}...")) else: start_byte = body.start_byte - end_byte = body.end_byte + end_byte = body.end_byte edits.append((start_byte, end_byte, "...")) for child in node.children: walk(child) @@ -350,16 +345,16 @@ class ASTParser: for start, end, replacement in edits: code_bytearray[start:end] = bytes(replacement, "utf8") return code_bytearray.decode("utf8") + #endregion: Skeleton & Curated Views #region: Targeted Views + def get_targeted_view(self, code: str, function_names: List[str], path: Optional[str] = None) -> str: """ - - - Returns a targeted view of the code including only the specified functions - and their dependencies up to depth 2. - [C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_targeted_view, tests/test_context_pruner.py:test_class_targeted_extraction, tests/test_context_pruner.py:test_targeted_extraction] + Returns a targeted view of the code including only the specified functions + and their dependencies up to depth 2. + [C: src/multi_agent_conductor.py:run_worker_lifecycle, tests/test_ast_parser.py:test_ast_parser_get_targeted_view, tests/test_context_pruner.py:test_class_targeted_extraction, tests/test_context_pruner.py:test_targeted_extraction] """ code_bytes = code.encode("utf8") tree = self.get_cached_tree(path, code) @@ -375,9 +370,9 @@ class ASTParser: elif node.type == "class_definition": name_node = node.child_by_field_name("name") if name_node: - cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") + cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") full_cname = f"{class_name}.{cname}" if class_name else cname - body = node.child_by_field_name("body") + body = node.child_by_field_name("body") if body: collect_functions(body, full_cname) return @@ -413,12 +408,12 @@ class ASTParser: to_include.add(full_name) current_layer = set(to_include) - all_found = set(to_include) + all_found = set(to_include) for _ in range(2): next_layer = set() for name in current_layer: if name in all_functions: - node = all_functions[name] + node = all_functions[name] calls = get_calls(node) for call in calls: for func_name in all_functions: @@ -440,14 +435,14 @@ class ASTParser: def check_for_targeted(node, parent_class=None): if node.type == "function_definition": name_node = node.child_by_field_name("name") - fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" - fullname = f"{parent_class}.{fname}" if parent_class else fname + fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" + fullname = f"{parent_class}.{fname}" if parent_class else fname return fullname in all_found if node.type == "class_definition": - name_node = node.child_by_field_name("name") - cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" + name_node = node.child_by_field_name("name") + cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" full_cname = f"{parent_class}.{cname}" if parent_class else cname - body = node.child_by_field_name("body") + body = node.child_by_field_name("body") if body: for child in body.children: if check_for_targeted(child, full_cname): @@ -461,12 +456,12 @@ class ASTParser: def walk_edits(node, parent_class=None): if node.type == "function_definition": name_node = node.child_by_field_name("name") - fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" - fullname = f"{parent_class}.{fname}" if parent_class else fname + fname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" + fullname = f"{parent_class}.{fname}" if parent_class else fname if fullname in all_found: body = node.child_by_field_name("body") if body and body.type in ("block", "compound_statement"): - indent = " " * body.start_point.column + indent = " " * body.start_point.column first_stmt = None for child in body.children: if child.type != "comment": @@ -474,22 +469,22 @@ class ASTParser: break if first_stmt and is_docstring(first_stmt): start_byte = first_stmt.end_byte - end_byte = body.end_byte + end_byte = body.end_byte if end_byte > start_byte: edits.append((start_byte, end_byte, f"\n{indent}...")) else: start_byte = body.start_byte - end_byte = body.end_byte + end_byte = body.end_byte edits.append((start_byte, end_byte, "...")) else: edits.append((node.start_byte, node.end_byte, "")) return if node.type == "class_definition": if check_for_targeted(node, parent_class): - name_node = node.child_by_field_name("name") - cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" + name_node = node.child_by_field_name("name") + cname = code_bytes[name_node.start_byte:name_node.end_byte].decode("utf8", errors="replace") if name_node else "" full_cname = f"{parent_class}.{cname}" if parent_class else cname - body = node.child_by_field_name("body") + body = node.child_by_field_name("body") if body: for child in body.children: walk_edits(child, full_cname) @@ -517,15 +512,16 @@ class ASTParser: result = code_bytearray.decode("utf8") result = re.sub(r'\n\s*\n\s*\n+', '\n\n', result) return result.strip() + "\n" + #endregion: Targeted Views #region: Symbol Extraction + def get_definition(self, code: str, name: str, path: Optional[str] = None) -> str: """ - - Returns the full source code for a specific definition by name. - Supports 'ClassName::method' or 'method' for C++. - [C: src/mcp_client.py:trace, src/mcp_client.py:ts_c_get_definition, src/mcp_client.py:ts_cpp_get_definition, tests/test_ast_parser.py:test_ast_parser_get_definition_c, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp_template] + Returns the full source code for a specific definition by name. + Supports 'ClassName::method' or 'method' for C++. + [C: src/mcp_client.py:trace, src/mcp_client.py:ts_c_get_definition, src/mcp_client.py:ts_cpp_get_definition, tests/test_ast_parser.py:test_ast_parser_get_definition_c, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp, tests/test_ast_parser.py:test_ast_parser_get_definition_cpp_template] """ code_bytes = code.encode("utf8") tree = self.get_cached_tree(path, code) @@ -621,16 +617,13 @@ class ASTParser: def get_signature(self, code: str, name: str, path: Optional[str] = None) -> str: """ - - - Returns only the signature part of a function or method. - For C/C++, this is the code from the start of the definition until the block start '{'. - [C: src/mcp_client.py:ts_c_get_signature, src/mcp_client.py:ts_cpp_get_signature, tests/test_ast_parser.py:test_ast_parser_get_signature_c, tests/test_ast_parser.py:test_ast_parser_get_signature_cpp] + Returns only the signature part of a function or method. + For C/C++, this is the code from the start of the definition until the block start '{'. + [C: src/mcp_client.py:ts_c_get_signature, src/mcp_client.py:ts_cpp_get_signature, tests/test_ast_parser.py:test_ast_parser_get_signature_c, tests/test_ast_parser.py:test_ast_parser_get_signature_cpp] """ code_bytes = code.encode("utf8") - tree = self.get_cached_tree(path, code) - - parts = re.split(r'::|\.', name) + tree = self.get_cached_tree(path, code) + parts = re.split(r'::|\.', name) def walk(node: tree_sitter.Node, target_parts: List[str]) -> Optional[tree_sitter.Node]: """ @@ -638,7 +631,7 @@ class ASTParser: """ if not target_parts: return None - target = target_parts[0] + target = target_parts[0] best_match = None for child in node.children: @@ -649,7 +642,7 @@ class ASTParser: if sub.type in ("class_specifier", "struct_specifier", "enum_specifier"): check_node = sub break - + is_interesting = check_node.type in ("function_definition", "class_definition", "class_specifier", "struct_specifier", "enum_specifier", "enum_definition", "namespace_definition", "template_declaration", "field_declaration", "declaration") if is_interesting: node_name = self._get_name(check_node, code_bytes) @@ -729,15 +722,15 @@ class ASTParser: return code_bytes[found_node.start_byte:found_node.end_byte].decode("utf8", errors="replace").strip() return f"ERROR: signature for '{name}' not found" + #endregion: Symbol Extraction #region: Analysis & Updates + def get_code_outline(self, code: str, path: Optional[str] = None) -> str: """ - - - Returns a hierarchical outline of the code (classes, structs, functions, methods). - [C: src/mcp_client.py:ts_c_get_code_outline, src/mcp_client.py:ts_cpp_get_code_outline, tests/test_ast_parser.py:test_ast_parser_get_code_outline_c, tests/test_ast_parser.py:test_ast_parser_get_code_outline_cpp] + Returns a hierarchical outline of the code (classes, structs, functions, methods). + [C: src/mcp_client.py:ts_c_get_code_outline, src/mcp_client.py:ts_cpp_get_code_outline, tests/test_ast_parser.py:test_ast_parser_get_code_outline_c, tests/test_ast_parser.py:test_ast_parser_get_code_outline_cpp] """ code_bytes = code.encode("utf8") tree = self.get_cached_tree(path, code) @@ -745,7 +738,7 @@ class ASTParser: def walk(node: tree_sitter.Node, indent: int = 0) -> None: """ - [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] + [C: src/mcp_client.py:_search_file, src/mcp_client.py:py_find_usages, src/mcp_client.py:py_get_hierarchy, src/mcp_client.py:trace, src/outline_tool.py:CodeOutliner.outline, src/outline_tool.py:CodeOutliner.walk, src/summarize.py:_summarise_python] """ ntype = node.type label = "" @@ -778,15 +771,12 @@ class ASTParser: def update_definition(self, code: str, name: str, new_content: str, path: Optional[str] = None) -> str: """ - - - Surgically replace the definition of a class or function by name. - [C: src/mcp_client.py:ts_c_update_definition, src/mcp_client.py:ts_cpp_update_definition, tests/test_ast_parser.py:test_ast_parser_update_definition_cpp] + Surgically replace the definition of a class or function by name. + [C: src/mcp_client.py:ts_c_update_definition, src/mcp_client.py:ts_cpp_update_definition, tests/test_ast_parser.py:test_ast_parser_update_definition_cpp] """ code_bytes = code.encode("utf8") - tree = self.get_cached_tree(path, code) - - parts = re.split(r'::|\.', name) + tree = self.get_cached_tree(path, code) + parts = re.split(r'::|\.', name) def walk(node: tree_sitter.Node, target_parts: List[str]) -> Optional[tree_sitter.Node]: """ @@ -794,7 +784,7 @@ class ASTParser: """ if not target_parts: return None - target = target_parts[0] + target = target_parts[0] best_match = None for child in node.children: @@ -876,12 +866,15 @@ class ASTParser: code_bytearray[found_node.start_byte:found_node.end_byte] = bytes(new_content, "utf8") return code_bytearray.decode("utf8") return f"ERROR: definition '{name}' not found" + #endregion: Analysis & Updates #region: Module Level Utilities + def reset_client() -> None: pass def get_file_id(path: Path) -> Optional[str]: return None + #endregion: Module Level Utilities diff --git a/src/fuzzy_anchor.py b/src/fuzzy_anchor.py index f4fe2183..938640fe 100644 --- a/src/fuzzy_anchor.py +++ b/src/fuzzy_anchor.py @@ -22,18 +22,18 @@ class FuzzyAnchor: start_line and end_line are 1-based. [C: src/gui_2.py:App._populate_auto_slices, src/gui_2.py:App._render_text_viewer_window, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_create_slice_basic, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_anchor_mismatch_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_exact_match, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_deleted_before_returns_none, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_line_inserted_before, tests/test_fuzzy_anchor.py:TestFuzzyAnchor.test_resolve_slice_multiple_lines_changed, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations] """ - lines = text.splitlines() - s_idx = max(0, start_line - 1) - e_idx = min(len(lines), end_line) + lines = text.splitlines() + s_idx = max(0, start_line - 1) + e_idx = min(len(lines), end_line) slice_lines = lines[s_idx:e_idx] - slice_text = "\n".join(slice_lines) + slice_text = "\n".join(slice_lines) return { - "start_line": start_line, - "end_line": end_line, + "start_line": start_line, + "end_line": end_line, "start_context": cls.get_context(lines, s_idx, 3, 1), - "end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order - "content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest() + "end_context": cls.get_context(lines, e_idx - 1, 3, -1)[::-1], # Reverse back to normal order + "content_hash": hashlib.mdsafe(slice_text.encode()).hexdigest() if hasattr(hashlib, 'mdsafe') else hashlib.md5(slice_text.encode()).hexdigest() } @classmethod @@ -47,13 +47,13 @@ class FuzzyAnchor: e_idx = slice_data["end_line"] if 0 <= s_idx < len(lines) and e_idx <= len(lines): current_text = "\n".join(lines[s_idx:e_idx]) - curr_hash = hashlib.md5(current_text.encode()).hexdigest() + curr_hash = hashlib.md5(current_text.encode()).hexdigest() if curr_hash == slice_data["content_hash"]: return (slice_data["start_line"], slice_data["end_line"]) # 2. Fuzzy match start_ctx = slice_data["start_context"] - end_ctx = slice_data["end_context"] + end_ctx = slice_data["end_context"] if not start_ctx or not end_ctx: return None # Search for start_ctx @@ -67,7 +67,7 @@ class FuzzyAnchor: if match: best_s = i break - + if best_s == -1: return None # Search for end_ctx after start_ctx @@ -83,8 +83,8 @@ class FuzzyAnchor: if match: best_e = i + 1 break - + if best_e != -1: return (best_s + 1, best_e) - - return None \ No newline at end of file + + return None diff --git a/src/gemini_cli_adapter.py b/src/gemini_cli_adapter.py index a7136da1..ea7fcb56 100644 --- a/src/gemini_cli_adapter.py +++ b/src/gemini_cli_adapter.py @@ -46,31 +46,25 @@ from src import session_logger class GeminiCliAdapter: """ - - - Adapter for the Gemini CLI that parses streaming JSON output. + Adapter for the Gemini CLI that parses streaming JSON output. """ def __init__(self, binary_path: str = "gemini"): """ - - Initializes the adapter with the path to the gemini CLI executable. - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + Initializes the adapter with the path to the gemini CLI executable. + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.binary_path = binary_path - self.session_id: Optional[str] = None - self.last_usage: Optional[dict[str, Any]] = None + self.session_id: Optional[str] = None + self.last_usage: Optional[dict[str, Any]] = None self.last_latency: float = 0.0 - def send(self, message: str, safety_settings: list[Any] | None = None, system_instruction: str | None = None, - model: str | None = None, stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]: + def send(self, message: str, safety_settings: list[Any] | None = None, system_instruction: str | None = None, model: str | None = None, stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]: """ - - - Sends a message to the Gemini CLI and processes the streaming JSON output. - Uses non-blocking line-by-line reading to allow stream_callback. - [C: simulation/user_agent.py:UserSimAgent.generate_response, src/multi_agent_conductor.py:run_worker_lifecycle, src/orchestrator_pm.py:generate_tracks, tests/test_ai_cache_tracking.py:test_gemini_cache_tracking, tests/test_ai_client_cli.py:test_ai_client_send_gemini_cli, tests/test_api_events.py:test_send_emits_events_proper, tests/test_api_events.py:test_send_emits_tool_events, tests/test_deepseek_provider.py:test_deepseek_completion_logic, tests/test_deepseek_provider.py:test_deepseek_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoner_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoning_logic, tests/test_deepseek_provider.py:test_deepseek_streaming, tests/test_deepseek_provider.py:test_deepseek_tool_calling, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_full_flow_integration, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_captures_usage_metadata, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_handles_tool_use_events, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_parses_jsonl_output, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_starts_subprocess_with_correct_args, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_parses_tool_calls_from_streaming_json, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_starts_subprocess_with_model, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_context_bleed_prevention, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_loop_termination, tests/test_gemini_cli_integration.py:test_gemini_cli_full_integration, tests/test_gemini_cli_integration.py:test_gemini_cli_rejection_and_history, tests/test_gemini_cli_parity_regression.py:test_send_invokes_adapter_send, tests/test_gui2_mcp.py:test_mcp_tool_call_is_dispatched, tests/test_tier4_interceptor.py:test_ai_client_passes_qa_callback, tests/test_token_usage.py:test_token_usage_tracking, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast] + Sends a message to the Gemini CLI and processes the streaming JSON output. + Uses non-blocking line-by-line reading to allow stream_callback. + [C: simulation/user_agent.py:UserSimAgent.generate_response, src/multi_agent_conductor.py:run_worker_lifecycle, src/orchestrator_pm.py:generate_tracks, tests/test_ai_cache_tracking.py:test_gemini_cache_tracking, tests/test_ai_client_cli.py:test_ai_client_send_gemini_cli, tests/test_api_events.py:test_send_emits_events_proper, tests/test_api_events.py:test_send_emits_tool_events, tests/test_deepseek_provider.py:test_deepseek_completion_logic, tests/test_deepseek_provider.py:test_deepseek_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoner_payload_verification, tests/test_deepseek_provider.py:test_deepseek_reasoning_logic, tests/test_deepseek_provider.py:test_deepseek_streaming, tests/test_deepseek_provider.py:test_deepseek_tool_calling, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_full_flow_integration, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_captures_usage_metadata, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_handles_tool_use_events, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_parses_jsonl_output, tests/test_gemini_cli_adapter.py:TestGeminiCliAdapter.test_send_starts_subprocess_with_correct_args, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_parses_tool_calls_from_streaming_json, tests/test_gemini_cli_adapter_parity.py:TestGeminiCliAdapterParity.test_send_starts_subprocess_with_model, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_context_bleed_prevention, tests/test_gemini_cli_edge_cases.py:test_gemini_cli_loop_termination, tests/test_gemini_cli_integration.py:test_gemini_cli_full_integration, tests/test_gemini_cli_integration.py:test_gemini_cli_rejection_and_history, tests/test_gemini_cli_parity_regression.py:test_send_invokes_adapter_send, tests/test_gui2_mcp.py:test_mcp_tool_call_is_dispatched, tests/test_tier4_interceptor.py:test_ai_client_passes_qa_callback, tests/test_token_usage.py:test_token_usage_tracking, tests/test_websocket_server.py:test_websocket_subscription_and_broadcast] """ - start_time = time.time() + start_time = time.time() command_parts = [self.binary_path] if model: command_parts.extend(['-m', f'"{model}"']) @@ -85,8 +79,8 @@ class GeminiCliAdapter: prompt_text = f"{system_instruction}\n\n{message}" accumulated_text = "" - tool_calls = [] - stdout_content = [] + tool_calls = [] + stdout_content = [] env = os.environ.copy() env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop" @@ -115,13 +109,13 @@ class GeminiCliAdapter: process = subprocess.Popen( cmd_list, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - encoding="utf-8", - shell=False, - env=env + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + stderr = subprocess.PIPE, + text = True, + encoding = "utf-8", + shell = False, + env = env ) # Use communicate to avoid pipe deadlocks with large input/output. @@ -142,7 +136,7 @@ class GeminiCliAdapter: if not line: continue stdout_content.append(line) try: - data = json.loads(line) + data = json.loads(line) msg_type = data.get("type") if msg_type == "init": if "session_id" in data: @@ -163,9 +157,9 @@ class GeminiCliAdapter: self.session_id = data.get("session_id") elif msg_type == "tool_use": tc = { - "name": data.get("tool_name", data.get("name")), + "name": data.get("tool_name", data.get("name")), "args": data.get("parameters", data.get("args", {})), - "id": data.get("tool_id", data.get("id")) + "id": data.get("tool_id", data.get("id")) } if tc["name"]: tool_calls.append(tc) @@ -180,11 +174,11 @@ class GeminiCliAdapter: raise Exception(f"Gemini CLI failed with exit {process.returncode}\nStderr: {stderr_final}") session_logger.open_session() session_logger.log_cli_call( - command=command, - stdin_content=prompt_text, - stdout_content="\n".join(stdout_content), - stderr_content=stderr_final, - latency=current_latency + command = command, + stdin_content = prompt_text, + stdout_content = "\n".join(stdout_content), + stderr_content = stderr_final, + latency = current_latency ) self.last_latency = current_latency diff --git a/src/history.py b/src/history.py index 091f5516..ab78dce7 100644 --- a/src/history.py +++ b/src/history.py @@ -7,38 +7,38 @@ from dataclasses import dataclass, field @dataclass class UISnapshot: """Capture of restorable UI state.""" - ai_input: str - project_system_prompt: str - global_system_prompt: str - base_system_prompt: str + ai_input: str + project_system_prompt: str + global_system_prompt: str + base_system_prompt: str use_default_base_prompt: bool - temperature: float - top_p: float - max_tokens: int - auto_add_history: bool - disc_entries: list[dict] - files: list[dict] - context_files: list[dict] - screenshots: list[str] + temperature: float + top_p: float + max_tokens: int + auto_add_history: bool + disc_entries: list[dict] + files: list[dict] + context_files: list[dict] + screenshots: list[str] def to_dict(self) -> dict: """ - [C: src/models.py:ContextPreset.to_dict, src/models.py:ExternalEditorConfig.to_dict, src/models.py:MCPConfiguration.to_dict, src/models.py:RAGConfig.to_dict, src/models.py:ToolPreset.to_dict, src/models.py:Track.to_dict, src/models.py:TrackState.to_dict, src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] + [C: src/models.py:ContextPreset.to_dict, src/models.py:ExternalEditorConfig.to_dict, src/models.py:MCPConfiguration.to_dict, src/models.py:RAGConfig.to_dict, src/models.py:ToolPreset.to_dict, src/models.py:Track.to_dict, src/models.py:TrackState.to_dict, src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return { - "ai_input": self.ai_input, - "project_system_prompt": self.project_system_prompt, - "global_system_prompt": self.global_system_prompt, - "base_system_prompt": self.base_system_prompt, + "ai_input": self.ai_input, + "project_system_prompt": self.project_system_prompt, + "global_system_prompt": self.global_system_prompt, + "base_system_prompt": self.base_system_prompt, "use_default_base_prompt": self.use_default_base_prompt, - "temperature": self.temperature, - "top_p": self.top_p, - "max_tokens": self.max_tokens, - "auto_add_history": self.auto_add_history, - "disc_entries": self.disc_entries, - "files": self.files, - "context_files": self.context_files, - "screenshots": self.screenshots + "temperature": self.temperature, + "top_p": self.top_p, + "max_tokens": self.max_tokens, + "auto_add_history": self.auto_add_history, + "disc_entries": self.disc_entries, + "files": self.files, + "context_files": self.context_files, + "screenshots": self.screenshots } @classmethod @@ -47,31 +47,31 @@ class UISnapshot: [C: src/models.py:ContextPreset.from_dict, src/models.py:ExternalEditorConfig.from_dict, src/models.py:MCPConfiguration.from_dict, src/models.py:RAGConfig.from_dict, src/models.py:ToolPreset.from_dict, src/models.py:Track.from_dict, src/models.py:TrackState.from_dict, src/models.py:load_mcp_config, src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return cls( - ai_input=data.get("ai_input", ""), - project_system_prompt=data.get("project_system_prompt", ""), - global_system_prompt=data.get("global_system_prompt", ""), - base_system_prompt=data.get("base_system_prompt", ""), - use_default_base_prompt=data.get("use_default_base_prompt", True), - temperature=data.get("temperature", 0.0), - top_p=data.get("top_p", 1.0), - max_tokens=data.get("max_tokens", 4096), - auto_add_history=data.get("auto_add_history", False), - disc_entries=data.get("disc_entries", []), - files=data.get("files", []), - context_files=data.get("context_files", []), - screenshots=data.get("screenshots", []) + ai_input = data.get("ai_input", ""), + project_system_prompt = data.get("project_system_prompt", ""), + global_system_prompt = data.get("global_system_prompt", ""), + base_system_prompt = data.get("base_system_prompt", ""), + use_default_base_prompt = data.get("use_default_base_prompt", True), + temperature = data.get("temperature", 0.0), + top_p = data.get("top_p", 1.0), + max_tokens = data.get("max_tokens", 4096), + auto_add_history = data.get("auto_add_history", False), + disc_entries = data.get("disc_entries", []), + files = data.get("files", []), + context_files = data.get("context_files", []), + screenshots = data.get("screenshots", []) ) @dataclass class HistoryEntry: - state: typing.Any + state: typing.Any description: str - timestamp: float = field(default_factory=lambda: time.time()) + timestamp: float = field(default_factory=lambda: time.time()) class HistoryManager: def __init__(self, max_capacity: int = 100): """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.max_capacity = max_capacity self._undo_stack: typing.List[HistoryEntry] = [] @@ -79,11 +79,9 @@ class HistoryManager: def push(self, state: typing.Any, description: str) -> None: """ - - - Pushes a new state to the undo stack and clears the redo stack. - If the undo stack exceeds max_capacity, the oldest state is removed. - [C: tests/test_history.py:test_jump_to_undo, tests/test_history.py:test_max_capacity, tests/test_history.py:test_push_state, tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo] + Pushes a new state to the undo stack and clears the redo stack. + If the undo stack exceeds max_capacity, the oldest state is removed. + [C: tests/test_history.py:test_jump_to_undo, tests/test_history.py:test_max_capacity, tests/test_history.py:test_push_state, tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo] """ entry = HistoryEntry(state=state, description=description) self._undo_stack.append(entry) @@ -93,47 +91,35 @@ class HistoryManager: def undo(self, current_state: typing.Any, current_description: str = "Current State") -> typing.Optional[HistoryEntry]: """ - - - Undoes the last action by moving the current_state to the redo stack - and returning the top of the undo stack. - [C: tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo, tests/test_history_manager.py:TestHistoryManager.test_undo_no_history_returns_none] + Undoes the last action by moving the current_state to the redo stack + and returning the top of the undo stack. + [C: tests/test_history.py:test_redo_cleared_on_push, tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_push_and_undo, tests/test_history_manager.py:TestHistoryManager.test_push_clears_redo_stack, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo, tests/test_history_manager.py:TestHistoryManager.test_undo_no_history_returns_none] """ - if not self._undo_stack: - return None - + if not self._undo_stack: return None redo_entry = HistoryEntry(state=current_state, description=current_description) self._redo_stack.append(redo_entry) return self._undo_stack.pop() def redo(self, current_state: typing.Any, current_description: str = "Current State") -> typing.Optional[HistoryEntry]: """ - - - Redoes the last undone action by moving the current_state to the undo stack - and returning the top of the redo stack. - [C: tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_redo_no_history_returns_none, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo] + Redoes the last undone action by moving the current_state to the undo stack + and returning the top of the redo stack. + [C: tests/test_history.py:test_undo_redo, tests/test_history_manager.py:TestHistoryManager.test_redo_no_history_returns_none, tests/test_history_manager.py:TestHistoryManager.test_undo_and_redo] """ - if not self._redo_stack: - return None - + if not self._redo_stack: return None undo_entry = HistoryEntry(state=current_state, description=current_description) self._undo_stack.append(undo_entry) return self._redo_stack.pop() @property - def can_undo(self) -> bool: - return len(self._undo_stack) > 0 - + def can_undo(self) -> bool: return len(self._undo_stack) > 0 @property - def can_redo(self) -> bool: - return len(self._redo_stack) > 0 + def can_redo(self) -> bool: return len(self._redo_stack) > 0 def get_history(self) -> typing.List[typing.Dict[str, typing.Any]]: """ - - Returns a list of descriptions and timestamps for the undo stack. - [C: tests/test_history.py:test_initial_state, tests/test_history.py:test_push_state, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions] + Returns a list of descriptions and timestamps for the undo stack. + [C: tests/test_history.py:test_initial_state, tests/test_history.py:test_push_state, tests/test_history_manager.py:TestHistoryManager.test_get_history_returns_descriptions] """ return [ {"description": e.description, "timestamp": e.timestamp} @@ -142,20 +128,14 @@ class HistoryManager: def jump_to_undo(self, index: int, current_state: typing.Any, current_description: str = "Before Jump") -> typing.Optional[HistoryEntry]: """ - - - Jumps to a specific state in the undo stack by moving subsequent states - and the current_state to the redo stack. - [C: tests/test_history.py:test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo] + Jumps to a specific state in the undo stack by moving subsequent states + and the current_state to the redo stack. + [C: tests/test_history.py:test_jump_to_undo, tests/test_history_manager.py:TestHistoryManager.test_jump_to_undo] """ - if index < 0 or index >= len(self._undo_stack): - return None - + if index < 0 or index >= len(self._undo_stack): return None # Move current state to redo self._redo_stack.append(HistoryEntry(state=current_state, description=current_description)) - # Move states between index and top of undo to redo while len(self._undo_stack) > index + 1: self._redo_stack.append(self._undo_stack.pop()) - - return self._undo_stack.pop() \ No newline at end of file + return self._undo_stack.pop() diff --git a/src/hot_reloader.py b/src/hot_reloader.py index 3dd0494c..9e5a2aa5 100644 --- a/src/hot_reloader.py +++ b/src/hot_reloader.py @@ -1,6 +1,8 @@ from __future__ import annotations import copy +import importlib +import sys import traceback from dataclasses import dataclass, field @@ -9,62 +11,59 @@ from typing import Any @dataclass class HotModule: - name: str - file_path: str - state_keys: list[str] = field(default_factory=list) - delegation_targets: list[str] = field(default_factory=list) + name: str + file_path: str + state_keys: list[str] = field(default_factory=list) + delegation_targets: list[str] = field(default_factory=list) class HotReloader: - HOT_MODULES: dict[str, HotModule] = {} - last_error: str | None = None - is_error_state: bool = False + HOT_MODULES: dict[str, HotModule] = {} + last_error: str | None = None + is_error_state: bool = False - @classmethod - def register(cls, module: HotModule) -> None: - if module.name in cls.HOT_MODULES: - raise ValueError(f"Module {module.name} already registered") - cls.HOT_MODULES[module.name] = module + @classmethod + def register(cls, module: HotModule) -> None: + if module.name in cls.HOT_MODULES: + raise ValueError(f"Module {module.name} already registered") + cls.HOT_MODULES[module.name] = module - @classmethod - def capture_state(cls, app: Any, state_keys: list[str]) -> dict[str, Any]: - return {key: copy.deepcopy(getattr(app, key, None)) for key in state_keys if hasattr(app, key)} + @classmethod + def capture_state(cls, app: Any, state_keys: list[str]) -> dict[str, Any]: + return {key: copy.deepcopy(getattr(app, key, None)) for key in state_keys if hasattr(app, key)} - @classmethod - def restore_state(cls, app: Any, state: dict[str, Any]) -> None: - for key, value in state.items(): - setattr(app, key, value) + @classmethod + def restore_state(cls, app: Any, state: dict[str, Any]) -> None: + for key, value in state.items(): + setattr(app, key, value) - @classmethod - def reload(cls, module_name: str, app: Any) -> bool: - if module_name not in cls.HOT_MODULES: - cls.last_error = f"Module {module_name} not registered" - cls.is_error_state = True - return False + @classmethod + def reload(cls, module_name: str, app: Any) -> bool: + if module_name not in cls.HOT_MODULES: + cls.last_error = f"Module {module_name} not registered" + cls.is_error_state = True + return False - hm = cls.HOT_MODULES[module_name] - state = cls.capture_state(app, hm.state_keys) + hm = cls.HOT_MODULES[module_name] + state = cls.capture_state(app, hm.state_keys) - try: - import importlib - import sys - if module_name in sys.modules: - old_module = sys.modules[module_name] - importlib.reload(old_module) - else: - importlib.import_module(module_name) - cls.last_error = None - cls.is_error_state = False - return True - except Exception: - cls.restore_state(app, state) - cls.last_error = traceback.format_exc() - cls.is_error_state = True - return False + try: + if module_name in sys.modules: + old_module = sys.modules[module_name] + importlib.reload(old_module) + else: + importlib.import_module(module_name) + cls.last_error = None + cls.is_error_state = False + return True + except Exception: + cls.restore_state(app, state) + cls.last_error = traceback.format_exc() + cls.is_error_state = True + return False - @classmethod - def reload_all(cls, app: Any) -> bool: - success = True - for name in cls.HOT_MODULES: - if not cls.reload(name, app): - success = False - return success + @classmethod + def reload_all(cls, app: Any) -> bool: + success = True + for name in cls.HOT_MODULES: + if not cls.reload(name, app): success = False + return success diff --git a/src/imgui_scopes.py b/src/imgui_scopes.py index e89dff93..771fa9d1 100644 --- a/src/imgui_scopes.py +++ b/src/imgui_scopes.py @@ -65,7 +65,7 @@ class _ScopeMenu: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._label = label + self._label = label self._active = False def __enter__(self): self._active = imgui.begin_menu(self._label) @@ -109,7 +109,7 @@ class _ScopePopup: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._id = id_str + self._id = id_str self._active = False def __enter__(self): self._active = imgui.begin_popup(self._id) @@ -125,10 +125,10 @@ class _ScopePopupModal: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._name = name + self._name = name self._visible = visible - self._flags = flags - self._active = False + self._flags = flags + self._active = False def __enter__(self): self._active, self._visible = imgui.begin_popup_modal(self._name, self._visible, self._flags) return self._active, self._visible @@ -172,10 +172,10 @@ class _ScopeTable: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._name = name + self._name = name self._columns = columns - self._flags = flags - self._active = False + self._flags = flags + self._active = False def __enter__(self): self._active = imgui.begin_table(self._name, self._columns, self._flags) return self._active @@ -190,8 +190,8 @@ class _ScopeTabBar: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._id = id_str - self._flags = flags + self._id = id_str + self._flags = flags self._active = False def __enter__(self): self._active = imgui.begin_tab_bar(self._id, self._flags) @@ -207,10 +207,10 @@ class _ScopeTabItem: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._label = label - self._flags = flags + self._label = label + self._flags = flags self._expanded = False - self._open = None + self._open = None def __enter__(self): self._expanded, self._open = imgui.begin_tab_item(self._label, flags=self._flags) return self._expanded, self._open @@ -246,8 +246,8 @@ class _ScopeTreeNodeEx: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._label = label - self._flags = flags + self._label = label + self._flags = flags self._opened = False def __enter__(self): self._opened = imgui.tree_node_ex(self._label, self._flags) @@ -263,10 +263,10 @@ class _ScopeWindow: """ [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ - self._name = name + self._name = name self._visible = visible - self._flags = flags - self._result = None + self._flags = flags + self._result = None def __enter__(self): self._result = imgui.begin(self._name, self._visible, self._flags) return self._result diff --git a/src/log_pruner.py b/src/log_pruner.py index 3d2eac5d..7a94da09 100644 --- a/src/log_pruner.py +++ b/src/log_pruner.py @@ -10,40 +10,34 @@ from src.log_registry import LogRegistry class LogPruner: """ - - - Handles the automated deletion of old and insignificant session logs. - Ensures that only whitelisted or significant sessions (based on size/content) - are preserved long-term. + Handles the automated deletion of old and insignificant session logs. + Ensures that only whitelisted or significant sessions (based on size/content) + are preserved long-term. """ def __init__(self, log_registry: LogRegistry, logs_dir: str) -> None: """ - - - Initializes the LogPruner. - - Args: - log_registry: An instance of LogRegistry to check session data. - logs_dir: The path to the directory containing session sub-directories. - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + Initializes the LogPruner. + + Args: + log_registry: An instance of LogRegistry to check session data. + logs_dir: The path to the directory containing session sub-directories. + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.log_registry = log_registry - self.logs_dir = logs_dir + self.logs_dir = logs_dir def prune(self, max_age_days: int = 1, min_size_kb: int = 2) -> None: """ - - - Prunes old and small session directories from the logs directory. - - Deletes session directories that meet the following criteria: - 1. The session start time is older than max_age_days. - 2. The session name is NOT in the whitelist provided by the LogRegistry. - 3. The total size of all files within the session directory is less than min_size_kb. - [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_logging_e2e.py:test_logging_e2e] + Prunes old and small session directories from the logs directory. + + Deletes session directories that meet the following criteria: + 1. The session start time is older than max_age_days. + 2. The session name is NOT in the whitelist provided by the LogRegistry. + 3. The total size of all files within the session directory is less than min_size_kb. + [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_logging_e2e.py:test_logging_e2e] """ - now = datetime.now() + now = datetime.now() cutoff_time = now - timedelta(days=max_age_days) # Ensure the base logs directory exists. if not os.path.isdir(self.logs_dir): @@ -56,7 +50,7 @@ class LogPruner: # Prune sessions if their size is less than threshold for session_info in old_sessions_to_check: - session_id = session_info['session_id'] + session_id = session_info['session_id'] session_path = session_info['path'] if not session_path: continue @@ -128,4 +122,4 @@ class LogPruner: except OSError as e: sys.stderr.write(f"[LogPruner] Error removing {resolved_path}: {e}\n") - self.log_registry.save_registry() \ No newline at end of file + self.log_registry.save_registry() diff --git a/src/log_registry.py b/src/log_registry.py index 338bde55..133d7758 100644 --- a/src/log_registry.py +++ b/src/log_registry.py @@ -49,21 +49,17 @@ from typing import Any class LogRegistry: """ - - - Manages a persistent registry of session logs using a TOML file. - Tracks session paths, start times, whitelisting status, and metadata. + Manages a persistent registry of session logs using a TOML file. + Tracks session paths, start times, whitelisting status, and metadata. """ def __init__(self, registry_path: str) -> None: """ + Initializes the LogRegistry with a path to the registry file. - - Initializes the LogRegistry with a path to the registry file. - - Args: - registry_path (str): The file path to the TOML registry. - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + Args: + registry_path (str): The file path to the TOML registry. + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.registry_path = registry_path self.data: dict[str, dict[str, Any]] = {} @@ -76,10 +72,8 @@ class LogRegistry: def load_registry(self) -> None: """ - - - Loads the registry data from the TOML file into memory. - Handles date/time conversions from TOML-native formats to strings for consistency. + Loads the registry data from the TOML file into memory. + Handles date/time conversions from TOML-native formats to strings for consistency. """ if os.path.exists(self.registry_path): try: @@ -106,11 +100,9 @@ class LogRegistry: def save_registry(self) -> None: """ - - - Serializes and saves the current registry data to the TOML file. - Converts internal datetime objects to ISO format strings for compatibility. - [C: tests/test_logging_e2e.py:test_logging_e2e] + Serializes and saves the current registry data to the TOML file. + Converts internal datetime objects to ISO format strings for compatibility. + [C: tests/test_logging_e2e.py:test_logging_e2e] """ try: # Convert datetime objects to ISO format strings for TOML serialization @@ -130,7 +122,7 @@ class LogRegistry: if mk == 'timestamp' and isinstance(mv, datetime): metadata_copy[mk] = mv.isoformat() else: - metadata_copy[mk] = mv + metadata_copy[mk] = mv session_data_copy[k] = metadata_copy else: session_data_copy[k] = v @@ -142,15 +134,13 @@ class LogRegistry: def register_session(self, session_id: str, path: str, start_time: datetime | str) -> None: """ - - - Registers a new session in the registry. - - Args: - session_id (str): Unique identifier for the session. - path (str): File path to the session's log directory. - start_time (datetime|str): The timestamp when the session started. - [C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e] + Registers a new session in the registry. + + Args: + session_id (str): Unique identifier for the session. + path (str): File path to the session's log directory. + start_time (datetime|str): The timestamp when the session started. + [C: src/session_logger.py:open_session, tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_handles_relative_paths_starting_with_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_sessions_without_metadata_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_register_session, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata, tests/test_logging_e2e.py:test_logging_e2e] """ if session_id in self.data: print(f"Warning: Session ID '{session_id}' already exists. Overwriting.") @@ -160,27 +150,25 @@ class LogRegistry: else: start_time_str = start_time self.data[session_id] = { - 'path': path, - 'start_time': start_time_str, + 'path': path, + 'start_time': start_time_str, 'whitelisted': False, - 'metadata': None + 'metadata': None } self.save_registry() def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None: """ - - - Updates metadata fields for an existing session. - - Args: - session_id (str): Unique identifier for the session. - message_count (int): Total number of messages in the session. - errors (int): Number of errors identified in logs. - size_kb (int): Total size of the session logs in kilobytes. - whitelisted (bool): Whether the session should be protected from pruning. - reason (str): Explanation for the current whitelisting status. - [C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata] + Updates metadata fields for an existing session. + + Args: + session_id (str): Unique identifier for the session. + message_count (int): Total number of messages in the session. + errors (int): Number of errors identified in logs. + size_kb (int): Total size of the session logs in kilobytes. + whitelisted (bool): Whether the session should be protected from pruning. + reason (str): Explanation for the current whitelisting status. + [C: tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_prune_removes_empty_sessions_regardless_of_age, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_log_registry.py:TestLogRegistry.test_update_session_metadata] """ if session_id not in self.data: print(f"Error: Session ID '{session_id}' not found for metadata update.") @@ -192,10 +180,10 @@ class LogRegistry: metadata = self.data[session_id].get('metadata') if isinstance(metadata, dict): metadata['message_count'] = message_count - metadata['errors'] = errors - metadata['size_kb'] = size_kb - metadata['whitelisted'] = whitelisted - metadata['reason'] = reason + metadata['errors'] = errors + metadata['size_kb'] = size_kb + metadata['whitelisted'] = whitelisted + metadata['reason'] = reason # self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp # Also update the top-level whitelisted flag if provided if whitelisted is not None: @@ -204,16 +192,14 @@ class LogRegistry: def is_session_whitelisted(self, session_id: str) -> bool: """ - - - Checks if a specific session is marked as whitelisted. - - Args: - session_id (str): Unique identifier for the session. - - Returns: - bool: True if whitelisted, False otherwise. - [C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e] + Checks if a specific session is marked as whitelisted. + + Args: + session_id (str): Unique identifier for the session. + + Returns: + bool: True if whitelisted, False otherwise. + [C: tests/test_auto_whitelist.py:test_auto_whitelist_keywords, tests/test_auto_whitelist.py:test_auto_whitelist_large_size, tests/test_auto_whitelist.py:test_auto_whitelist_message_count, tests/test_auto_whitelist.py:test_no_auto_whitelist_insignificant, tests/test_log_registry.py:TestLogRegistry.test_is_session_whitelisted, tests/test_logging_e2e.py:test_logging_e2e] """ session_data = self.data.get(session_id) if session_data is None: @@ -223,15 +209,13 @@ class LogRegistry: def update_auto_whitelist_status(self, session_id: str) -> None: """ - - - Analyzes session logs and updates whitelisting status based on heuristics. - Sessions are automatically whitelisted if they contain error keywords, - have a high message count, or exceed a size threshold. - - Args: - session_id (str): Unique identifier for the session to analyze. - [C: src/session_logger.py:close_session] + Analyzes session logs and updates whitelisting status based on heuristics. + Sessions are automatically whitelisted if they contain error keywords, + have a high message count, or exceed a size threshold. + + Args: + session_id (str): Unique identifier for the session to analyze. + [C: src/session_logger.py:close_session] """ if session_id not in self.data: return @@ -239,9 +223,9 @@ class LogRegistry: session_path = session_data.get('path') if not session_path or not os.path.isdir(str(session_path)): return - total_size_bytes = 0 - message_count = 0 - found_keywords = [] + total_size_bytes = 0 + message_count = 0 + found_keywords = [] keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION'] try: for entry in os.scandir(str(session_path)): @@ -261,41 +245,39 @@ class LogRegistry: pass except Exception: pass - size_kb = total_size_bytes / 1024 + size_kb = total_size_bytes / 1024 whitelisted = False - reason = "" + reason = "" if found_keywords: whitelisted = True - reason = f"Found keywords: {', '.join(found_keywords)}" + reason = f"Found keywords: {', '.join(found_keywords)}" elif message_count > 10: whitelisted = True - reason = f"High message count: {message_count}" + reason = f"High message count: {message_count}" elif size_kb > 50: whitelisted = True - reason = f"Large session size: {size_kb:.1f} KB" + reason = f"Large session size: {size_kb:.1f} KB" self.update_session_metadata( session_id, - message_count=message_count, - errors=len(found_keywords), - size_kb=int(size_kb), - whitelisted=whitelisted, - reason=reason + message_count = message_count, + errors = len(found_keywords), + size_kb = int(size_kb), + whitelisted = whitelisted, + reason = reason ) def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]: """ - - - Retrieves a list of sessions that are older than a specific cutoff time - and are not marked as whitelisted. - Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0). - - Args: - cutoff_datetime (datetime): The threshold time for identifying old sessions. - - Returns: - list: A list of dictionaries containing session details (id, path, start_time). - [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions] + Retrieves a list of sessions that are older than a specific cutoff time + and are not marked as whitelisted. + Also includes non-whitelisted sessions that are empty (message_count=0 or size_kb=0). + + Args: + cutoff_datetime (datetime): The threshold time for identifying old sessions. + + Returns: + list: A list of dictionaries containing session details (id, path, start_time). + [C: tests/test_log_pruner.py:test_prune_old_insignificant_logs, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_empty_sessions, tests/test_log_pruning_heuristic.py:TestLogPruningHeuristic.test_get_old_non_whitelisted_sessions_includes_sessions_without_metadata, tests/test_log_registry.py:TestLogRegistry.test_get_old_non_whitelisted_sessions] """ old_sessions = [] for session_id, session_data in self.data.items(): @@ -316,14 +298,14 @@ class LogRegistry: is_empty = True else: message_count = metadata.get('message_count', -1) - size_kb = metadata.get('size_kb', -1) - is_empty = (message_count == 0 or size_kb == 0) + size_kb = metadata.get('size_kb', -1) + is_empty = (message_count == 0 or size_kb == 0) if not is_whitelisted: if is_empty or (start_time is not None and start_time < cutoff_datetime): old_sessions.append({ 'session_id': session_id, - 'path': session_data.get('path'), + 'path': session_data.get('path'), 'start_time': start_time_raw }) return old_sessions diff --git a/src/markdown_helper.py b/src/markdown_helper.py index cbf62d6b..a33c74cd 100644 --- a/src/markdown_helper.py +++ b/src/markdown_helper.py @@ -10,6 +10,10 @@ from imgui_bundle import imgui_md, imgui, immapp, imgui_color_text_edit as ed from pathlib import Path from typing import Optional, Dict, Callable +from src import theme_2 + +from src.markdown_table import parse_tables, render_table + def _get_language_id(name: str): """Get a language identifier for ImGuiColorTextEdit. @@ -41,28 +45,24 @@ def _set_editor_language(editor, lang_obj) -> None: 1.92.801+: editor.set_language(obj). 1.92.5: editor.set_language_definition(obj). No-op when lang_obj is None (used to skip the call for unknown languages). """ - if lang_obj is None: - return - if hasattr(editor, "set_language"): - editor.set_language(lang_obj) - elif hasattr(editor, "set_language_definition"): - editor.set_language_definition(lang_obj) + if lang_obj is None: return + + if hasattr(editor, "set_language"): editor.set_language(lang_obj) + elif hasattr(editor, "set_language_definition"): editor.set_language_definition(lang_obj) class MarkdownRenderer: """ - - - Hybrid Markdown renderer that uses imgui_md for text/headers - and ImGuiColorTextEdit for syntax-highlighted code blocks. + Hybrid Markdown renderer that uses imgui_md for text/headers + and ImGuiColorTextEdit for syntax-highlighted code blocks. """ def __init__(self): """ - [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] + [C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__] """ self.options = imgui_md.MarkdownOptions() # Base path for fonts (Inter family) self.options.font_options.font_base_path = "fonts/Inter" - self.options.font_options.regular_size = 18.0 + self.options.font_options.regular_size = 18.0 # Configure callbacks self.options.callbacks.on_open_link = self._on_open_link @@ -80,22 +80,21 @@ class MarkdownRenderer: # Apply the current theme's syntax palette on construction so new # editors we create pick up the right colors. The renderer is re-created # when the theme changes (see theme_2 module-load behavior). - from src import theme_2 palette_id = theme_2.get_syntax_palette_for_theme(theme_2.get_current_palette()) theme_2.apply_syntax_palette(palette_id) # Language mapping for ImGuiColorTextEdit self._lang_map = { "python": _get_language_id("python"), - "py": _get_language_id("python"), - "json": _get_language_id("json"), - "cpp": _get_language_id("cpp"), - "c++": _get_language_id("cpp"), - "c": _get_language_id("c"), - "lua": _get_language_id("lua"), - "sql": _get_language_id("sql"), - "cs": _get_language_id("cs"), - "c#": _get_language_id("cs"), + "py": _get_language_id("python"), + "json": _get_language_id("json"), + "cpp": _get_language_id("cpp"), + "c++": _get_language_id("cpp"), + "c": _get_language_id("c"), + "lua": _get_language_id("lua"), + "sql": _get_language_id("sql"), + "cs": _get_language_id("cs"), + "c#": _get_language_id("cs"), } def _on_open_link(self, url: str) -> None: @@ -119,28 +118,24 @@ class MarkdownRenderer: def render(self, text: str, context_id: str = "default") -> None: """ - - Render Markdown text with code block interception and GFM table substitution. - [C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render] + Render Markdown text with code block interception and GFM table substitution. + [C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render] """ - if not text: - return - from src.markdown_table import parse_tables, render_table + if not text: return text = self._normalize_bullet_delimiters(text) text = self._normalize_nested_list_endings(text) text = self._normalize_list_continuations(text) blocks = parse_tables(text) lines = text.splitlines(keepends=True) - if not lines: - return + if not lines: return - table_at_line: dict[int, int] = {b.span[0]: i for i, b in enumerate(blocks)} - table_end: dict[int, int] = {b.span[0]: b.span[1] for i, b in enumerate(blocks)} + table_at_line: dict[int, int] = {b.span[0]: i for i, b in enumerate(blocks)} + table_end: dict[int, int] = {b.span[0]: b.span[1] for i, b in enumerate(blocks)} - md_buf: list[str] = [] + md_buf: list[str] = [] code_buf: list[str] = [] - block_idx = 0 - in_fence = False + block_idx = 0 + in_fence = False fence_marker = "" def flush_md() -> None: @@ -161,11 +156,11 @@ class MarkdownRenderer: i = 0 while i < len(lines): - line = lines[i] + line = lines[i] stripped = line.lstrip().rstrip("\r\n") if stripped.startswith("```"): if not in_fence: - in_fence = True + in_fence = True fence_marker = stripped[:3] flush_md() code_buf.append(line) @@ -213,7 +208,6 @@ class MarkdownRenderer: (we cannot subclass the C++ imgui_md class). [C: src/markdown_helper.py:MarkdownRenderer.render] """ - import re return re.sub(r"(?m)^([ \t]*)\*[ \t]+", r"\1- ", text) def _normalize_nested_list_endings(self, text: str) -> str: @@ -226,7 +220,6 @@ class MarkdownRenderer: paragraph break. Cannot fix the upstream C++ from Python. [C: src/markdown_helper.py:MarkdownRenderer.render] """ - import re lines = text.split("\n") out: list[str] = [] for i, line in enumerate(lines): @@ -261,17 +254,16 @@ class MarkdownRenderer: a single list item. Acceptable for our use case. [C: src.markdown_helper:MarkdownRenderer.render] """ - import re lines = text.split("\n") out: list[str] = [] prev_was_list = False - prev_indent = 0 + prev_indent = 0 for i, line in enumerate(lines): if line.strip(): out.append(line) if re.match(r"^\s*[-*+\d]\s+", line): prev_was_list = True - prev_indent = len(line) - len(line.lstrip()) + prev_indent = len(line) - len(line.lstrip()) else: prev_was_list = False continue @@ -285,11 +277,10 @@ class MarkdownRenderer: out.append(line) prev_was_list = False continue - next_line = lines[j] - curr_indent = len(next_line) - len(next_line.lstrip()) + next_line = lines[j] + curr_indent = len(next_line) - len(next_line.lstrip()) is_next_list = bool(re.match(r"^\s*[-*+\d]", next_line)) - if curr_indent > prev_indent and not is_next_list: - continue + if curr_indent > prev_indent and not is_next_list: continue out.append(line) prev_was_list = False return "\n".join(out) @@ -300,7 +291,7 @@ class MarkdownRenderer: def _render_code_block(self, block: str, context_id: str, block_idx: int) -> None: """Render a code block using TextEditor for syntax highlighting.""" - lines = block.strip('`').split('\n') + lines = block.strip('`').split('\n') lang_tag = lines[0].strip().lower() if lines else "" # Heuristic to separate lang tag from code @@ -333,10 +324,10 @@ class MarkdownRenderer: if p_id is not None: editor.set_palette(p_id) - self._editor_cache[cache_key] = editor + self._editor_cache[cache_key] = editor self._editor_lang_cache[cache_key] = None - editor = self._editor_cache[cache_key] + editor = self._editor_cache[cache_key] current_lang = self._editor_lang_cache[cache_key] # Sync text and language. None means "no language set" (skip the call). @@ -354,10 +345,10 @@ class MarkdownRenderer: self._editor_lang_cache[cache_key] = lang_tag # Dynamic height calculation - line_count = code.count('\n') + 1 + line_count = code.count('\n') + 1 line_height = imgui.get_text_line_height() - height = (line_count * line_height) + 20 - height = min(max(height, 40), 500) + height = (line_count * line_height) + 20 + height = min(max(height, 40), 500) editor.render(f"##code_{context_id}_{block_idx}", a_size=imgui.ImVec2(0, height)) @@ -386,13 +377,12 @@ _renderer: Optional[MarkdownRenderer] = None def get_renderer() -> MarkdownRenderer: global _renderer - if _renderer is None: - _renderer = MarkdownRenderer() + if _renderer is None: _renderer = MarkdownRenderer() return _renderer def render(text: str, context_id: str = "default") -> None: """ - [C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render] + [C: src/theme_2.py:render_post_fx, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_active, tests/test_theme_nerv_alert.py:test_alert_pulsing_render_inactive, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_alert_pulsing_render, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_disabled, tests/test_theme_nerv_fx.py:TestThemeNervFx.test_crt_filter_render] """ get_renderer().render(text, context_id) @@ -400,4 +390,4 @@ def render_unindented(text: str) -> None: get_renderer().render_unindented(text) def render_code(code: str, lang: str = "", context_id: str = "default", block_idx: int = 0) -> None: - get_renderer().render_code(code, lang, context_id, block_idx) \ No newline at end of file + get_renderer().render_code(code, lang, context_id, block_idx) diff --git a/src/markdown_table.py b/src/markdown_table.py index ab969f9c..9dfad20d 100644 --- a/src/markdown_table.py +++ b/src/markdown_table.py @@ -30,8 +30,8 @@ class TableBlock: [C: src/markdown_helper.py:MarkdownRenderer.render] """ headers: list[str] - rows: list[list[str]] - span: tuple[int, int] + rows: list[list[str]] + span: tuple[int, int] def _split_row(line: str) -> list[str]: line = line.strip() @@ -45,7 +45,7 @@ def _is_table_at(lines: list[str], i: int) -> bool: return bool(_TABLE_SEPARATOR.match(lines[i + 1])) def parse_tables(text: str) -> list[TableBlock]: - lines = text.splitlines() + lines = text.splitlines() in_fence = False blocks: list[TableBlock] = [] i = 0 @@ -53,14 +53,14 @@ def parse_tables(text: str) -> list[TableBlock]: line = lines[i] if line.strip().startswith("```"): in_fence = not in_fence - i += 1 + i += 1 continue if in_fence: i += 1 continue if _is_table_at(lines, i): headers = _split_row(lines[i]) - j = i + 2 + j = i + 2 rows: list[list[str]] = [] while j < len(lines) and "|" in lines[j] and not _TABLE_SEPARATOR.match(lines[j]): rows.append(_split_row(lines[j])) diff --git a/src/mcp_client.py b/src/mcp_client.py index cf175030..e314a968 100644 --- a/src/mcp_client.py +++ b/src/mcp_client.py @@ -97,8 +97,8 @@ MUTATING_TOOLS: frozenset[str] = frozenset({ # Set by configure() before the AI send loop starts. # allowed_paths : set of resolved absolute Path objects (files or dirs) # base_dirs : set of resolved absolute Path dirs that act as roots -_allowed_paths: set[Path] = set() -_base_dirs: set[Path] = set() +_allowed_paths: set[Path] = set() +_base_dirs: set[Path] = set() _primary_base_dir: Path | None = None # Injected by gui_2.py - returns a dict of performance metrics @@ -106,14 +106,12 @@ perf_monitor_callback: Optional[Callable[[], dict[str, Any]]] = None def configure(file_items: list[dict[str, Any]], extra_base_dirs: list[str] | None = None) -> None: """ - - - Build the allowlist from aggregate file_items. - Called by ai_client before each send so the list reflects the current project. - - file_items : list of dicts from aggregate.build_file_items() - extra_base_dirs : additional directory roots to allow traversal of - [C: tests/conftest.py:reset_ai_client, tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration] + Build the allowlist from aggregate file_items. + Called by ai_client before each send so the list reflects the current project. + + file_items : list of dicts from aggregate.build_file_items() + extra_base_dirs : additional directory roots to allow traversal of + [C: tests/conftest.py:reset_ai_client, tests/test_arch_boundary_phase1.py:TestArchBoundaryPhase1.test_mcp_client_whitelist_enforcement, tests/test_mcp_client_beads.py:test_bd_mcp_tools, tests/test_py_struct_tools.py:test_mcp_dispatch_errors, tests/test_py_struct_tools.py:test_mcp_dispatch_integration] """ global _allowed_paths, _base_dirs, _primary_base_dir _allowed_paths = set() diff --git a/src/models.py b/src/models.py index 38a82d4b..7d84db24 100644 --- a/src/models.py +++ b/src/models.py @@ -41,6 +41,7 @@ from __future__ import annotations import datetime import json import os +import sys import tomllib from dataclasses import dataclass, field @@ -48,7 +49,8 @@ from pathlib import Path from typing import Any, Dict, List, Optional, Union from pydantic import BaseModel -from src.paths import get_config_path +from src.dag_engine import TrackDAG +from src.paths import get_config_path #region: Constants @@ -164,8 +166,6 @@ def load_config() -> dict[str, Any]: return tomllib.load(f) def save_config(config: dict[str, Any]) -> None: - import tomli_w - import sys config = _clean_nones(config) sys.stderr.write(f"[DEBUG] Saving config. Theme: {config.get('theme')}\n") sys.stderr.flush() @@ -345,7 +345,6 @@ class Track: """ [C: tests/test_mma_models.py:test_track_get_executable_tickets, tests/test_mma_models.py:test_track_get_executable_tickets_complex] """ - from src.dag_engine import TrackDAG dag = TrackDAG(self.tickets) return dag.get_ready_tasks() @@ -926,8 +925,8 @@ class MCPServerConfig: """ res = {'auto_start': self.auto_start} if self.command: res['command'] = self.command - if self.args: res['args'] = self.args - if self.url: res['url'] = self.url + if self.args: res['args'] = self.args + if self.url: res['url'] = self.url return res @classmethod @@ -936,11 +935,11 @@ class MCPServerConfig: [C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return cls( - name=name, - command=data.get('command'), - args=data.get('args', []), - url=data.get('url'), - auto_start=data.get('auto_start', False), + name = name, + command = data.get('command'), + args = data.get('args', []), + url = data.get('url'), + auto_start = data.get('auto_start', False), ) @dataclass @@ -958,30 +957,30 @@ class MCPConfiguration: """ [C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ - raw_servers = data.get('mcpServers', {}) + raw_servers = data.get('mcpServers', {}) parsed_servers = {name: MCPServerConfig.from_dict(name, cfg) for name, cfg in raw_servers.items()} return cls(mcpServers=parsed_servers) @dataclass class VectorStoreConfig: - provider: str - url: Optional[str] = None - api_key: Optional[str] = None + provider: str + url: Optional[str] = None + api_key: Optional[str] = None collection_name: str = 'manual_slop' - mcp_server: Optional[str] = None - mcp_tool: Optional[str] = None + mcp_server: Optional[str] = None + mcp_tool: Optional[str] = None def to_dict(self) -> Dict[str, Any]: """ [C: src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return { - "provider": self.provider, - "url": self.url, - "api_key": self.api_key, + "provider": self.provider, + "url": self.url, + "api_key": self.api_key, "collection_name": self.collection_name, - "mcp_server": self.mcp_server, - "mcp_tool": self.mcp_tool, + "mcp_server": self.mcp_server, + "mcp_tool": self.mcp_tool, } @classmethod @@ -990,32 +989,32 @@ class VectorStoreConfig: [C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return cls( - provider=data["provider"], - url=data.get("url"), - api_key=data.get("api_key"), - collection_name=data.get("collection_name", "manual_slop"), - mcp_server=data.get("mcp_server"), - mcp_tool=data.get("mcp_tool"), + provider = data["provider"], + url = data.get("url"), + api_key = data.get("api_key"), + collection_name = data.get("collection_name", "manual_slop"), + mcp_server = data.get("mcp_server"), + mcp_tool = data.get("mcp_tool"), ) @dataclass class RAGConfig: - enabled: bool = False - vector_store: VectorStoreConfig = field(default_factory=lambda: VectorStoreConfig(provider='mock')) + enabled: bool = False + vector_store: VectorStoreConfig = field(default_factory=lambda: VectorStoreConfig(provider='mock')) embedding_provider: str = 'gemini' - chunk_size: int = 1000 - chunk_overlap: int = 200 + chunk_size: int = 1000 + chunk_overlap: int = 200 def to_dict(self) -> Dict[str, Any]: """ [C: src/personas.py:PersonaManager.save_persona, src/presets.py:PresetManager.save_preset, src/project_manager.py:save_project, src/project_manager.py:save_track_state, src/tool_presets.py:ToolPresetManager.save_bias_profile, src/tool_presets.py:ToolPresetManager.save_preset, src/workspace_manager.py:WorkspaceManager.save_profile, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_serialization_with_annotations, tests/test_event_serialization.py:test_user_request_event_serialization, tests/test_external_editor.py:TestExternalEditorConfig.test_to_dict, tests/test_external_editor.py:TestTextEditorConfig.test_to_dict, tests/test_file_item_model.py:test_file_item_to_dict, tests/test_gui_events_v2.py:test_user_request_event_payload, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_serialization, tests/test_persona_id.py:test_ticket_persona_id_serialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_serialization, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_thinking_gui.py:test_thinking_segment_model_compatibility, tests/test_ticket_queue.py:test_ticket_to_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_to_dict, tests/test_track_state_schema.py:test_track_state_to_dict_with_none, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return { - "enabled": self.enabled, - "vector_store": self.vector_store.to_dict(), + "enabled": self.enabled, + "vector_store": self.vector_store.to_dict(), "embedding_provider": self.embedding_provider, - "chunk_size": self.chunk_size, - "chunk_overlap": self.chunk_overlap, + "chunk_size": self.chunk_size, + "chunk_overlap": self.chunk_overlap, } @classmethod @@ -1024,11 +1023,11 @@ class RAGConfig: [C: src/personas.py:PersonaManager.load_all, src/presets.py:PresetManager.load_all, src/project_manager.py:load_project, src/project_manager.py:load_track_state, src/tool_presets.py:ToolPresetManager.load_all_bias_profiles, src/tool_presets.py:ToolPresetManager.load_all_presets, src/workspace_manager.py:WorkspaceManager.load_all_profiles, tests/test_bias_models.py:test_bias_profile_model, tests/test_bias_models.py:test_tool_model, tests/test_bias_models.py:test_tool_preset_extension, tests/test_context_presets_models.py:test_context_preset_from_dict_legacy, tests/test_context_presets_models.py:test_context_preset_serialization, tests/test_context_presets_models.py:test_file_view_preset_serialization, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_deserialization_with_annotations, tests/test_custom_slices_annotations.py:test_file_item_custom_slices_round_trip_annotations, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_dict_editors, tests/test_external_editor.py:TestExternalEditorConfig.test_from_dict_with_string_editors, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_with_diff_args, tests/test_external_editor.py:TestTextEditorConfig.test_from_dict_without_diff_args, tests/test_file_item_model.py:test_file_item_from_dict, tests/test_file_item_model.py:test_file_item_from_dict_defaults, tests/test_history_manager.py:TestHistoryManager.test_snapshot_roundtrip, tests/test_mcp_config.py:test_mcp_configuration_to_from_dict, tests/test_mcp_config.py:test_mcp_server_config_to_from_dict, tests/test_per_ticket_model.py:test_model_override_default_on_deserialize, tests/test_per_ticket_model.py:test_model_override_deserialization, tests/test_persona_id.py:test_ticket_persona_id_deserialization, tests/test_persona_models.py:test_persona_defaults, tests/test_persona_models.py:test_persona_deserialization, tests/test_project_serialization.py:TestProjectSerialization.test_backward_compatibility_strings, tests/test_slice_editor_behavior.py:test_add_slice_with_annotations, tests/test_ticket_queue.py:test_ticket_from_dict_default_priority, tests/test_ticket_queue.py:test_ticket_from_dict_priority, tests/test_tiered_aggregation.py:test_persona_aggregation_strategy, tests/test_track_state_schema.py:test_track_state_from_dict, tests/test_track_state_schema.py:test_track_state_from_dict_empty_and_missing, tests/test_ui_summary_only_removal.py:test_file_item_serialization_with_flags] """ return cls( - enabled=data.get("enabled", False), - vector_store=VectorStoreConfig.from_dict(data.get("vector_store", {"provider": "mock"})), - embedding_provider=data.get("embedding_provider", "gemini"), - chunk_size=data.get("chunk_size", 1000), - chunk_overlap=data.get("chunk_overlap", 200), + enabled = data.get("enabled", False), + vector_store = VectorStoreConfig.from_dict(data.get("vector_store", {"provider": "mock"})), + embedding_provider = data.get("embedding_provider", "gemini"), + chunk_size = data.get("chunk_size", 1000), + chunk_overlap = data.get("chunk_overlap", 200), ) def load_mcp_config(path: str) -> MCPConfiguration: