497 lines
24 KiB
Python
497 lines
24 KiB
Python
# aggregate.py
|
|
from __future__ import annotations
|
|
"""
|
|
Note(Gemini):
|
|
This module orchestrates the construction of the final Markdown context string.
|
|
Instead of sending every file to the AI raw (which blows up tokens), this uses a pipeline:
|
|
1. Resolve paths (handles globs and absolute paths).
|
|
2. Build file items (raw content).
|
|
3. If 'summary_only' is true (which is the default behavior now), it pipes the files through
|
|
summarize.py to generate a compacted view.
|
|
|
|
This is essential for keeping prompt tokens low while giving the AI enough structural info
|
|
to use the MCP tools to fetch only what it needs.
|
|
"""
|
|
import tomllib
|
|
import re
|
|
import glob
|
|
import os
|
|
from pathlib import Path, PureWindowsPath
|
|
from typing import Any, cast
|
|
from src import summarize
|
|
from src import project_manager
|
|
from src import beads_client
|
|
from src.file_cache import ASTParser
|
|
from src.performance_monitor import get_monitor
|
|
|
|
def find_next_increment(output_dir: Path, namespace: str) -> int:
    """Return the next free sequence number for ``<namespace>_<digits>.md`` files.

    Only regular files directly inside ``output_dir`` are considered; their
    names must consist of the namespace, an underscore, one or more digits
    and the ``.md`` extension.

    Args:
        output_dir: Directory holding previously generated markdown files.
        namespace: File-name prefix. It is regex-escaped, so metacharacters
            such as ``.`` are matched literally.

    Returns:
        ``1`` when the directory does not exist or holds no matching file,
        otherwise the highest number found plus one.
    """
    # A directory that has not been created yet simply has no prior outputs.
    if not output_dir.is_dir():
        return 1
    # fullmatch anchors both ends, so no "^...$" is needed in the pattern.
    pattern = re.compile(rf"{re.escape(namespace)}_(\d+)\.md")
    numbers = (
        int(found.group(1))
        for candidate in output_dir.iterdir()
        if candidate.is_file() and (found := pattern.fullmatch(candidate.name))
    )
    return max(numbers, default=0) + 1
|
|
|
|
def is_absolute_with_drive(entry: str) -> bool:
    """Report whether ``entry`` carries a Windows drive component.

    The entry is parsed with ``PureWindowsPath`` regardless of the host
    platform; any value that cannot be parsed yields ``False``.
    """
    try:
        drive = PureWindowsPath(entry).drive
    except Exception:
        return False
    return bool(drive)
|
|
|
|
def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
    """Expand one config entry into a sorted list of candidate paths.

    Entries containing ``*`` are globbed (recursively) and only existing
    files are kept. Any other entry yields exactly one path: the entry as-is
    when it carries a drive, otherwise ``base_dir / entry`` resolved.
    Files named ``history.toml`` or ending in ``_history.toml`` (compared
    case-insensitively) are always dropped.
    """
    drive_rooted = is_absolute_with_drive(entry)

    if "*" in entry:
        pattern = Path(entry) if drive_rooted else base_dir / entry
        candidates = [
            Path(hit)
            for hit in glob.glob(str(pattern), recursive=True)
            if Path(hit).is_file()
        ]
    elif drive_rooted:
        candidates = [Path(entry)]
    else:
        candidates = [(base_dir / entry).resolve()]

    def _is_history_file(candidate: Path) -> bool:
        # Blacklist: discussion history files must never be aggregated.
        lowered = candidate.name.lower()
        return lowered == "history.toml" or lowered.endswith("_history.toml")

    return sorted(c for c in candidates if not _is_history_file(c))
|
|
|
|
def group_files_by_dir(files: list[Any]) -> dict[str, list[Any]]:
    """Group file entries by the directory part of their path.

    Args:
        files: Entries to group. An entry exposing a ``path`` attribute is
            keyed by that attribute (``str`` or path-like); any other entry
            is keyed by its ``str()`` form.

    Returns:
        A dict mapping each directory (forward-slash separated, ``"."`` for
        entries without a directory part) to the entries found in it, in
        their original order.
    """
    grouped: dict[str, list[Any]] = {}
    for item in files:
        raw_path = item.path if hasattr(item, "path") else item
        # Coerce to str before substituting separators: on a pathlib.Path,
        # .replace() is a filesystem rename, not a text replacement.
        normalized = str(raw_path).replace("\\", "/")
        directory = os.path.dirname(normalized) or "."
        grouped.setdefault(directory, []).append(item)
    return grouped
|
|
|
|
def compute_file_stats(abs_path: str | os.PathLike[str]) -> dict[str, int]:
    """Compute the line count and a basic AST element count for a file.

    Args:
        abs_path: Path of the file to inspect, as a string or path-like
            object. The file is read as UTF-8.

    Returns:
        ``{"lines": <int>, "ast_elements": <int>}``. ``lines`` is the number
        of lines in the file. ``ast_elements`` counts function, async
        function and class definitions and is only computed for ``.py``
        files. The function is best-effort and never raises for bad input:
        an unreadable or undecodable file reports zeros, and a Python file
        that cannot be parsed keeps its line count with ``ast_elements`` 0.
    """
    import ast

    stats = {"lines": 0, "ast_elements": 0}
    try:
        # fsdecode accepts str, bytes and os.PathLike, so the suffix check
        # below always works on a str.
        path_text = os.fsdecode(abs_path)
        with open(path_text, "r", encoding="utf-8") as handle:
            content = handle.read()
    except (OSError, ValueError, TypeError):
        # Missing file, permission problem, non-UTF-8 data or invalid path.
        return stats

    stats["lines"] = len(content.splitlines())
    if not path_text.endswith(".py"):
        return stats

    try:
        tree = ast.parse(content)
    except (SyntaxError, ValueError, RecursionError, MemoryError):
        # Source that does not parse still reports its line count.
        return stats
    definition_types = (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)
    stats["ast_elements"] = sum(
        1 for node in ast.walk(tree) if isinstance(node, definition_types)
    )
    return stats
|
|
|
|
def build_discussion_section(history: list[Any]) -> str:
    """Build the markdown section for the discussion history.

    Handles both legacy ``list[str]`` and new ``list[dict]`` histories.

    Args:
        history: Discussion entries. A dict entry is rendered as
            ``"<role>: <content>"`` (role defaults to ``"Unknown"``); any
            other entry is rendered via ``str()``. Text is stripped of
            surrounding whitespace.

    Returns:
        One ``### Discussion Excerpt <n>`` block per entry (numbered from 1),
        joined by horizontal rules; an empty string for an empty history.
    """
    sections = []
    for index, entry in enumerate(history, start=1):
        if isinstance(entry, dict):
            role = entry.get("role", "Unknown")
            raw_content = entry.get("content")
            # Content may be present but None (or a non-string value);
            # calling .strip() on it directly would raise AttributeError.
            content = "" if raw_content is None else str(raw_content).strip()
            text = f"{role}: {content}"
        else:
            text = str(entry).strip()
        sections.append(f"### Discussion Excerpt {index}\n\n{text}")
    return "\n\n---\n\n".join(sections)
|
|
|
|
|
|
def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
    """Build the markdown section that embeds the configured screenshots.

    Args:
        base_dir: Directory that relative screenshot entries are resolved
            against (via ``resolve_paths``).
        screenshots: Path or glob entries. Empty and non-string entries are
            skipped.

    Returns:
        One block per resolved path, joined by horizontal rules. Each block
        is a heading followed by a markdown image link, or by an italic
        error line when nothing matched or the file does not exist.
    """
    sections = []
    for entry in screenshots:
        if not entry or not isinstance(entry, str):
            continue
        paths = resolve_paths(base_dir, entry)
        if not paths:
            sections.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_")
            continue
        for path in paths:
            # Glob entries are labelled with the concrete match, plain
            # entries with the string exactly as configured.
            original = entry if "*" not in entry else str(path)
            if not path.exists():
                sections.append(f"### `{original}`\n\n_ERROR: file not found: {path}_")
                continue
            # NOTE(review): the image markup was lost in the source listing
            # (only "})" survived, which is not a valid f-string); it is
            # reconstructed here as a standard markdown image - confirm the
            # alt text / URL form against the original file.
            sections.append(f"### `{original}`\n\n![{path.name}]({path.as_posix()})")
    return "\n\n---\n\n".join(sections)
|
|
|
|
def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Return a list of dicts describing each file, for use by ai_client when it
    wants to upload individual files rather than inline everything as markdown.

    Each element of ``files`` may be a dict (options read with ``.get``), an
    object exposing a ``path`` attribute (options read with ``getattr``), or a
    plain path/glob string (every option takes its default). Entries whose
    path is empty or not a string are skipped. ``force_full`` overrides
    ``view_mode`` to ``"full"``.

    Each dict has:
      path            : Path | None (as returned by resolve_paths; None when nothing matched)
      entry           : str (original config entry string)
      content         : str (file text, a reduced view of it, or an error string)
      error           : bool
      mtime           : float (last modification time, for skip-if-unchanged optimization; 0.0 on error)
      tier            : int | None (optional tier for context management)
      auto_aggregate  : bool
      force_full      : bool
      view_mode       : str (summary, full, skeleton, outline, none, custom)
      ast_signatures  : passed through unchanged from the entry
      ast_definitions : passed through unchanged from the entry
      ast_mask        : passed through unchanged from the entry
      custom_slices   : passed through unchanged from the entry

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation - confirm against the original file, in particular
    the ``else`` that follows the ``custom`` slice handling.

    [C: src/app_controller.py:AppController._bg_task, src/orchestrator_pm.py:module, tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_aggregate_flags.py:test_force_full, tests/test_tiered_context.py:test_build_file_items_with_tiers]
    """
    with get_monitor().scope("build_file_items"):
        items: list[dict[str, Any]] = []
        # Created lazily, the first time a Python file needs a skeleton/outline view.
        parser = None
        for entry_raw in files:
            # --- Normalise the three supported entry shapes into local option variables ---
            if isinstance(entry_raw, dict):
                entry = cast(str, entry_raw.get("path", ""))
                tier = entry_raw.get("tier")
                auto_aggregate = entry_raw.get("auto_aggregate", True)
                force_full = entry_raw.get("force_full", False)
                view_mode = entry_raw.get("view_mode", "full")
                if force_full:
                    view_mode = "full"
                ast_signatures = entry_raw.get("ast_signatures", False)
                ast_definitions = entry_raw.get("ast_definitions", False)
                ast_mask = entry_raw.get("ast_mask", {})
                custom_slices = entry_raw.get("custom_slices", [])
            elif hasattr(entry_raw, "path"):
                entry = entry_raw.path
                tier = getattr(entry_raw, "tier", None)
                auto_aggregate = getattr(entry_raw, "auto_aggregate", True)
                force_full = getattr(entry_raw, "force_full", False)
                view_mode = getattr(entry_raw, "view_mode", "full")
                if force_full:
                    view_mode = "full"
                ast_signatures = getattr(entry_raw, "ast_signatures", False)
                ast_definitions = getattr(entry_raw, "ast_definitions", False)
                ast_mask = getattr(entry_raw, "ast_mask", {})
                custom_slices = getattr(entry_raw, "custom_slices", [])
            else:
                entry = entry_raw
                tier = None
                auto_aggregate = True
                force_full = False
                view_mode = "full"
                ast_signatures = False
                ast_definitions = False
                ast_mask = {}
                custom_slices = []
            if not entry or not isinstance(entry, str):
                continue
            paths = resolve_paths(base_dir, entry)
            if not paths:
                # Nothing matched: record a single error item with path=None.
                items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices})
                continue
            for path in paths:
                try:
                    content = path.read_text(encoding="utf-8")
                    mtime = path.stat().st_mtime
                    error = False
                    # Any view other than "full" replaces the raw text with a reduced form.
                    if not error and view_mode != "full":
                        if view_mode == "summary":
                            content = summarize.summarise_file(path, content)
                        elif view_mode == "skeleton":
                            # AST skeleton for Python; other file types fall back to a summary.
                            if path.suffix == ".py":
                                if not parser: parser = ASTParser("python")
                                content = parser.get_skeleton(content, path=str(path))
                            else:
                                content = summarize.summarise_file(path, content)
                        elif view_mode == "outline":
                            # Code outline for Python; other file types fall back to a summary.
                            if path.suffix == ".py":
                                if not parser: parser = ASTParser("python")
                                content = parser.get_code_outline(content, path=str(path))
                            else:
                                content = summarize.summarise_file(path, content)
                        elif view_mode == "none":
                            content = "(context excluded)"
                        elif view_mode == "custom":
                            if custom_slices:
                                lines = content.splitlines()
                                slices_text = []
                                for s in custom_slices:
                                    # start_line/end_line are treated as 1-based and inclusive.
                                    start = s.get("start_line", 1)
                                    end = s.get("end_line", len(lines))
                                    tag = s.get("tag", "unnamed")
                                    comment = s.get("comment", "")
                                    # Clamp to the file bounds before slicing.
                                    s_idx = max(0, start - 1)
                                    e_idx = min(len(lines), end)
                                    chunk = "\n".join(lines[s_idx:e_idx])
                                    slices_text.append(f"---\n[Slice: {tag}] ({comment})\nLines {start}-{end}:\n{chunk}")
                                content = "\n\n".join(slices_text)
                            else:
                                # NOTE(review): paired with `if custom_slices` (custom view without
                                # slices -> summary), mirroring the inner if/else of the skeleton and
                                # outline branches; it may instead have been the final `else` of the
                                # view_mode chain - confirm.
                                content = summarize.summarise_file(path, content)
                except FileNotFoundError:
                    content = f"ERROR: file not found: {path}"
                    mtime = 0.0
                    error = True
                except Exception as e:
                    # Any other failure (read, decode, summarise, parse) becomes an error item.
                    content = f"ERROR: {e}"
                    mtime = 0.0
                    error = True
                items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices})
        return items
|
|
|
|
|
|
def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str:
|
|
"""
|
|
|
|
Build the files markdown section from pre-read file items (avoids double I/O).
|
|
[C: tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_ui_summary_only_removal.py:test_aggregate_from_items_respects_auto_aggregate]
|
|
"""
|
|
sections = []
|
|
for item in file_items:
|
|
if not item.get("auto_aggregate", True):
|
|
continue
|
|
path = item.get("path")
|
|
entry = item.get("entry", "unknown")
|
|
content = item.get("content", "")
|
|
view_mode = item.get("view_mode", "full")
|
|
if path is None:
|
|
if view_mode == "summary":
|
|
sections.append(f"### `{entry}`\n\n{content}")
|
|
else:
|
|
sections.append(f"### `{entry}`\n\n```text\n{content}\n```")
|
|
else:
|
|
path_obj = Path(path) if isinstance(path, str) else path
|
|
suffix = path_obj.suffix.lstrip(".") if path_obj.suffix else "text"
|
|
original = entry if "*" not in entry else str(path)
|
|
if view_mode == "summary":
|
|
sections.append(f"### `{original}`\n\n{content}")
|
|
else:
|
|
sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```")
|
|
return "\n\n---\n\n".join(sections)
|
|
|
|
def build_beads_section(base_dir: Path) -> str:
    """
    Render the beads progress section for the project rooted at ``base_dir``.

    Returns an empty string when the beads client is not initialized or has
    no beads. Otherwise completed beads are listed as a comma-separated line
    of titles and active beads as one bullet each.
    [C: tests/test_aggregate_beads.py:test_build_beads_compaction]
    """
    client = beads_client.BeadsClient(base_dir)
    if not client.is_initialized():
        return ""
    beads = client.list_beads()
    if not beads:
        return ""

    in_progress = [bead for bead in beads if bead.status == "active"]
    done = [bead for bead in beads if bead.status == "completed"]

    blocks = ["## Beads Mode: Progress Track"]
    if done:
        blocks.append("### Completed Beads")
        blocks.append(", ".join(f"`{bead.title}`" for bead in done))
    if in_progress:
        blocks.append("### Active Beads")
        blocks.extend(
            f"- **{bead.title}** ({bead.id}): {bead.description}" for bead in in_progress
        )
    return "\n\n".join(blocks)
|
|
|
|
def build_markdown_from_items(
    file_items: list[dict[str, Any]],
    screenshot_base_dir: Path,
    screenshots: list[str],
    history: list[str],
    summary_only: bool = False,
    aggregation_strategy: str = "auto",
    execution_mode: str = "standard",
    base_dir: Path | None = None,
) -> str:
    """Assemble the markdown context from pre-read file items (no disk re-reads).

    Section order is files, screenshots, beads, discussion history. The
    files section is summarised when ``aggregation_strategy`` is
    ``"summarize"``, rendered in full when it is ``"full"``, and otherwise
    follows ``summary_only``. The beads section is only added when
    ``execution_mode`` is ``"beads"`` and ``base_dir`` is given.
    """
    chunks = []

    # STATIC PREFIX: files and screenshots lead so the cached prefix stays stable.
    if file_items:
        use_summary = aggregation_strategy == "summarize" or (
            aggregation_strategy != "full" and summary_only
        )
        if use_summary:
            chunks.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
        else:
            chunks.append("## Files\n\n" + _build_files_section_from_items(file_items))

    if screenshots:
        chunks.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))

    if execution_mode == "beads" and base_dir:
        beads_markdown = build_beads_section(base_dir)
        if beads_markdown:
            chunks.append(beads_markdown)

    # DYNAMIC SUFFIX: history changes on every turn, so it is always last.
    if history:
        chunks.append("## Discussion History\n\n" + build_discussion_section(history))

    return "\n\n---\n\n".join(chunks)
|
|
|
|
def build_markdown_no_history(
    file_items: list[dict[str, Any]],
    screenshot_base_dir: Path,
    screenshots: list[str],
    summary_only: bool = False,
    aggregation_strategy: str = "auto",
) -> str:
    """
    Build the markdown for files + screenshots only, with an empty history. Used for stable caching.
    [C: src/app_controller.py:AppController._do_generate, tests/test_history_management.py:test_aggregate_blacklist]
    """
    empty_history: list[str] = []
    return build_markdown_from_items(
        file_items,
        screenshot_base_dir,
        screenshots,
        history=empty_history,
        summary_only=summary_only,
        aggregation_strategy=aggregation_strategy,
    )
|
|
|
|
def build_discussion_text(history: list[str]) -> str:
    """
    Build only the discussion history section text; an empty history gives an empty string.
    [C: src/app_controller.py:AppController._do_generate, tests/test_history_management.py:test_aggregate_includes_segregated_history]
    """
    return "## Discussion History\n\n" + build_discussion_section(history) if history else ""
|
|
|
|
def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
    """
    Tier 3 Context: Execution/Worker.
    Full content for focus_files and files with tier=3, summaries/skeletons for others.

    Items with ``auto_aggregate`` false are skipped. For every other item the
    first rule that produces output wins:
      1. custom_slices set and no error: only the slices FuzzyAnchor resolves
         (falls through when none resolve).
      2. focus file, tier == 3 or force_full: full content in a fenced block.
      3. ast_mask set and no error: per-symbol definitions/signatures from
         mcp_client (falls through when nothing is returned).
      4. C/C++ sources without error: AST definitions, AST signatures, or a
         summary, depending on the item's flags.
      5. Python sources without error: AST skeleton, or a summary if the
         parser raises.
      6. any other item that has a path: summary.
    An item without a path that matches neither rule 1 nor rule 2 produces no
    section. Screenshots and discussion history follow the files section.

    NOTE(review): the nesting below was reconstructed from a listing that had
    lost its indentation - confirm the placement of both ``continue``
    statements against the original file.

    [C: tests/test_aggregate_flags.py:test_auto_aggregate_skip, tests/test_aggregate_flags.py:test_force_full, tests/test_perf_aggregate.py:test_build_tier3_context_scaling, tests/test_tiered_context.py:test_build_tier3_context_ast_skeleton, tests/test_tiered_context.py:test_build_tier3_context_exists, tests/test_tiered_context.py:test_tiered_context_by_tier_field]
    """
    with get_monitor().scope("build_tier3_context"):
        focus_set = set(focus_files)
        parser = ASTParser("python")
        sections = []
        for item in file_items:
            if not item.get("auto_aggregate", True):
                continue
            path = item.get("path")
            entry = item.get("entry", "")
            path_str = str(path) if path else ""
            name = path.name if path else ""
            tier = item.get("tier")
            force_full = item.get("force_full")
            ast_signatures = item.get("ast_signatures", False)
            ast_definitions = item.get("ast_definitions", False)
            ast_mask = item.get("ast_mask", {})
            content = item.get("content", "")
            # Exact match on the config entry, the bare file name or the full path string ...
            is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set)
            if not is_focus and path_str:
                # ... otherwise any focus string contained in the path counts as a match.
                for focus in focus_set:
                    if focus in path_str:
                        is_focus = True
                        break
            # Heading label: the entry as configured, or the concrete path for glob/empty entries.
            original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown"))

            slices = item.get('custom_slices', [])
            if slices and not item.get('error'):
                from src.fuzzy_anchor import FuzzyAnchor
                resolved_blocks = []
                content = item.get('content', '')
                suffix = path.suffix.lstrip(".") if path and path.suffix else "text"
                for slc in slices:
                    range_res = FuzzyAnchor.resolve_slice(content, slc)
                    if range_res:
                        # presumably a 1-based, inclusive (start, end) line range - verify against FuzzyAnchor
                        s, e = range_res
                        lines = content.splitlines()
                        resolved_blocks.append("\n".join(lines[s-1:e]))
                if resolved_blocks:
                    combined = "\n\n... [LINES SKIPPED] ...\n\n".join(resolved_blocks)
                    sections.append(f"### `{original}` (Slices)\n\n```{suffix}\n{combined}\n```")
                    continue # Skip full file logic

            if is_focus or tier == 3 or force_full:
                suffix = path.suffix.lstrip(".") if path and path.suffix else "text"
                sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```")
            elif path:
                if ast_mask and not item.get("error"):
                    mask_sections = []
                    from src import mcp_client
                    # ast_mask maps symbol -> mode: "hide" skips the symbol, "def" requests its
                    # definition, any other mode requests its signature.
                    for symbol, mode in ast_mask.items():
                        if mode == "hide":
                            continue
                        res = ""
                        if path.suffix == ".py":
                            res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol)
                        elif path.suffix in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]:
                            is_cpp = any(ext in path.suffix for ext in [".cpp", ".hpp", ".cxx", ".cc"])
                            if mode == "def":
                                res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol)
                            else:
                                res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol)
                        if res:
                            mask_sections.append(res)
                    if mask_sections:
                        suffix = path.suffix.lstrip(".") if path.suffix else "text"
                        sections.append(f"### `{original}` (Masked)\n\n```{suffix}\n" + "\n\n".join(mask_sections) + "\n```")
                        continue
                if path.suffix in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc'] and not item.get("error"):
                    from src import mcp_client
                    # .cpp/.hpp/.cxx/.cc go to the C++ helpers; .c and .h go to the C helpers.
                    if ast_definitions:
                        skeleton = mcp_client.ts_cpp_get_skeleton(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_skeleton(str(path))
                        sections.append(f"### `{original}` (AST Definitions)\n\n```{path.suffix.lstrip('.')}\n{skeleton}\n```")
                    elif ast_signatures:
                        outline = mcp_client.ts_cpp_get_code_outline(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_code_outline(str(path))
                        sections.append(f"### `{original}` (AST Signatures)\n\n```{path.suffix.lstrip('.')}\n{outline}\n```")
                    else:
                        sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}")
                elif path.suffix == ".py" and not item.get("error"):
                    try:
                        skeleton = parser.get_skeleton(content)
                        sections.append(f"### `{original}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
                    except Exception:
                        # Skeleton extraction failed: fall back to a summary.
                        sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}")
                else:
                    sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}")
        parts = []
        if sections:
            parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
        if screenshots:
            parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
        if history:
            parts.append("## Discussion History\n\n" + build_discussion_section(history))
        return "\n\n---\n\n".join(parts)
|
|
|
|
def build_markdown(
    base_dir: Path,
    files: list[str | dict[str, Any]],
    screenshot_base_dir: Path,
    screenshots: list[str],
    history: list[str],
    summary_only: bool = False,
    execution_mode: str = "standard",
) -> str:
    """Read the configured files and build the full markdown context.

    Convenience wrapper: builds the file items from ``files`` and renders
    them with the ``'auto'`` aggregation strategy.
    """
    items = build_file_items(base_dir, files)
    return build_markdown_from_items(
        items,
        screenshot_base_dir,
        screenshots,
        history,
        summary_only=summary_only,
        aggregation_strategy='auto',
        execution_mode=execution_mode,
        base_dir=base_dir,
    )
|
|
|
|
def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str, Path, list[dict[str, Any]]]:
    """
    Build the markdown context described by ``config`` and write it to disk.

    The output lands in ``config["output"]["output_dir"]`` (created if
    needed) as ``<namespace>_<NNN>.md``, where the namespace is the project
    name, else the configured output namespace, else ``"project"``.
    Returns ``(markdown, output_file, file_items)``.

    [C: simulation/sim_base.py:run_sim, src/ai_client.py:_send_anthropic, src/ai_client.py:_send_deepseek, src/ai_client.py:_send_gemini, src/ai_client.py:_send_gemini_cli, src/ai_client.py:_send_minimax, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._do_generate, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._start_track_logic, src/external_editor.py:_find_vscode_in_registry, src/gui_2.py:App._render_snapshot_tab, src/gui_2.py:App.run, src/gui_2.py:main, src/mcp_client.py:get_git_diff, src/project_manager.py:get_git_commit, src/project_manager.py:get_git_log, src/rag_engine.py:RAGEngine._search_mcp, src/shell_runner.py:run_powershell, tests/conftest.py:kill_process_tree, tests/conftest.py:live_gui, tests/test_conductor_abort_event.py:test_conductor_abort_event_populated, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_extended_sims.py:test_ai_settings_sim_live, tests/test_extended_sims.py:test_context_sim_live, tests/test_extended_sims.py:test_execution_sim_live, tests/test_extended_sims.py:test_tools_sim_live, tests/test_external_editor_gui.py:get_vscode_processes, tests/test_external_editor_gui.py:test_vscode_launches_with_diff_view, tests/test_gui_custom_window.py:test_app_window_is_borderless, tests/test_headless_simulation.py:module, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_mock_gemini_cli.py:run_mock, tests/test_orchestration_logic.py:test_conductor_engine_run, tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_sim_ai_settings.py:test_ai_settings_simulation_run, tests/test_sim_context.py:test_context_simulation_run, tests/test_sim_execution.py:test_execution_simulation_run, tests/test_sim_tools.py:test_tools_simulation_run]
    """
    project_cfg = config.get("project", {})
    namespace = project_cfg.get("name") or config.get("output", {}).get("namespace", "project")

    output_dir = Path(config["output"]["output_dir"])
    files_cfg = config["files"]
    base_dir = Path(files_cfg["base_dir"])
    files = files_cfg.get("paths", [])

    screenshots_cfg = config.get("screenshots", {})
    screenshot_base_dir = Path(screenshots_cfg.get("base_dir", "."))
    screenshots = screenshots_cfg.get("paths", [])
    history = config.get("discussion", {}).get("history", [])

    output_dir.mkdir(parents=True, exist_ok=True)
    sequence = find_next_increment(output_dir, namespace)
    output_file = output_dir / f"{namespace}_{sequence:03d}.md"

    # Read every file once; the markdown is rendered from these items (avoids double I/O).
    file_items = build_file_items(base_dir, files)
    markdown = build_markdown_from_items(
        file_items,
        screenshot_base_dir,
        screenshots,
        history,
        summary_only=project_cfg.get("summary_only", False),
        aggregation_strategy=aggregation_strategy,
        execution_mode=project_cfg.get("execution_mode", "standard"),
        base_dir=base_dir,
    )
    output_file.write_text(markdown, encoding="utf-8")
    return markdown, output_file, file_items
|
|
|
|
def main() -> None:
    """
    Aggregate the active project named in the global config and report the output file.

    Prints a message and returns early when the global config file is
    missing or names no active project.

    [C: simulation/live_walkthrough.py:module, simulation/ping_pong.py:module, src/api_hooks.py:WebSocketServer._run_loop, src/gui_2.py:module, tests/mock_concurrent_mma.py:module, tests/mock_gemini_cli.py:module, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_allow_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_deny_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_unreachable_hook_server, tests/test_cli_tool_bridge.py:module, tests/test_cli_tool_bridge_mapping.py:TestCliToolBridgeMapping.test_mapping_from_api_format, tests/test_cli_tool_bridge_mapping.py:module, tests/test_discussion_takes.py:module, tests/test_external_editor_gui.py:module, tests/test_headless_service.py:TestHeadlessStartup.test_headless_flag_triggers_run, tests/test_headless_service.py:TestHeadlessStartup.test_normal_startup_calls_app_run, tests/test_mma_skeleton.py:module, tests/test_orchestrator_pm.py:module, tests/test_orchestrator_pm_history.py:module, tests/test_post_process.py:module, tests/test_presets.py:module, tests/test_project_serialization.py:module, tests/test_run_worker_lifecycle_abort.py:module, tests/test_symbol_lookup.py:module, tests/test_system_prompt_exposure.py:module, tests/test_theme_nerv_fx.py:module]
    """
    from src.paths import get_config_path

    # The global config tells us which project is currently active.
    config_path = get_config_path()
    if not config_path.exists():
        print(f"{config_path} not found.")
        return

    with open(config_path, "rb") as handle:
        global_cfg = tomllib.load(handle)

    active_path = global_cfg.get("projects", {}).get("active")
    if not active_path:
        print(f"No active project found in {config_path}.")
        return

    # project_manager loads the project (handles history segregation);
    # flat_config makes it compatible with aggregate.run().
    project = project_manager.load_project(active_path)
    _, output_file, _ = run(project_manager.flat_config(project))
    print(f"Written: {output_file}")
|
|
|
|
# Script entry point: aggregate the active project only when this file is
# executed directly, never as a side effect of importing the module.
if __name__ == "__main__":
    main()