Private
Public Access
0
0

organizing (mostly aggregate.py)

This commit is contained in:
2026-06-05 22:34:26 -04:00
parent 8b83c5d0b7
commit aa56981c87
5 changed files with 148 additions and 186 deletions
+128 -150
View File
@@ -12,19 +12,24 @@ Instead of sending every file to the AI raw (which blows up tokens), this uses a
This is essential for keeping prompt tokens low while giving the AI enough structural info
to use the MCP tools to fetch only what it needs.
"""
import ast
import glob
import os
import re
import tomllib
import traceback
from pathlib import Path, PureWindowsPath
from typing import Any, cast
from src import beads_client
from src import mcp_client
from src import project_manager
from src import summarize
from src.file_cache import ASTParser
from src.fuzzy_anchor import FuzzyAnchor
from src.file_cache import ASTParser
from src.paths import get_config_path
from src.performance_monitor import get_monitor
@@ -50,17 +55,16 @@ def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
is_wildcard = "*" in entry
matches = []
if is_wildcard:
root = Path(entry) if has_drive else base_dir / entry
root = Path(entry) if has_drive else base_dir / entry
matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()]
else:
p = Path(entry) if has_drive else (base_dir / entry).resolve()
p = Path(entry) if has_drive else (base_dir / entry).resolve()
matches = [p]
# Blacklist filter
filtered = []
for p in matches:
name = p.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
if name == "history.toml" or name.endswith("_history.toml"): continue
filtered.append(p)
return sorted(filtered)
@@ -93,7 +97,6 @@ def compute_file_stats(abs_path: str) -> dict[str, int]:
content = f.read()
stats["lines"] = len(content.splitlines())
if abs_path.endswith('.py'):
import ast
try:
tree = ast.parse(content)
stats["ast_elements"] = sum(1 for node in ast.walk(tree) if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)))
@@ -111,19 +114,19 @@ def build_discussion_section(history: list[Any]) -> str:
sections = []
for i, entry in enumerate(history, start=1):
if isinstance(entry, dict):
role = entry.get("role", "Unknown")
role = entry.get("role", "Unknown")
content = entry.get("content", "").strip()
text = f"{role}: {content}"
text = f"{role}: {content}"
else:
text = str(entry).strip()
sections.append(f"### Discussion Excerpt {i}\n\n{text}")
return "\n\n---\n\n".join(sections)
def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
sections = []
for entry in screenshots:
if not entry or not isinstance(entry, str):
continue
if not entry or not isinstance(entry, str): continue
paths = resolve_paths(base_dir, entry)
if not paths:
sections.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_")
@@ -158,63 +161,61 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[
parser = None
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = cast(str, entry_raw.get("path", ""))
tier = entry_raw.get("tier")
entry = cast(str, entry_raw.get("path", ""))
tier = entry_raw.get("tier")
auto_aggregate = entry_raw.get("auto_aggregate", True)
force_full = entry_raw.get("force_full", False)
view_mode = entry_raw.get("view_mode", "full")
if force_full:
view_mode = "full"
ast_signatures = entry_raw.get("ast_signatures", False)
force_full = entry_raw.get("force_full", False)
view_mode = entry_raw.get("view_mode", "full")
if force_full: view_mode = "full"
ast_signatures = entry_raw.get("ast_signatures", False)
ast_definitions = entry_raw.get("ast_definitions", False)
ast_mask = entry_raw.get("ast_mask", {})
custom_slices = entry_raw.get("custom_slices", [])
ast_mask = entry_raw.get("ast_mask", {})
custom_slices = entry_raw.get("custom_slices", [])
elif hasattr(entry_raw, "path"):
entry = entry_raw.path
tier = getattr(entry_raw, "tier", None)
entry = entry_raw.path
tier = getattr(entry_raw, "tier", None)
auto_aggregate = getattr(entry_raw, "auto_aggregate", True)
force_full = getattr(entry_raw, "force_full", False)
view_mode = getattr(entry_raw, "view_mode", "full")
if force_full:
view_mode = "full"
ast_signatures = getattr(entry_raw, "ast_signatures", False)
force_full = getattr(entry_raw, "force_full", False)
view_mode = getattr(entry_raw, "view_mode", "full")
if force_full: view_mode = "full"
ast_signatures = getattr(entry_raw, "ast_signatures", False)
ast_definitions = getattr(entry_raw, "ast_definitions", False)
ast_mask = getattr(entry_raw, "ast_mask", {})
custom_slices = getattr(entry_raw, "custom_slices", [])
ast_mask = getattr(entry_raw, "ast_mask", {})
custom_slices = getattr(entry_raw, "custom_slices", [])
else:
entry = entry_raw
tier = None
auto_aggregate = True
force_full = False
view_mode = "full"
ast_signatures = False
entry = entry_raw
tier = None
auto_aggregate = True
force_full = False
view_mode = "full"
ast_signatures = False
ast_definitions = False
ast_mask = {}
custom_slices = []
if not entry or not isinstance(entry, str):
continue
ast_mask = {}
custom_slices = []
if not entry or not isinstance(entry, str): continue
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices})
continue
for path in paths:
try:
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
error = False
mtime = path.stat().st_mtime
error = False
if not error and view_mode != "full":
try:
if view_mode == "summary":
content = summarize.summarise_file(path, content)
if view_mode == "summary": content = summarize.summarise_file(path, content)
elif view_mode == "skeleton":
suffix_lower = path.suffix.lower()
if suffix_lower == ".py":
if not parser: parser = ASTParser("python")
content = parser.get_skeleton(content, path=str(path))
elif suffix_lower in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc']:
from src import mcp_client
if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_skeleton(str(path))
else: content = mcp_client.ts_cpp_get_skeleton(str(path))
if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_skeleton(str(path))
else: content = mcp_client.ts_cpp_get_skeleton(str(path))
else:
content = summarize.summarise_file(path, content)
elif view_mode == "outline":
@@ -223,7 +224,6 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[
if not parser: parser = ASTParser("python")
content = parser.get_code_outline(content, path=str(path))
elif suffix_lower in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc']:
from src import mcp_client
if suffix_lower in ['.c', '.h']: content = mcp_client.ts_c_get_code_outline(str(path))
else: content = mcp_client.ts_cpp_get_code_outline(str(path))
else:
@@ -232,58 +232,50 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[
suffix_lower = path.suffix.lower()
if ast_mask:
mask_sections = []
from src import mcp_client
for symbol_raw, mode in ast_mask.items():
if mode == "hide": continue
import re
symbol = re.sub(r'\(\d+-\d+\)$', '', symbol_raw)
res = ""
if suffix_lower == ".py":
res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol)
elif suffix_lower in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]:
is_cpp = any(ext in suffix_lower for ext in [".cpp", ".hpp", ".cxx", ".cc"])
if mode == "def":
res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol)
else:
res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol)
if mode == "def": res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol)
else: res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol)
if res: mask_sections.append(res)
if mask_sections:
content = "\n\n".join(mask_sections)
else:
content = "(no masked sections visible)"
if mask_sections: content = "\n\n".join(mask_sections)
else: content = "(no masked sections visible)"
else:
content = "(no ast mask defined)"
elif view_mode == "none":
content = "(context excluded)"
elif view_mode == "none": content = "(context excluded)"
elif view_mode == "custom":
if custom_slices:
lines = content.splitlines()
lines = content.splitlines()
slices_text = []
for s in custom_slices:
start = s.get("start_line", 1)
end = s.get("end_line", len(lines))
tag = s.get("tag", "unnamed")
comment = s.get("comment", "")
s_idx = max(0, start - 1)
e_idx = min(len(lines), end)
chunk = "\n".join(lines[s_idx:e_idx])
start = s.get("start_line", 1)
end = s.get("end_line", len(lines))
tag = s.get("tag", "unnamed")
comment = s.get("comment", "")
s_idx = max(0, start - 1)
e_idx = min(len(lines), end)
chunk = "\n".join(lines[s_idx:e_idx])
slices_text.append(f"---\n[Slice: {tag}] ({comment})\nLines {start}-{end}:\n{chunk}")
content = "\n\n".join(slices_text)
else:
content = summarize.summarise_file(path, content)
except Exception as e:
import traceback
content = f"ERROR in {view_mode} view mode for {path}:\n{traceback.format_exc()}"
error = True
error = True
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
mtime = 0.0
error = True
except Exception as e:
import traceback
content = f"ERROR reading {path}:\n{traceback.format_exc()}"
mtime = 0.0
error = True
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full, "view_mode": view_mode, "ast_signatures": ast_signatures, "ast_definitions": ast_definitions, "ast_mask": ast_mask, "custom_slices": custom_slices})
return items
@@ -294,11 +286,10 @@ def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str:
"""
sections = []
for item in file_items:
if not item.get("auto_aggregate", True):
continue
path = item.get("path")
entry = item.get("entry", "unknown")
content = item.get("content", "")
if not item.get("auto_aggregate", True): continue
path = item.get("path")
entry = item.get("entry", "unknown")
content = item.get("content", "")
view_mode = item.get("view_mode", "full")
if path is None:
if view_mode == "summary":
@@ -320,23 +311,20 @@ def build_beads_section(base_dir: Path) -> str:
[C: tests/test_aggregate_beads.py:test_build_beads_compaction]
"""
client = beads_client.BeadsClient(base_dir)
if not client.is_initialized():
return ""
if not client.is_initialized(): return ""
beads = client.list_beads()
if not beads:
return ""
active = [b for b in beads if b.status == "active"]
if not beads: return ""
active = [b for b in beads if b.status == "active"]
completed = [b for b in beads if b.status == "completed"]
parts = []
parts = []
parts.append("## Beads Mode: Progress Track")
if completed:
if completed:
parts.append("### Completed Beads")
comp_list = ", ".join([f"`{b.title}`" for b in completed])
parts.append(comp_list)
if active:
parts.append("### Active Beads")
for b in active:
parts.append(f"- **{b.title}** ({b.id}): {b.description}")
for b in active: parts.append(f"- **{b.title}** ({b.id}): {b.description}")
return "\n\n".join(parts)
def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, aggregation_strategy: str = "auto", execution_mode: str = "standard", base_dir: Path | None = None) -> str:
@@ -344,24 +332,17 @@ def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if file_items:
if aggregation_strategy == "summarize":
parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
elif aggregation_strategy == "full":
parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
if aggregation_strategy == "summarize": parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
elif aggregation_strategy == "full": parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
else: # auto
if summary_only:
parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
else:
parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if summary_only: parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
else: parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if execution_mode == "beads" and base_dir:
beads_md = build_beads_section(base_dir)
if beads_md:
parts.append(beads_md)
if beads_md: parts.append(beads_md)
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
if history: parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown_no_history(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False, aggregation_strategy: str = "auto") -> str:
@@ -388,67 +369,61 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P
"""
with get_monitor().scope("build_tier3_context"):
focus_set = set(focus_files)
parser = ASTParser("python")
sections = []
parser = ASTParser("python")
sections = []
for item in file_items:
if not item.get("auto_aggregate", True):
continue
path = item.get("path")
entry = item.get("entry", "")
path_str = str(path) if path else ""
name = path.name if path else ""
tier = item.get("tier")
force_full = item.get("force_full")
ast_signatures = item.get("ast_signatures", False)
if not item.get("auto_aggregate", True): continue
path = item.get("path")
entry = item.get("entry", "")
path_str = str(path) if path else ""
name = path.name if path else ""
tier = item.get("tier")
force_full = item.get("force_full")
ast_signatures = item.get("ast_signatures", False)
ast_definitions = item.get("ast_definitions", False)
ast_mask = item.get("ast_mask", {})
content = item.get("content", "")
is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set)
ast_mask = item.get("ast_mask", {})
content = item.get("content", "")
is_focus = entry in focus_set or (name and name in focus_set) or (path_str and path_str in focus_set)
if not is_focus and path_str:
for focus in focus_set:
if focus in path_str:
is_focus = True
break
original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown"))
slices = item.get('custom_slices', [])
original = entry if entry and "*" not in entry else (str(path) if path else (entry or "unknown"))
slices = item.get('custom_slices', [])
if slices and not item.get('error'):
from src.fuzzy_anchor import FuzzyAnchor
resolved_blocks = []
content = item.get('content', '')
suffix = path.suffix.lstrip(".") if path and path.suffix else "text"
content = item.get('content', '')
suffix = path.suffix.lstrip(".") if path and path.suffix else "text"
for slc in slices:
range_res = FuzzyAnchor.resolve_slice(content, slc)
if range_res:
s, e = range_res
s, e = range_res
lines = content.splitlines()
resolved_blocks.append("\n".join(lines[s-1:e]))
if resolved_blocks:
combined = "\n\n... [LINES SKIPPED] ...\n\n".join(resolved_blocks)
sections.append(f"### `{original}` (Slices)\n\n```{suffix}\n{combined}\n```")
continue # Skip full file logic
if is_focus or tier == 3 or force_full:
suffix = path.suffix.lstrip(".") if path and path.suffix else "text"
sections.append(f"### `{original}`\n\n```{suffix}\n{content}\n```")
elif path:
if ast_mask and not item.get("error"):
mask_sections = []
from src import mcp_client
for symbol_raw, mode in ast_mask.items():
if mode == "hide":
continue
import re
if mode == "hide": continue
symbol = re.sub(r'\(\d+-\d+\)$', '', symbol_raw)
res = ""
res = ""
if path.suffix == ".py":
res = mcp_client.py_get_definition(str(path), symbol) if mode == "def" else mcp_client.py_get_signature(str(path), symbol)
elif path.suffix in [".c", ".h", ".cpp", ".hpp", ".cxx", ".cc"]:
is_cpp = any(ext in path.suffix for ext in [".cpp", ".hpp", ".cxx", ".cc"])
if mode == "def":
res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol)
else:
res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol)
if mode == "def": res = mcp_client.ts_cpp_get_definition(str(path), symbol) if is_cpp else mcp_client.ts_c_get_definition(str(path), symbol)
else: res = mcp_client.ts_cpp_get_signature(str(path), symbol) if is_cpp else mcp_client.ts_c_get_signature(str(path), symbol)
if res:
mask_sections.append(res)
if mask_sections:
@@ -456,7 +431,6 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P
sections.append(f"### `{original}` (Masked)\n\n```{suffix}\n" + "\n\n".join(mask_sections) + "\n```")
continue
if path.suffix in ['.c', '.h', '.cpp', '.hpp', '.cxx', '.cc'] and not item.get("error"):
from src import mcp_client
if ast_definitions:
skeleton = mcp_client.ts_cpp_get_skeleton(str(path)) if 'cpp' in path.suffix or 'hpp' in path.suffix or 'cxx' in path.suffix or 'cc' in path.suffix else mcp_client.ts_c_get_skeleton(str(path))
sections.append(f"### `{original}` (AST Definitions)\n\n```{path.suffix.lstrip('.')}\n{skeleton}\n```")
@@ -474,12 +448,9 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P
else:
sections.append(f"### `{original}`\n\n{summarize.summarise_file(path, content)}")
parts = []
if sections:
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
if sections: parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots: parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history: parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, execution_mode: str = "standard") -> str:
@@ -491,23 +462,31 @@ def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str
[C: simulation/sim_base.py:run_sim, src/ai_client.py:_send_anthropic, src/ai_client.py:_send_deepseek, src/ai_client.py:_send_gemini, src/ai_client.py:_send_gemini_cli, src/ai_client.py:_send_minimax, src/app_controller.py:AppController._cb_start_track, src/app_controller.py:AppController._do_generate, src/app_controller.py:AppController._process_event_queue, src/app_controller.py:AppController._start_track_logic, src/external_editor.py:_find_vscode_in_registry, src/gui_2.py:App._render_snapshot_tab, src/gui_2.py:App.run, src/gui_2.py:main, src/mcp_client.py:get_git_diff, src/project_manager.py:get_git_commit, src/rag_engine.py:RAGEngine._search_mcp, src/shell_runner.py:run_powershell, tests/conftest.py:kill_process_tree, tests/conftest.py:live_gui, tests/test_conductor_abort_event.py:test_conductor_abort_event_populated, tests/test_conductor_engine_v2.py:test_conductor_engine_dynamic_parsing_and_execution, tests/test_conductor_engine_v2.py:test_conductor_engine_run_executes_tickets_in_order, tests/test_extended_sims.py:test_ai_settings_sim_live, tests/test_extended_sims.py:test_context_sim_live, tests/test_extended_sims.py:test_execution_sim_live, tests/test_extended_sims.py:test_tools_sim_live, tests/test_external_editor_gui.py:get_vscode_processes, tests/test_external_editor_gui.py:test_vscode_launches_with_diff_view, tests/test_gui_custom_window.py:test_app_window_is_borderless, tests/test_headless_simulation.py:module, tests/test_headless_verification.py:test_headless_verification_error_and_qa_interceptor, tests/test_headless_verification.py:test_headless_verification_full_run, tests/test_mock_gemini_cli.py:run_mock, tests/test_orchestration_logic.py:test_conductor_engine_run, tests/test_parallel_execution.py:test_conductor_engine_pool_integration, tests/test_sim_ai_settings.py:test_ai_settings_simulation_run, tests/test_sim_context.py:test_context_simulation_run, tests/test_sim_execution.py:test_execution_simulation_run, tests/test_sim_tools.py:test_tools_simulation_run]
"""
namespace = config.get("project", {}).get("name")
if not namespace:
namespace = config.get("output", {}).get("namespace", "project")
output_dir = Path(config["output"]["output_dir"])
base_dir = Path(config["files"]["base_dir"])
files = config["files"].get("paths", [])
if not namespace: namespace = config.get("output", {}).get("namespace", "project")
output_dir = Path(config["output"]["output_dir"])
base_dir = Path(config["files"]["base_dir"])
files = config["files"].get("paths", [])
screenshot_base_dir = Path(config.get("screenshots", {}).get("base_dir", "."))
screenshots = config.get("screenshots", {}).get("paths", [])
history = config.get("discussion", {}).get("history", [])
screenshots = config.get("screenshots", {}).get("paths", [])
history = config.get("discussion", {}).get("history", [])
output_dir.mkdir(parents=True, exist_ok=True)
increment = find_next_increment(output_dir, namespace)
increment = find_next_increment(output_dir, namespace)
output_file = output_dir / f"{namespace}_{increment:03d}.md"
# Build file items once, then construct markdown from them (avoids double I/O)
file_items = build_file_items(base_dir, files)
summary_only = config.get("project", {}).get("summary_only", False)
file_items = build_file_items(base_dir, files)
summary_only = config.get("project", {}).get("summary_only", False)
execution_mode = config.get("project", {}).get("execution_mode", "standard")
markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history,
summary_only=summary_only, aggregation_strategy=aggregation_strategy, execution_mode=execution_mode, base_dir=base_dir)
markdown = build_markdown_from_items(
file_items,
screenshot_base_dir,
screenshots,
history,
summary_only = summary_only,
aggregation_strategy = aggregation_strategy,
execution_mode = execution_mode,
base_dir = base_dir)
output_file.write_text(markdown, encoding="utf-8")
return markdown, output_file, file_items
@@ -516,7 +495,6 @@ def main() -> None:
"""
[C: simulation/live_walkthrough.py:module, simulation/ping_pong.py:module, src/ai_server.py:module, src/api_hooks.py:WebSocketServer._run_loop, src/gui_2.py:module, tests/mock_concurrent_mma.py:module, tests/mock_gemini_cli.py:module, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_allow_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_deny_decision, tests/test_cli_tool_bridge.py:TestCliToolBridge.test_unreachable_hook_server, tests/test_cli_tool_bridge.py:module, tests/test_cli_tool_bridge_mapping.py:TestCliToolBridgeMapping.test_mapping_from_api_format, tests/test_cli_tool_bridge_mapping.py:module, tests/test_discussion_takes.py:module, tests/test_external_editor_gui.py:module, tests/test_headless_service.py:TestHeadlessStartup.test_headless_flag_triggers_run, tests/test_headless_service.py:TestHeadlessStartup.test_normal_startup_calls_app_run, tests/test_mma_skeleton.py:module, tests/test_orchestrator_pm.py:module, tests/test_orchestrator_pm_history.py:module, tests/test_presets.py:module, tests/test_project_serialization.py:module, tests/test_run_worker_lifecycle_abort.py:module, tests/test_symbol_lookup.py:module, tests/test_system_prompt_exposure.py:module, tests/test_theme_nerv_fx.py:module]
"""
from src.paths import get_config_path
config_path = get_config_path()
if not config_path.exists():
@@ -528,7 +506,7 @@ def main() -> None:
if not active_path:
print(f"No active project found in {config_path}.")
return
# Use project_manager to load project (handles history segregation)
# Use project_manager to load project (handles history segregation)
proj = project_manager.load_project(active_path)
# Use flat_config to make it compatible with aggregate.run()
config = project_manager.flat_config(proj)
+8 -16
View File
@@ -45,10 +45,8 @@ def _now_ts() -> str:
def open_session(label: Optional[str] = None) -> None:
"""
Called once at GUI startup. Creates the log directories if needed and
opens the log files for this session within a sub-directory.
Called once at GUI startup. Creates the log directories if needed and
opens the log files for this session within a sub-directory.
[C: tests/test_app_controller_offloading.py:tmp_session_dir, tests/test_logging_e2e.py:test_logging_e2e, tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs, tests/test_session_logger_optimization.py:test_session_directory_and_subdirectories_creation, tests/test_session_logger_reset.py:test_reset_session, tests/test_session_logging.py:test_open_session_creates_subdir_and_registry]
"""
global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq, _output_seq
@@ -139,10 +137,8 @@ def log_api_hook(method: str, path: str, payload: str) -> None:
def log_comms(entry: dict[str, Any]) -> None:
"""
Append one comms entry to the comms log file as a JSON-L line.
Thread-safe (GIL + line-buffered file).
Append one comms entry to the comms log file as a JSON-L line.
Thread-safe (GIL + line-buffered file).
[C: tests/test_logging_e2e.py:test_logging_e2e]
"""
if _comms_fh is None:
@@ -154,10 +150,8 @@ def log_comms(entry: dict[str, Any]) -> None:
def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]:
"""
Append a tool-call record to the toolcalls log and write the PS1 script to
the session's scripts directory. Returns the path of the written script file.
Append a tool-call record to the toolcalls log and write the PS1 script to
the session's scripts directory. Returns the path of the written script file.
[C: tests/test_session_logger_optimization.py:test_log_tool_call_saves_in_session_scripts]
"""
global _seq
@@ -199,10 +193,8 @@ def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optio
def log_tool_output(content: str) -> Optional[str]:
"""
Save tool output content to a unique file in the session's outputs directory.
Returns the path of the written file.
Save tool output content to a unique file in the session's outputs directory.
Returns the path of the written file.
[C: tests/test_session_logger_optimization.py:test_log_tool_output_returns_none_if_no_session, tests/test_session_logger_optimization.py:test_log_tool_output_saves_in_session_outputs]
"""
global _output_seq
+2 -4
View File
@@ -3,10 +3,8 @@ from imgui_bundle import imgui
def draw_soft_shadow(draw_list: imgui.ImDrawList, p_min: imgui.ImVec2, p_max: imgui.ImVec2, color: imgui.ImVec4, shadow_size: float = 10.0, rounding: float = 0.0) -> None:
"""
Simulates a soft shadow effect by drawing multiple concentric rounded rectangles
with decreasing alpha values. This is a faux-shader effect using primitive batching.
Simulates a soft shadow effect by drawing multiple concentric rounded rectangles
with decreasing alpha values. This is a faux-shader effect using primitive batching.
"""
r, g, b, a = color.x, color.y, color.z, color.w
steps = int(shadow_size)
+6 -8
View File
@@ -55,14 +55,12 @@ def _build_subprocess_env() -> dict[str, str]:
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None, patch_callback: Optional[Callable[[str, str], Optional[str]]] = None) -> str:
"""
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
If patch_callback is provided, it receives (error, file_context) and returns patch text.
[C: tests/test_tier4_interceptor.py:test_run_powershell_no_qa_callback_on_success, tests/test_tier4_interceptor.py:test_run_powershell_optional_qa_callback, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_failure, tests/test_tier4_interceptor.py:test_run_powershell_qa_callback_on_stderr_only]
"""
safe_dir: str = str(base_dir).replace("'", "''")
+4 -8
View File
@@ -205,10 +205,8 @@ def summarise_file(path: Path, content: str) -> str:
def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Given a list of file_item dicts (as returned by aggregate.build_file_items),
return a parallel list of dicts with an added `summary` key.
Given a list of file_item dicts (as returned by aggregate.build_file_items),
return a parallel list of dicts with an added `summary` key.
"""
result = []
for item in file_items:
@@ -225,10 +223,8 @@ def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
def build_summary_markdown(file_items: list[dict[str, Any]]) -> str:
"""
Build a compact markdown string of file summaries, suitable for the
initial <context> block instead of full file contents.
Build a compact markdown string of file summaries, suitable for the
initial <context> block instead of full file contents.
"""
summarised = summarise_items(file_items)
parts = []