# aggregate.py from __future__ import annotations """ Note(Gemini): This module orchestrates the construction of the final Markdown context string. Instead of sending every file to the AI raw (which blows up tokens), this uses a pipeline: 1. Resolve paths (handles globs and absolute paths). 2. Build file items (raw content). 3. If 'summary_only' is true (which is the default behavior now), it pipes the files through summarize.py to generate a compacted view. This is essential for keeping prompt tokens low while giving the AI enough structural info to use the MCP tools to fetch only what it needs. """ import tomllib import re import glob import os from pathlib import Path, PureWindowsPath from typing import Any, cast from src import summarize from src import project_manager from src import beads_client from src.file_cache import ASTParser from src.performance_monitor import get_monitor def find_next_increment(output_dir: Path, namespace: str) -> int: pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$") max_num = 0 for f in output_dir.iterdir(): if f.is_file(): match = pattern.match(f.name) if match: max_num = max(max_num, int(match.group(1))) return max_num + 1 def is_absolute_with_drive(entry: str) -> bool: try: p = PureWindowsPath(entry) return p.drive != "" except Exception: return False def resolve_paths(base_dir: Path, entry: str) -> list[Path]: has_drive = is_absolute_with_drive(entry) is_wildcard = "*" in entry matches = [] if is_wildcard: root = Path(entry) if has_drive else base_dir / entry matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()] else: p = Path(entry) if has_drive else (base_dir / entry).resolve() matches = [p] # Blacklist filter filtered = [] for p in matches: name = p.name.lower() if name == "history.toml" or name.endswith("_history.toml"): continue filtered.append(p) return sorted(filtered) def group_files_by_dir(files: list[Any]) -> dict[str, list[Any]]: """Groups FileItem objects by their relative directory path.""" grouped = {} for f in files: path_str = f.path if hasattr(f, 'path') else str(f) # Normalize path separators path_str = path_str.replace('\\', '/') dir_name = os.path.dirname(path_str) if not dir_name: dir_name = "." if dir_name not in grouped: grouped[dir_name] = [] grouped[dir_name].append(f) return grouped def compute_file_stats(abs_path: str) -> dict[str, int]: """Computes lines and basic AST stats for a file.""" stats = {"lines": 0, "ast_elements": 0} try: with open(abs_path, 'r', encoding='utf-8') as f: content = f.read() stats["lines"] = len(content.splitlines()) if abs_path.endswith('.py'): import ast try: tree = ast.parse(content) stats["ast_elements"] = sum(1 for node in ast.walk(tree) if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef))) except Exception: pass except Exception: pass return stats def build_discussion_section(history: list[Any]) -> str: """ Builds a markdown section for discussion history. Handles both legacy list[str] and new list[dict]. """ sections = [] for i, entry in enumerate(history, start=1): if isinstance(entry, dict): role = entry.get("role", "Unknown") content = entry.get("content", "").strip() text = f"{role}: {content}" else: text = str(entry).strip() sections.append(f"### Discussion Excerpt {i}\n\n{text}") return "\n\n---\n\n".join(sections) def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str: sections = [] for entry in screenshots: if not entry or not isinstance(entry, str): continue paths = resolve_paths(base_dir, entry) if not paths: sections.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_") continue for path in paths: original = entry if "*" not in entry else str(path) if not path.exists(): sections.append(f"### `{original}`\n\n_ERROR: file not found: {path}_") continue sections.append(f"### `{original}`\n\n![{path.name}]({path.as_posix()})") return "\n\n---\n\n".join(sections) def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[dict[str, Any]]: """ Return a list of dicts describing each file, for use by ai_client when it wants to upload individual files rather than inline everything as markdown. Each dict has: path : Path (resolved absolute path) entry : str (original config entry string) content : str (file text, or error string) error : bool mtime : float (last modification time, for skip-if-unchanged optimization) tier : int | None (optional tier for context management) auto_aggregate : bool force_full : bool view_mode : str (summary, full, skeleton, outline, none) [C: src/app_controller.py:AppController._bg_task, src/orchestrator_pm.py:module, tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_aggregate_flags.py:test_force_full, tests/test_tiered_context.py:test_build_file_items_with_tiers] """ with get_monitor().scope("build_file_items"): items: list[dict[str, Any]] = [] parser = None for entry_raw in files: if isinstance(entry_raw, dict): entry = cast(str, entry_raw.get("path", "")) tier = entry_raw.get("tier") auto_aggregate = entry_raw.get("auto_aggregate", True) force_full = entry_raw.get("force_full", False) view_mode = entry_raw.get("view_mode", "full") if force_full: view_mode = "full" ast_signatures = entry_raw.get("ast_signatures", False) ast_definitions = entry_raw.get("ast_definitions", False) ast_mask = entry_raw.get("ast_mask", {}) custom_slices = entry_raw.get("custom_slices", []) elif hasattr(entry_raw, "path"): entry = entry_raw.path tier = getattr(entry_raw, "tier", None) auto_aggregate = getattr(entry_raw, "auto_aggregate", True) force_full = getattr(entry_raw, "force_full", False) view_mode = getattr(entry_raw, "view_mode", "full") if force_full: view_mode = "full" ast_signatures = getattr(entry_raw, "ast_signatures", False) ast_definitions = getattr(entry_raw, "ast_definitions", False) ast_mask = getattr(entry_raw, "ast_mask", {}) custom_slices = getattr(entry_raw, "custom_slices", []) else: entry = entry_raw tier = None auto_aggregate = True force_full = False view_mode = "full" ast_signatures = False ast_definitions = False ast_mask = {} custom_slices = [] if not entry or not isinstance(entry, str): continue paths = resolve_paths(base_dir, entry) if not paths: items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices}) continue for path in paths: try: content = path.read_text(encoding="utf-8") mtime = path.stat().st_mtime error = False if not error and view_mode != "full": if view_mode == "summary": content = summarize.summarise_file(path, content) elif view_mode == "skeleton": if path.suffix == ".py": if not parser: parser = ASTParser("python") content = parser.get_skeleton(content, path=str(path)) else: content = summarize.summarise_file(path, content) elif view_mode == "outline": if path.suffix == ".py": if not parser: parser = ASTParser("python") content = parser.get_code_outline(content, path=str(path)) else: content = summarize.summarise_file(path, content) elif view_mode == "none": content = "(context excluded)" elif view_mode == "custom": if custom_slices: lines = content.splitlines() slices_text = [] for s in custom_slices: start = s.get("start_line", 1) end = s.get("end_line", len(lines)) tag = s.get("tag", "unnamed") comment = s.get("comment", "") s_idx = max(0, start - 1) e_idx = min(len(lines), end) chunk = "\n".join(lines[s_idx:e_idx]) slices_text.append(f"---\n[Slice: {tag}] ({comment})\nLines {start}-{end}:\n{chunk}") content = "\n\n".join(slices_text) else: content = summarize.summarise_file(path, content) except FileNotFoundError: content = f"ERROR: file not found: {path}" mtime = 0.0 error = True except Exception as e: content = f"ERROR: {e}" mtime = 0.0 error = True items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices}) return items def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str: """ Build the files markdown section from pre-read file items (avoids double I/O). [C: tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_ui_summary_only_removal.py:test_aggregate_from_items_respects_auto_aggregate] """ sections = [] for item in file_items: if not item.get("auto_aggregate", True): continue path = item.get("path") entry = item.get("entry", "unknown") content = item.get("content", "") view_mode = item.get("view_mode", "full") if path is None: if view_mode == "summary": sections.append(f"### `{entry}`\n\n{content}") else: sections.append(f"### `{entry}`\n\n```text\n{content}\n```") else: path_obj = Path(path) if isinstance(path, str) else path suffix = path_obj.suffix.lstrip(".") if path_obj.suffix else "text" original = entry if "*" not in entry else str(path) if view_mode == "summary": sections.append(f"### `{original}`\n\n{content}") else: sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```") return "\n\n---\n\n".join(sections) def build_beads_section(base_dir: Path) -> str: """ [C: tests/test_aggregate_beads.py:test_build_beads_compaction] """ client = beads_client.BeadsClient(base_dir) if not client.is_initialized(): return "" beads = client.list_beads() if not beads: return "" active = [b for b in beads if b.status == "active"] completed = [b for b in beads if b.status == "completed"] parts = [] parts.append("## Beads Mode: Progress Track") if completed: parts.append("### Completed Beads") comp_list = ", ".join([f"`{b.title}`" for b in completed]) parts.append(comp_list) if active: parts.append("### Active Beads") for b in active: parts.append(f"- **{b.title}** ({b.id}): {b.description}") return "\n\n".join(parts) def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, aggregation_strategy: str = "auto", execution_mode: str = "standard", base_dir: Path | None = None) -> str: """Build markdown from pre-read file items instead of re-reading from disk.""" parts = [] # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits if file_items: if aggregation_strategy == "summarize": parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) elif aggregation_strategy == "full": parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) else: # auto if summary_only: parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items)) else: parts.append("## Files\n\n" + _build_files_section_from_items(file_items)) if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) if execution_mode == "beads" and base_dir: beads_md = build_beads_section(base_dir) if beads_md: parts.append(beads_md) # DYNAMIC SUFFIX: History changes every turn, must go last if history: parts.append("## Discussion History\n\n" + build_discussion_section(history)) return "\n\n---\n\n".join(parts) def build_markdown_no_history(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False, aggregation_strategy: str = "auto") -> str: """ Build markdown with only files + screenshots (no history). Used for stable caching. [C: src/app_controller.py:AppController._do_generate, tests/test_history_management.py:test_aggregate_blacklist] """ return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history=[], summary_only=summary_only, aggregation_strategy=aggregation_strategy) def build_discussion_text(history: list[str]) -> str: """ Build just the discussion history section text. Returns empty string if no history. [C: src/app_controller.py:AppController._do_generate, tests/test_history_management.py:test_aggregate_includes_segregated_history] """ if not history: return "" return "## Discussion History\n\n" + build_discussion_section(history) def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str: """ Tier 3 Context: Execution/Worker. Full content for focus_files and files with tier=3, summaries/skeletons for others. [C: tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_aggregate_flags.py:test_force_full, tests/test_perf_aggregate.py:test_build_tier3_context_scaling, tests/test_tiered_context.py:test_build_tier3_context_ast_skeleton, tests/test_tiered_context.py:test_build_tier3_context_exists, tests/test_tiered_context.py:test_tiered_context_by_tier_field] """ with get_monitor().scope("build_tier3_context"): focus_set = set(focus_files) parser = ASTParser("python") sections = [] for item in file_items: if not item.get("auto_aggregate", True): continue path = item.get("path") entry = item.get("entry", "") path_str = str(path) if path else "" name = path.name if path else "" tier = item.get("tier") force_full = item.get("force_full") ast_signatures = item.get("ast_signatures", False) ast_definitions = item.get("ast_definitions", False) ast_mask = item.get("ast_mask", {}) content = item.get("content", "") is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set) if not is_focus and path_str: for focus in focus_set: if focus in path_str: is_focus = True break original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown")) slices = item.get('custom_slices', []) if slices and not item.get('error'): from src.fuzzy_anchor import FuzzyAnchor resolved_blocks = [] content = item.get('content', '') suffix = path.suffix.lstrip(".") if path and path.suffix else "text" for slc in slices: range_res = FuzzyAnchor.resolve_slice(content, slc) if range_res: s, e = range_res lines = content.splitlines() resolved_blocks.append("\n".join(lines[s-1:e])) if resolved_blocks: combined = "\n\n... [LINES SKIPPED] ...\n\n".join(resolved_blocks) sections.append(f"### `{original}` (Slices)\n\n```{suffix}\n{combined}\n```") continue # Skip full file logic if is_focus or tier == 3 or force_full: suffix = path.suffix.lstrip(".") if path and path.suffix else "text" sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```") elif path: if ast_mask and not item.get("error"): mask_sections = [] from src import mcp_client for symbol, mode in ast_mask.items(): if mode == "hide": continue res = "" if path.suffix == ".py": res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol) elif path.suffix in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]: is_cpp = any(ext in path.suffix for ext in [".cpp", ".hpp", ".cxx", ".cc"]) if mode == "def": res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol) else: res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol) if res: mask_sections.append(res) if mask_sections: suffix = path.suffix.lstrip(".") if path.suffix else "text" sections.append(f"### `{original}` (Masked)\n\n```{suffix}\n" + "\n\n".join(mask_sections) + "\n```") continue if path.suffix in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc'] and not item.get("error"): from src import mcp_client if ast_definitions: skeleton = mcp_client.ts_cpp_get_skeleton(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_skeleton(str(path)) sections.append(f"### `{original}` (AST Definitions)\n\n```{path.suffix.lstrip('.')}\n{skeleton}\n```") elif ast_signatures: outline = mcp_client.ts_cpp_get_code_outline(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_code_outline(str(path)) sections.append(f"### `{original}` (AST Signatures)\n\n```{path.suffix.lstrip('.')}\n{outline}\n```") else: sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}") elif path.suffix == ".py" and not item.get("error"): try: skeleton = parser.get_skeleton(content) sections.append(f"### `{original}` (AST Skeleton)\n\n```python\n{skeleton}\n```") except Exception: sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}") else: sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}") parts = [] if sections: parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections)) if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) if history: parts.append("## Discussion History\n\n" + build_discussion_section(history)) return "\n\n---\n\n".join(parts) def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, execution_mode: str = "standard") -> str: file_items = build_file_items(base_dir, files) return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=summary_only, aggregation_strategy='auto', execution_mode=execution_mode, base_dir=base_dir) def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str, Path, list[dict[str, Any]]]: """ [C: simulation/sim_base.py:run_sim, src/ai_client.py:_send_anthropic, src/ai_client.py:_send_deepseek, src/ai_client.py:_send_gemini, src/ai_client.py:_send_gemini_cli, src/ai_client.py:_send_minimax, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._do_generate, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._start_track_logic, src/external_editor.py:_find_vscode_in_registry, src/gui_2.py:App._render_snapshot_tab, src/gui_2.py:App.run, src/gui_2.py:main, src/mcp_client.py:get_git_diff, src/project_manager.py:get_git_commit, src/project_manager.py:get_git_log, src/rag_engine.py:RAGEngine._search_mcp, src/shell_runner.py:run_powershell, tests/conftest.py:kill_process_tree, tests/conftest.py:live_gui, tests/test_conductor_abort_event.py:test_conductor_abort_event_populated, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_extended_sims.py:test_ai_settings_sim_live, tests/test_extended_sims.py:test_context_sim_live, tests/test_extended_sims.py:test_execution_sim_live, tests/test_extended_sims.py:test_tools_sim_live, tests/test_external_editor_gui.py:get_vscode_processes, tests/test_external_editor_gui.py:test_vscode_launches_with_diff_view, tests/test_gui_custom_window.py:test_app_window_is_borderless, tests/test_headless_simulation.py:module, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_mock_gemini_cli.py:run_mock, tests/test_orchestration_logic.py:test_conductor_engine_run, tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_sim_ai_settings.py:test_ai_settings_simulation_run, tests/test_sim_context.py:test_context_simulation_run, tests/test_sim_execution.py:test_execution_simulation_run, tests/test_sim_tools.py:test_tools_simulation_run] """ namespace = config.get("project", {}).get("name") if not namespace: namespace = config.get("output", {}).get("namespace", "project") output_dir = Path(config["output"]["output_dir"]) base_dir = Path(config["files"]["base_dir"]) files = config["files"].get("paths", []) screenshot_base_dir = Path(config.get("screenshots", {}).get("base_dir", ".")) screenshots = config.get("screenshots", {}).get("paths", []) history = config.get("discussion", {}).get("history", []) output_dir.mkdir(parents=True, exist_ok=True) increment = find_next_increment(output_dir, namespace) output_file = output_dir / f"{namespace}_{increment:03d}.md" # Build file items once, then construct markdown from them (avoids double I/O) file_items = build_file_items(base_dir, files) summary_only = config.get("project", {}).get("summary_only", False) execution_mode = config.get("project", {}).get("execution_mode", "standard") markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=summary_only, aggregation_strategy=aggregation_strategy, execution_mode=execution_mode, base_dir=base_dir) output_file.write_text(markdown, encoding="utf-8") return markdown, output_file, file_items def main() -> None: # Load global config to find active project """ [C: simulation/live_walkthrough.py:module, simulation/ping_pong.py:module, src/api_hooks.py:WebSocketServer._run_loop, src/gui_2.py:module, tests/mock_concurrent_mma.py:module, tests/mock_gemini_cli.py:module, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_allow_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_deny_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_unreachable_hook_server, tests/test_cli_tool_bridge.py:module, tests/test_cli_tool_bridge_mapping.py:TestCliToolBridgeMapping.test_mapping_from_api_format, tests/test_cli_tool_bridge_mapping.py:module, tests/test_discussion_takes.py:module, tests/test_external_editor_gui.py:module, tests/test_headless_service.py:TestHeadlessStartup.test_headless_flag_triggers_run, tests/test_headless_service.py:TestHeadlessStartup.test_normal_startup_calls_app_run, tests/test_mma_skeleton.py:module, tests/test_orchestrator_pm.py:module, tests/test_orchestrator_pm_history.py:module, tests/test_post_process.py:module, tests/test_presets.py:module, tests/test_project_serialization.py:module, tests/test_run_worker_lifecycle_abort.py:module, tests/test_symbol_lookup.py:module, tests/test_system_prompt_exposure.py:module, tests/test_theme_nerv_fx.py:module] """ from src.paths import get_config_path config_path = get_config_path() if not config_path.exists(): print(f"{config_path} not found.") return with open(config_path, "rb") as f: global_cfg = tomllib.load(f) active_path = global_cfg.get("projects", {}).get("active") if not active_path: print(f"No active project found in {config_path}.") return # Use project_manager to load project (handles history segregation) proj = project_manager.load_project(active_path) # Use flat_config to make it compatible with aggregate.run() config = project_manager.flat_config(proj) markdown, output_file, _ = run(config) print(f"Written: {output_file}") if __name__ == "__main__": main()