Files
manual_slop/aggregate.py
razor950 69401365be Port missing features to gui_2 and optimize caching
- Port 10 missing features from gui.py to gui_2.py: performance
    diagnostics, prior session log viewing, token budget visualization,
    agent tools config, API hooks server, GUI task queue, discussion
    truncation, THINKING/LIVE indicators, event subscriptions, and
    session usage tracking
  - Persist window visibility state in config.toml
  - Fix Gemini cache invalidation by separating discussion history
    from cached context (use MD5 hash instead of built-in hash)
  - Add cost optimizations: tool output truncation at source, proactive
    history trimming at 40%, summary_only support in aggregate.run()
  - Add cleanup() for destroying API caches on exit
2026-02-23 20:06:13 -05:00

225 lines
9.4 KiB
Python

# aggregate.py
"""
Note(Gemini):
This module orchestrates the construction of the final Markdown context string.
Instead of sending every file to the AI raw (which blows up tokens), this uses a pipeline:
1. Resolve paths (handles globs and absolute paths).
2. Build file items (raw content).
3. If 'summary_only' is true (it defaults to False everywhere in this module, so the caller or
config must enable it), the file items are piped through summarize.py to generate a compacted view.
This is essential for keeping prompt tokens low while giving the AI enough structural info
to use the MCP tools to fetch only what it needs.
"""
import tomllib
import re
import glob
from pathlib import Path, PureWindowsPath
import summarize
def find_next_increment(output_dir: Path, namespace: str) -> int:
    """Return the next free sequence number for ``<namespace>_<N>.md`` files.

    Scans the regular files directly inside *output_dir* whose names look like
    ``<namespace>_<digits>.md`` and returns one more than the largest number
    found.

    Args:
        output_dir: Directory holding previously generated Markdown files.
        namespace: File-name prefix; it is regex-escaped, so it is matched
            literally.

    Returns:
        ``highest existing number + 1``, or ``1`` when nothing matches or
        *output_dir* is not an existing directory.
    """
    # A directory that has not been created yet simply has no previous outputs;
    # without this guard iterdir() would raise instead of answering "1".
    if not output_dir.is_dir():
        return 1
    pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
    matches = (pattern.match(f.name) for f in output_dir.iterdir() if f.is_file())
    return max((int(m.group(1)) for m in matches if m), default=0) + 1
def is_absolute_with_drive(entry: str) -> bool:
    """Tell whether *entry* carries a Windows drive component.

    The string is parsed with Windows path rules regardless of the host OS.
    Any exception raised while parsing (for example a non-string argument) is
    treated as "no drive".

    Returns:
        True when the parsed path has a non-empty ``drive``, otherwise False.
    """
    try:
        drive = PureWindowsPath(entry).drive
    except Exception:
        return False
    return bool(drive)
def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
    """Expand one configured path entry into concrete paths.

    Args:
        base_dir: Directory that entries without a drive are joined onto.
        entry: A literal path or a glob pattern. Only ``*`` marks an entry as
            a pattern.

    Returns:
        For a pattern: the matching regular files, sorted (possibly empty).
        For a literal entry: a one-element list, either the entry unchanged
        (when it has a drive) or ``base_dir / entry`` resolved. Literal
        entries are not checked for existence here.
    """
    has_drive = is_absolute_with_drive(entry)

    if "*" not in entry:
        literal = Path(entry) if has_drive else (base_dir / entry).resolve()
        return [literal]

    pattern_root = Path(entry) if has_drive else base_dir / entry
    candidates = (Path(hit) for hit in glob.glob(str(pattern_root), recursive=True))
    return sorted(candidate for candidate in candidates if candidate.is_file())
def build_discussion_section(history: list[str]) -> str:
    """Render discussion excerpts as numbered Markdown sub-sections.

    Each excerpt is stripped of surrounding whitespace, given a
    ``### Discussion Excerpt <n>`` heading (numbered from 1), and the sections
    are joined with a horizontal-rule separator. An empty history yields "".
    """
    return "\n\n---\n\n".join(
        f"### Discussion Excerpt {number}\n\n{excerpt.strip()}"
        for number, excerpt in enumerate(history, start=1)
    )
def build_files_section(base_dir: Path, files: list[str]) -> str:
    """Read the configured files and render them as fenced Markdown blocks.

    Args:
        base_dir: Base directory handed to ``resolve_paths`` for each entry.
        files: Path entries (literal paths or ``*`` patterns).

    Returns:
        One ``### `<label>``` section per resolved file, joined with
        horizontal rules. The label is the entry itself for literal entries
        and the matched path for pattern entries. The fence language is the
        file suffix, or ``text`` when there is none. Entries that match
        nothing and files that cannot be read are reported inline as
        ``ERROR: ...`` text instead of raising.
    """
    sections = []
    for entry in files:
        paths = resolve_paths(base_dir, entry)
        if not paths:
            sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```")
            continue
        for path in paths:
            lang = path.suffix.lstrip(".") or "text"
            try:
                content = path.read_text(encoding="utf-8")
            except FileNotFoundError:
                content = f"ERROR: file not found: {path}"
            except Exception as e:
                content = f"ERROR: {e}"
            # A file that itself contains ``` (any Markdown file, for instance)
            # would close a three-backtick fence early and corrupt the rest of
            # the document, so use a fence one backtick longer than the longest
            # backtick run found in the content.
            longest_run = max((len(run) for run in re.findall(r"`+", content)), default=0)
            fence = "`" * max(3, longest_run + 1)
            original = entry if "*" not in entry else str(path)
            sections.append(f"### `{original}`\n\n{fence}{lang}\n{content}\n{fence}")
    return "\n\n---\n\n".join(sections)
def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
    """Render screenshot entries as Markdown image links.

    Args:
        base_dir: Base directory handed to ``resolve_paths`` for each entry.
        screenshots: Path entries (literal paths or ``*`` patterns).

    Returns:
        One ``### `<label>``` section per resolved path, joined with
        horizontal rules. Existing paths become ``![name](posix/path)`` image
        links. Entries that match nothing and paths that do not exist are
        reported inline as italic ``ERROR`` notes.
    """
    blocks: list[str] = []
    for entry in screenshots:
        resolved = resolve_paths(base_dir, entry)
        if not resolved:
            blocks.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_")
            continue
        is_literal = "*" not in entry
        for image in resolved:
            # Literal entries keep the configured text as their heading;
            # pattern entries are labelled with the concrete match.
            label = entry if is_literal else str(image)
            if image.exists():
                body = f"![{image.name}]({image.as_posix()})"
            else:
                body = f"_ERROR: file not found: {image}_"
            blocks.append(f"### `{label}`\n\n{body}")
    return "\n\n---\n\n".join(blocks)
def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
    """Read every configured file once and describe it as a dict.

    Each entry is expanded with ``resolve_paths``; every resulting path
    produces one dict, and an entry that matches nothing produces a single
    error dict.

    Each dict has:
        path    : Path as returned by resolve_paths, or None when the entry
                  matched nothing
        entry   : str   (original config entry string)
        content : str   (file text decoded as UTF-8, or an "ERROR: ..." string)
        error   : bool  (True when content is an error string)
        mtime   : float (last modification time; 0.0 on any error)
    """
    records: list[dict] = []
    for entry in files:
        resolved = resolve_paths(base_dir, entry)
        if not resolved:
            records.append({
                "path": None,
                "entry": entry,
                "content": f"ERROR: no files matched: {entry}",
                "error": True,
                "mtime": 0.0,
            })
            continue
        for path in resolved:
            try:
                text = path.read_text(encoding="utf-8")
                modified = path.stat().st_mtime
            except FileNotFoundError:
                text, modified, failed = f"ERROR: file not found: {path}", 0.0, True
            except Exception as exc:
                # Best effort: any other read/stat failure is recorded in the
                # item instead of aborting the whole aggregation.
                text, modified, failed = f"ERROR: {exc}", 0.0, True
            else:
                failed = False
            records.append({
                "path": path,
                "entry": entry,
                "content": text,
                "error": failed,
                "mtime": modified,
            })
    return records
def build_summary_section(base_dir: Path, files: list[str]) -> str:
    """
    Build the compact per-file summary section.

    Reads the files via build_file_items() and hands the resulting items to
    summarize.build_summary_markdown(), returning whatever Markdown it produces.
    """
    return summarize.build_summary_markdown(build_file_items(base_dir, files))
def _build_files_section_from_items(file_items: list[dict]) -> str:
    """Build the files markdown section from pre-read file items (avoids double I/O).

    Args:
        file_items: Dicts with ``path``, ``entry`` and ``content`` keys.
            Missing keys fall back to ``None``, ``"unknown"`` and ``""``.

    Returns:
        One ``### `<label>``` section with a fenced code block per item,
        joined with horizontal rules. Items whose ``path`` is None are
        rendered as ``text`` blocks under the entry name. Otherwise the fence
        language is the path's suffix (``text`` when there is none), and the
        label is the entry for literal entries or the path for ``*`` patterns.
    """
    sections = []
    for item in file_items:
        path = item.get("path")
        entry = item.get("entry", "unknown")
        content = item.get("content", "")
        # A file that itself contains ``` (any Markdown file, for instance)
        # would close a three-backtick fence early and corrupt the rest of the
        # document, so use a fence one backtick longer than the longest
        # backtick run found in the content.
        longest_run = max((len(run) for run in re.findall(r"`+", str(content))), default=0)
        fence = "`" * max(3, longest_run + 1)
        if path is None:
            sections.append(f"### `{entry}`\n\n{fence}text\n{content}\n{fence}")
            continue
        suffix = path.suffix.lstrip(".") if hasattr(path, "suffix") else ""
        lang = suffix or "text"
        original = entry if "*" not in entry else str(path)
        sections.append(f"### `{original}`\n\n{fence}{lang}\n{content}\n{fence}")
    return "\n\n---\n\n".join(sections)
def build_markdown_from_items(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
    """Assemble the full Markdown document from already-read file items.

    Sections are emitted in a fixed order (files, screenshots, discussion
    history) and an empty input skips its section entirely. With
    *summary_only* the files section is produced by
    summarize.build_summary_markdown() instead of full file contents.

    Returns:
        The sections joined with horizontal rules, or "" when every input is
        empty.
    """
    sections: list[str] = []

    # STATIC PREFIX: files and screenshots go first to maximize cache hits.
    if file_items:
        if summary_only:
            sections.append(f"## Files (Summary)\n\n{summarize.build_summary_markdown(file_items)}")
        else:
            sections.append(f"## Files\n\n{_build_files_section_from_items(file_items)}")
    if screenshots:
        sections.append(f"## Screenshots\n\n{build_screenshots_section(screenshot_base_dir, screenshots)}")

    # DYNAMIC SUFFIX: history changes every turn, so it goes last.
    if history:
        sections.append(f"## Discussion History\n\n{build_discussion_section(history)}")

    return "\n\n---\n\n".join(sections)
def build_markdown_no_history(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False) -> str:
    """Build the files + screenshots Markdown without any discussion history.

    Delegates to build_markdown_from_items() with an empty history, so the
    result does not vary with the conversation (used for stable caching).
    """
    no_history: list[str] = []
    return build_markdown_from_items(
        file_items,
        screenshot_base_dir,
        screenshots,
        no_history,
        summary_only,
    )
def build_discussion_text(history: list[str]) -> str:
    """Return the "Discussion History" section on its own.

    Produces the ``## Discussion History`` heading followed by the rendered
    excerpts, or an empty string when *history* is empty.
    """
    if history:
        return f"## Discussion History\n\n{build_discussion_section(history)}"
    return ""
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
    """Assemble the full Markdown document, reading files from disk.

    Sections are emitted in a fixed order (files, screenshots, discussion
    history) and an empty input skips its section entirely. With
    *summary_only* the files section comes from build_summary_section()
    instead of build_files_section().

    Returns:
        The sections joined with horizontal rules, or "" when every input is
        empty.
    """
    sections: list[str] = []

    # STATIC PREFIX: files and screenshots go first to maximize cache hits.
    if files:
        if summary_only:
            sections.append(f"## Files (Summary)\n\n{build_summary_section(base_dir, files)}")
        else:
            sections.append(f"## Files\n\n{build_files_section(base_dir, files)}")
    if screenshots:
        sections.append(f"## Screenshots\n\n{build_screenshots_section(screenshot_base_dir, screenshots)}")

    # DYNAMIC SUFFIX: history changes every turn, so it goes last.
    if history:
        sections.append(f"## Discussion History\n\n{build_discussion_section(history)}")

    return "\n\n---\n\n".join(sections)
def run(config: dict) -> tuple[str, Path, list[dict]]:
    """Aggregate the configured context into a new numbered Markdown file.

    Config keys read:
        project.name / output.namespace : file-name prefix (falls back to "project")
        output.output_dir               : destination directory (required)
        files.base_dir                  : base directory for file entries (required)
        files.paths                     : file entries (default [])
        screenshots.base_dir / .paths   : screenshot base (default ".") and entries
        discussion.history              : discussion excerpts (default [])
        project.summary_only            : summarize files instead of inlining (default False)

    Side effects:
        Creates the output directory if needed and writes
        ``<namespace>_<NNN>.md`` (UTF-8) using the next free number.

    Returns:
        ``(markdown, output_file, file_items)``.

    Raises:
        KeyError: if ``output.output_dir`` or ``files.base_dir`` is missing.
    """
    project_cfg = config.get("project", {})
    namespace = project_cfg.get("name") or config.get("output", {}).get("namespace", "project")

    output_dir = Path(config["output"]["output_dir"])
    files_cfg = config["files"]
    base_dir = Path(files_cfg["base_dir"])
    files = files_cfg.get("paths", [])

    shots_cfg = config.get("screenshots", {})
    screenshot_base_dir = Path(shots_cfg.get("base_dir", "."))
    screenshots = shots_cfg.get("paths", [])
    history = config.get("discussion", {}).get("history", [])

    # The directory must exist before it can be scanned for earlier outputs.
    output_dir.mkdir(parents=True, exist_ok=True)
    sequence = find_next_increment(output_dir, namespace)
    output_file = output_dir / f"{namespace}_{sequence:03d}.md"

    # Read the files once and build the markdown from those items (avoids double I/O).
    file_items = build_file_items(base_dir, files)
    markdown = build_markdown_from_items(
        file_items,
        screenshot_base_dir,
        screenshots,
        history,
        summary_only=project_cfg.get("summary_only", False),
    )
    output_file.write_text(markdown, encoding="utf-8")
    return markdown, output_file, file_items
def main():
    """Command-line entry point.

    Loads ``config.toml`` from the current working directory, runs the
    aggregation, and prints the path of the Markdown file that was written.
    """
    # tomllib is already imported at module level; TOML must be read in binary mode.
    with open("config.toml", "rb") as f:
        config = tomllib.load(f)
    # Only the output path is needed here; the markdown text and file items are ignored.
    _, output_file, _ = run(config)
    print(f"Written: {output_file}")


if __name__ == "__main__":
    main()