From aa56981c8752601ede785aa42fdcaf955a2061b3 Mon Sep 17 00:00:00 2001 From: Ed_ Date: Fri, 5 Jun 2026 22:34:26 -0400 Subject: [PATCH] organizing (mostly aggregate.py) --- src/aggregate.py | 278 +++++++++++++++++++----------------------- src/session_logger.py | 24 ++-- src/shaders.py | 6 +- src/shell_runner.py | 14 +-- src/summarize.py | 12 +- 5 files changed, 148 insertions(+), 186 deletions(-) diff --git a/src/aggregate.py b/src/aggregate.py index dee13dbb..e2872310 100644 --- a/src/aggregate.py +++ b/src/aggregate.py @@ -12,19 +12,24 @@ Instead of sending every file to the AI raw (which blows up tokens), this uses a This is essential for keeping prompt tokens low while giving the AI enough structural info to use the MCP tools to fetch only what it needs. """ +import ast import glob import os import re import tomllib +import traceback from pathlib import Path, PureWindowsPath from typing import Any, cast from src import beads_client +from src import mcp_client from src import project_manager from src import summarize -from src.file_cache import ASTParser +from src.fuzzy_anchor import FuzzyAnchor +from src.file_cache import ASTParser +from src.paths import get_config_path from src.performance_monitor import get_monitor @@ -50,17 +55,16 @@ def resolve_paths(base_dir: Path, entry: str) -> list[Path]: is_wildcard = "*" in entry matches = [] if is_wildcard: - root = Path(entry) if has_drive else base_dir / entry + root = Path(entry) if has_drive else base_dir / entry matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()] else: - p = Path(entry) if has_drive else (base_dir / entry).resolve() + p = Path(entry) if has_drive else (base_dir / entry).resolve() matches = [p] # Blacklist filter filtered = [] for p in matches: name = p.name.lower() - if name == "history.toml" or name.endswith("_history.toml"): - continue + if name == "history.toml" or name.endswith("_history.toml"): continue filtered.append(p) return sorted(filtered) @@ -93,7 +97,6 @@ def compute_file_stats(abs_path: str) -> dict[str, int]: content = f.read() stats["lines"] = len(content.splitlines()) if abs_path.endswith('.py'): - import ast try: tree = ast.parse(content) stats["ast_elements"] = sum(1 for node in ast.walk(tree) if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))) @@ -111,19 +114,19 @@ def build_discussion_section(history: list[Any]) -> str: sections = [] for i, entry in enumerate(history, start=1): if isinstance(entry, dict): - role = entry.get("role", "Unknown") + role = entry.get("role", "Unknown") content = entry.get("content", "").strip() - text = f"{role}: {content}" + text = f"{role}: {content}" else: text = str(entry).strip() + sections.append(f"### Discussion Excerpt {i}\n\n{text}") return "\n\n---\n\n".join(sections) def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str: sections = [] for entry in screenshots: - if not entry or not isinstance(entry, str): - continue + if not entry or not isinstance(entry, str): continue paths = resolve_paths(base_dir, entry) if not paths: sections.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_") @@ -158,63 +161,61 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[ parser = None for entry_raw in files: if isinstance(entry_raw, dict): - entry = cast(str, entry_raw.get("path", "")) - tier = entry_raw.get("tier") + entry = cast(str, entry_raw.get("path", "")) + tier = entry_raw.get("tier") auto_aggregate = entry_raw.get("auto_aggregate", True) - force_full = entry_raw.get("force_full", False) - view_mode = entry_raw.get("view_mode", "full") - if force_full: - view_mode = "full" - ast_signatures = entry_raw.get("ast_signatures", False) + force_full = entry_raw.get("force_full", False) + view_mode = entry_raw.get("view_mode", "full") + if force_full: view_mode = "full" + ast_signatures = entry_raw.get("ast_signatures", False) ast_definitions = entry_raw.get("ast_definitions", False) - ast_mask = entry_raw.get("ast_mask", {}) - custom_slices = entry_raw.get("custom_slices", []) + ast_mask = entry_raw.get("ast_mask", {}) + custom_slices = entry_raw.get("custom_slices", []) elif hasattr(entry_raw, "path"): - entry = entry_raw.path - tier = getattr(entry_raw, "tier", None) + entry = entry_raw.path + tier = getattr(entry_raw, "tier", None) auto_aggregate = getattr(entry_raw, "auto_aggregate", True) - force_full = getattr(entry_raw, "force_full", False) - view_mode = getattr(entry_raw, "view_mode", "full") - if force_full: - view_mode = "full" - ast_signatures = getattr(entry_raw, "ast_signatures", False) + force_full = getattr(entry_raw, "force_full", False) + view_mode = getattr(entry_raw, "view_mode", "full") + if force_full: view_mode = "full" + ast_signatures = getattr(entry_raw, "ast_signatures", False) ast_definitions = getattr(entry_raw, "ast_definitions", False) - ast_mask = getattr(entry_raw, "ast_mask", {}) - custom_slices = getattr(entry_raw, "custom_slices", []) + ast_mask = getattr(entry_raw, "ast_mask", {}) + custom_slices = getattr(entry_raw, "custom_slices", []) else: - entry = entry_raw - tier = None - auto_aggregate = True - force_full = False - view_mode = "full" - ast_signatures = False + entry = entry_raw + tier = None + auto_aggregate = True + force_full = False + view_mode = "full" + ast_signatures = False ast_definitions = False - ast_mask = {} - custom_slices = [] - if not entry or not isinstance(entry, str): - continue + ast_mask = {} + custom_slices = [] + + if not entry or not isinstance(entry, str): continue paths = resolve_paths(base_dir, entry) + if not paths: items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices}) continue + for path in paths: try: content = path.read_text(encoding="utf-8") - mtime = path.stat().st_mtime - error = False + mtime = path.stat().st_mtime + error = False if not error and view_mode != "full": try: - if view_mode == "summary": - content = summarize.summarise_file(path, content) + if view_mode == "summary": content = summarize.summarise_file(path, content) elif view_mode == "skeleton": suffix_lower = path.suffix.lower() if suffix_lower == ".py": if not parser: parser = ASTParser("python") content = parser.get_skeleton(content, path=str(path)) elif suffix_lower in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc']: - from src import mcp_client - if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_skeleton(str(path)) - else: content = mcp_client.ts_cpp_get_skeleton(str(path)) + if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_skeleton(str(path)) + else: content = mcp_client.ts_cpp_get_skeleton(str(path)) else: content = summarize.summarise_file(path, content) elif view_mode == "outline": @@ -223,7 +224,6 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[ if not parser: parser = ASTParser("python") content = parser.get_code_outline(content, path=str(path)) elif suffix_lower in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc']: - from src import mcp_client if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_code_outline(str(path)) else: content = mcp_client.ts_cpp_get_code_outline(str(path)) else: @@ -232,58 +232,50 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[ suffix_lower = path.suffix.lower() if ast_mask: mask_sections = [] - from src import mcp_client for symbol_raw, mode in ast_mask.items(): if mode == "hide": continue - import re symbol = re.sub(r'\(\d+-\d+\)$', '', symbol_raw) res = "" if suffix_lower == ".py": res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol) elif suffix_lower in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]: is_cpp = any(ext in suffix_lower for ext in [".cpp", ".hpp", ".cxx", ".cc"]) - if mode == "def": - res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol) - else: - res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol) + if mode == "def": res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol) + else: res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol) if res: mask_sections.append(res) - if mask_sections: - content = "\n\n".join(mask_sections) - else: - content = "(no masked sections visible)" + if mask_sections: content = "\n\n".join(mask_sections) + else: content = "(no masked sections visible)" else: content = "(no ast mask defined)" - elif view_mode == "none": - content = "(context excluded)" + elif view_mode == "none": content = "(context excluded)" elif view_mode == "custom": if custom_slices: - lines = content.splitlines() + lines = content.splitlines() slices_text = [] for s in custom_slices: - start = s.get("start_line", 1) - end = s.get("end_line", len(lines)) - tag = s.get("tag", "unnamed") - comment = s.get("comment", "") - s_idx = max(0, start - 1) - e_idx = min(len(lines), end) - chunk = "\n".join(lines[s_idx:e_idx]) + start = s.get("start_line", 1) + end = s.get("end_line", len(lines)) + tag = s.get("tag", "unnamed") + comment = s.get("comment", "") + s_idx = max(0, start - 1) + e_idx = min(len(lines), end) + chunk = "\n".join(lines[s_idx:e_idx]) slices_text.append(f"---\n[Slice: {tag}] ({comment})\nLines {start}-{end}:\n{chunk}") content = "\n\n".join(slices_text) else: content = summarize.summarise_file(path, content) except Exception as e: - import traceback content = f"ERROR in {view_mode} view mode for {path}:\n{traceback.format_exc()}" - error = True + error = True except FileNotFoundError: content = f"ERROR: file not found: {path}" mtime = 0.0 error = True except Exception as e: - import traceback content = f"ERROR reading {path}:\n{traceback.format_exc()}" - mtime = 0.0 - error = True + mtime = 0.0 + error = True + items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices}) return items @@ -294,11 +286,10 @@ def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str: """ sections = [] for item in file_items: - if not item.get("auto_aggregate", True): - continue - path = item.get("path") - entry = item.get("entry", "unknown") - content = item.get("content", "") + if not item.get("auto_aggregate", True): continue + path = item.get("path") + entry = item.get("entry", "unknown") + content = item.get("content", "") view_mode = item.get("view_mode", "full") if path is None: if view_mode == "summary": @@ -320,23 +311,20 @@ def build_beads_section(base_dir: Path) -> str: [C: tests/test_aggregate_beads.py:test_build_beads_compaction] """ client = beads_client.BeadsClient(base_dir) - if not client.is_initialized(): - return "" + if not client.is_initialized(): return "" beads = client.list_beads() - if not beads: - return "" - active = [b for b in beads if b.status == "active"] + if not beads: return "" + active = [b for b in beads if b.status == "active"] completed = [b for b in beads if b.status == "completed"] - parts = [] + parts = [] parts.append("## Beads Mode: Progress Track") - if completed: + if completed: parts.append("### Completed Beads") comp_list = ", ".join([f"`{b.title}`" for b in completed]) parts.append(comp_list) if active: parts.append("### Active Beads") - for b in active: - parts.append(f"- **{b.title}** ({b.id}): {b.description}") + for b in active: parts.append(f"- **{b.title}** ({b.id}): {b.description}") return "\n\n".join(parts) def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, aggregation_strategy: str = "auto", execution_mode: str = "standard", base_dir: Path | None = None) -> str: @@ -344,24 +332,17 @@ def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_ parts = [] # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits if file_items: - if aggregation_strategy == "summarize": - parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) - elif aggregation_strategy == "full": - parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) + if aggregation_strategy == "summarize": parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) + elif aggregation_strategy == "full": parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) else: # auto - if summary_only: - parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) - else: - parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) - if screenshots: - parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) + if summary_only: parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) + else: parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) + if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) if execution_mode == "beads" and base_dir: beads_md = build_beads_section(base_dir) - if beads_md: - parts.append(beads_md) + if beads_md: parts.append(beads_md) # DYNAMIC SUFFIX: History changes every turn, must go last - if history: - parts.append("## Discussion History\n\n" + build_discussion_section(history)) + if history: parts.append("## Discussion History\n\n" + build_discussion_section(history)) return "\n\n---\n\n".join(parts) def build_markdown_no_history(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False, aggregation_strategy: str = "auto") -> str: @@ -388,67 +369,61 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P """ with get_monitor().scope("build_tier3_context"): focus_set = set(focus_files) - parser = ASTParser("python") - sections = [] + parser = ASTParser("python") + sections = [] for item in file_items: - if not item.get("auto_aggregate", True): - continue - path = item.get("path") - entry = item.get("entry", "") - path_str = str(path) if path else "" - name = path.name if path else "" - tier = item.get("tier") - force_full = item.get("force_full") - ast_signatures = item.get("ast_signatures", False) + if not item.get("auto_aggregate", True): continue + path = item.get("path") + entry = item.get("entry", "") + path_str = str(path) if path else "" + name = path.name if path else "" + tier = item.get("tier") + force_full = item.get("force_full") + ast_signatures = item.get("ast_signatures", False) ast_definitions = item.get("ast_definitions", False) - ast_mask = item.get("ast_mask", {}) - content = item.get("content", "") - is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set) + ast_mask = item.get("ast_mask", {}) + content = item.get("content", "") + + is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set) if not is_focus and path_str: for focus in focus_set: if focus in path_str: is_focus = True break - original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown")) - slices = item.get('custom_slices', []) + original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown")) + slices = item.get('custom_slices', []) if slices and not item.get('error'): - from src.fuzzy_anchor import FuzzyAnchor resolved_blocks = [] - content = item.get('content', '') - suffix = path.suffix.lstrip(".") if path and path.suffix else "text" + content = item.get('content', '') + suffix = path.suffix.lstrip(".") if path and path.suffix else "text" for slc in slices: range_res = FuzzyAnchor.resolve_slice(content, slc) if range_res: - s, e = range_res + s, e = range_res lines = content.splitlines() resolved_blocks.append("\n".join(lines[s-1:e])) if resolved_blocks: combined = "\n\n... [LINES SKIPPED] ...\n\n".join(resolved_blocks) sections.append(f"### `{original}` (Slices)\n\n```{suffix}\n{combined}\n```") continue # Skip full file logic - + if is_focus or tier == 3 or force_full: suffix = path.suffix.lstrip(".") if path and path.suffix else "text" sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```") elif path: if ast_mask and not item.get("error"): mask_sections = [] - from src import mcp_client for symbol_raw, mode in ast_mask.items(): - if mode == "hide": - continue - import re + if mode == "hide": continue symbol = re.sub(r'\(\d+-\d+\)$', '', symbol_raw) - res = "" + res = "" if path.suffix == ".py": res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol) elif path.suffix in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]: is_cpp = any(ext in path.suffix for ext in [".cpp", ".hpp", ".cxx", ".cc"]) - if mode == "def": - res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol) - else: - res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol) + if mode == "def": res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol) + else: res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol) if res: mask_sections.append(res) if mask_sections: @@ -456,7 +431,6 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P sections.append(f"### `{original}` (Masked)\n\n```{suffix}\n" + "\n\n".join(mask_sections) + "\n```") continue if path.suffix in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc'] and not item.get("error"): - from src import mcp_client if ast_definitions: skeleton = mcp_client.ts_cpp_get_skeleton(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_skeleton(str(path)) sections.append(f"### `{original}` (AST Definitions)\n\n```{path.suffix.lstrip('.')}\n{skeleton}\n```") @@ -474,12 +448,9 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P else: sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}") parts = [] - if sections: - parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections)) - if screenshots: - parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) - if history: - parts.append("## Discussion History\n\n" + build_discussion_section(history)) + if sections: parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections)) + if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) + if history: parts.append("## Discussion History\n\n" + build_discussion_section(history)) return "\n\n---\n\n".join(parts) def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, execution_mode: str = "standard") -> str: @@ -491,23 +462,31 @@ def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str [C: simulation/sim_base.py:run_sim, src/ai_client.py:_send_anthropic, src/ai_client.py:_send_deepseek, src/ai_client.py:_send_gemini, src/ai_client.py:_send_gemini_cli, src/ai_client.py:_send_minimax, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._do_generate, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._start_track_logic, src/external_editor.py:_find_vscode_in_registry, src/gui_2.py:App._render_snapshot_tab, src/gui_2.py:App.run, src/gui_2.py:main, src/mcp_client.py:get_git_diff, src/project_manager.py:get_git_commit, src/rag_engine.py:RAGEngine._search_mcp, src/shell_runner.py:run_powershell, tests/conftest.py:kill_process_tree, tests/conftest.py:live_gui, tests/test_conductor_abort_event.py:test_conductor_abort_event_populated, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_extended_sims.py:test_ai_settings_sim_live, tests/test_extended_sims.py:test_context_sim_live, tests/test_extended_sims.py:test_execution_sim_live, tests/test_extended_sims.py:test_tools_sim_live, tests/test_external_editor_gui.py:get_vscode_processes, tests/test_external_editor_gui.py:test_vscode_launches_with_diff_view, tests/test_gui_custom_window.py:test_app_window_is_borderless, tests/test_headless_simulation.py:module, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_mock_gemini_cli.py:run_mock, tests/test_orchestration_logic.py:test_conductor_engine_run, tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_sim_ai_settings.py:test_ai_settings_simulation_run, tests/test_sim_context.py:test_context_simulation_run, tests/test_sim_execution.py:test_execution_simulation_run, tests/test_sim_tools.py:test_tools_simulation_run] """ namespace = config.get("project", {}).get("name") - if not namespace: - namespace = config.get("output", {}).get("namespace", "project") - output_dir = Path(config["output"]["output_dir"]) - base_dir = Path(config["files"]["base_dir"]) - files = config["files"].get("paths", []) + if not namespace: namespace = config.get("output", {}).get("namespace", "project") + output_dir = Path(config["output"]["output_dir"]) + base_dir = Path(config["files"]["base_dir"]) + files = config["files"].get("paths", []) screenshot_base_dir = Path(config.get("screenshots", {}).get("base_dir", ".")) - screenshots = config.get("screenshots", {}).get("paths", []) - history = config.get("discussion", {}).get("history", []) + screenshots = config.get("screenshots", {}).get("paths", []) + history = config.get("discussion", {}).get("history", []) output_dir.mkdir(parents=True, exist_ok=True) - increment = find_next_increment(output_dir, namespace) + increment = find_next_increment(output_dir, namespace) output_file = output_dir / f"{namespace}_{increment:03d}.md" + # Build file items once, then construct markdown from them (avoids double I/O) - file_items = build_file_items(base_dir, files) - summary_only = config.get("project", {}).get("summary_only", False) + file_items = build_file_items(base_dir, files) + summary_only = config.get("project", {}).get("summary_only", False) execution_mode = config.get("project", {}).get("execution_mode", "standard") - markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, - summary_only=summary_only, aggregation_strategy=aggregation_strategy, execution_mode=execution_mode, base_dir=base_dir) + markdown = build_markdown_from_items( + file_items, + screenshot_base_dir, + screenshots, + history, + summary_only = summary_only, + aggregation_strategy = aggregation_strategy, + execution_mode = execution_mode, + base_dir = base_dir) + output_file.write_text(markdown, encoding="utf-8") return markdown, output_file, file_items @@ -516,7 +495,6 @@ def main() -> None: """ [C: simulation/live_walkthrough.py:module, simulation/ping_pong.py:module, src/ai_server.py:module, src/api_hooks.py:WebSocketServer._run_loop, src/gui_2.py:module, tests/mock_concurrent_mma.py:module, tests/mock_gemini_cli.py:module, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_allow_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_deny_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_unreachable_hook_server, tests/test_cli_tool_bridge.py:module, tests/test_cli_tool_bridge_mapping.py:TestCliToolBridgeMapping.test_mapping_from_api_format, tests/test_cli_tool_bridge_mapping.py:module, tests/test_discussion_takes.py:module, tests/test_external_editor_gui.py:module, tests/test_headless_service.py:TestHeadlessStartup.test_headless_flag_triggers_run, tests/test_headless_service.py:TestHeadlessStartup.test_normal_startup_calls_app_run, tests/test_mma_skeleton.py:module, tests/test_orchestrator_pm.py:module, tests/test_orchestrator_pm_history.py:module, tests/test_presets.py:module, tests/test_project_serialization.py:module, tests/test_run_worker_lifecycle_abort.py:module, tests/test_symbol_lookup.py:module, tests/test_system_prompt_exposure.py:module, tests/test_theme_nerv_fx.py:module] """ - from src.paths import get_config_path config_path = get_config_path() if not config_path.exists(): @@ -528,7 +506,7 @@ def main() -> None: if not active_path: print(f"No active project found in {config_path}.") return - # Use project_manager to load project (handles history segregation) + # Use project_manager to load project (handles history segregation) proj = project_manager.load_project(active_path) # Use flat_config to make it compatible with aggregate.run() config = project_manager.flat_config(proj) diff --git a/src/session_logger.py b/src/session_logger.py index 3728a605..9e80b831 100644 --- a/src/session_logger.py +++ b/src/session_logger.py @@ -45,10 +45,8 @@ def _now_ts() -> str: def open_session(label: Optional[str] = None) -> None: """ - - - Called once at GUI startup. Creates the log directories if needed and - opens the log files for this session within a sub-directory. + Called once at GUI startup. Creates the log directories if needed and + opens the log files for this session within a sub-directory. [C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs, tests/test_session_logger_optimization.py:test_session_directory_and_subdirectories_creation, tests/test_session_logger_reset.py:test_reset_session, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry] """ global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq, _output_seq @@ -139,10 +137,8 @@ def log_api_hook(method: str, path: str, payload: str) -> None: def log_comms(entry: dict[str, Any]) -> None: """ - - - Append one comms entry to the comms log file as a JSON-L line. - Thread-safe (GIL + line-buffered file). + Append one comms entry to the comms log file as a JSON-L line. + Thread-safe (GIL + line-buffered file). [C: tests/test_logging_e2e.py:test_logging_e2e] """ if _comms_fh is None: @@ -154,10 +150,8 @@ def log_comms(entry: dict[str, Any]) -> None: def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]: """ - - - Append a tool-call record to the toolcalls log and write the PS1 script to - the session's scripts directory. Returns the path of the written script file. + Append a tool-call record to the toolcalls log and write the PS1 script to + the session's scripts directory. Returns the path of the written script file. [C: tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts] """ global _seq @@ -199,10 +193,8 @@ def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optio def log_tool_output(content: str) -> Optional[str]: """ - - - Save tool output content to a unique file in the session's outputs directory. - Returns the path of the written file. + Save tool output content to a unique file in the session's outputs directory. + Returns the path of the written file. [C: tests/test_session_logger_optimization.py:test_log_tool_output_returns_none_if_no_session, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs] """ global _output_seq diff --git a/src/shaders.py b/src/shaders.py index faeab5eb..dcca6c77 100644 --- a/src/shaders.py +++ b/src/shaders.py @@ -3,10 +3,8 @@ from imgui_bundle import imgui def draw_soft_shadow(draw_list: imgui.ImDrawList, p_min: imgui.ImVec2, p_max: imgui.ImVec2, color: imgui.ImVec4, shadow_size: float = 10.0, rounding: float = 0.0) -> None: """ - - - Simulates a soft shadow effect by drawing multiple concentric rounded rectangles - with decreasing alpha values. This is a faux-shader effect using primitive batching. + Simulates a soft shadow effect by drawing multiple concentric rounded rectangles + with decreasing alpha values. This is a faux-shader effect using primitive batching. """ r, g, b, a = color.x, color.y, color.z, color.w steps = int(shadow_size) diff --git a/src/shell_runner.py b/src/shell_runner.py index 5513993b..fece9d17 100644 --- a/src/shell_runner.py +++ b/src/shell_runner.py @@ -55,14 +55,12 @@ def _build_subprocess_env() -> dict[str, str]: def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str: """ - - - Run a PowerShell script with working directory set to base_dir. - Returns a string combining stdout, stderr, and exit code. - Environment is configured via mcp_env.toml (project root). - If qa_callback is provided and the command fails or has stderr, - the callback is called with the stderr content and its result is appended. - If patch_callback is provided, it receives (error, file_context) and returns patch text. + Run a PowerShell script with working directory set to base_dir. + Returns a string combining stdout, stderr, and exit code. + Environment is configured via mcp_env.toml (project root). + If qa_callback is provided and the command fails or has stderr, + the callback is called with the stderr content and its result is appended. + If patch_callback is provided, it receives (error, file_context) and returns patch text. [C: tests/test_tier4_interceptor.py:test_run_powershell_no_qa_callback_on_success, tests/test_tier4_interceptor.py:test_run_powershell_optional_qa_callback, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only] """ safe_dir: str = str(base_dir).replace("'", "''") diff --git a/src/summarize.py b/src/summarize.py index a2212fe3..99dc40cb 100644 --- a/src/summarize.py +++ b/src/summarize.py @@ -205,10 +205,8 @@ def summarise_file(path: Path, content: str) -> str: def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]: """ - - - Given a list of file_item dicts (as returned by aggregate.build_file_items), - return a parallel list of dicts with an added `summary` key. + Given a list of file_item dicts (as returned by aggregate.build_file_items), + return a parallel list of dicts with an added `summary` key. """ result = [] for item in file_items: @@ -225,10 +223,8 @@ def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]: def build_summary_markdown(file_items: list[dict[str, Any]]) -> str: """ - - - Build a compact markdown string of file summaries, suitable for the - initial block instead of full file contents. + Build a compact markdown string of file summaries, suitable for the + initial block instead of full file contents. """ summarised = summarise_items(file_items) parts = []