From 73fad80257e285a0390e4e7e55886457c22a77a0 Mon Sep 17 00:00:00 2001
From: Ed_
Date: Sun, 22 Feb 2026 17:03:38 -0500
Subject: [PATCH] carlos patches

---
 aggregate.py |  52 ++++++++++++---
 ai_client.py | 175 ++++++++++++++++++++++++++++++++++++++-------------
 2 files changed, 178 insertions(+), 49 deletions(-)

diff --git a/aggregate.py b/aggregate.py
index 304ebc8..46a8f16 100644
--- a/aggregate.py
+++ b/aggregate.py
@@ -98,24 +98,28 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
         entry   : str   (original config entry string)
         content : str   (file text, or error string)
         error   : bool
+        mtime   : float (last modification time, for skip-if-unchanged optimization)
     """
     items = []
     for entry in files:
         paths = resolve_paths(base_dir, entry)
         if not paths:
-            items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True})
+            items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0})
             continue
         for path in paths:
             try:
                 content = path.read_text(encoding="utf-8")
+                mtime = path.stat().st_mtime
                 error = False
             except FileNotFoundError:
                 content = f"ERROR: file not found: {path}"
+                mtime = 0.0
                 error = True
             except Exception as e:
                 content = f"ERROR: {e}"
+                mtime = 0.0
                 error = True
-            items.append({"path": path, "entry": entry, "content": content, "error": error})
+            items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime})
     return items
 
 def build_summary_section(base_dir: Path, files: list[str]) -> str:
@@ -126,6 +130,40 @@ def build_summary_section(base_dir: Path, files: list[str]) -> str:
     items = build_file_items(base_dir, files)
     return summarize.build_summary_markdown(items)
 
+def _build_files_section_from_items(file_items: list[dict]) -> str:
+    """Build the files markdown section from pre-read file items (avoids double I/O)."""
+    sections = []
+    for item in file_items:
+        path = item.get("path")
+        entry = item.get("entry", "unknown")
+        content = item.get("content", "")
+        if path is None:
+            sections.append(f"### `{entry}`\n\n```text\n{content}\n```")
+            continue
+        suffix = path.suffix.lstrip(".") if hasattr(path, "suffix") else "text"
+        lang = suffix if suffix else "text"
+        original = entry if "*" not in entry else str(path)
+        sections.append(f"### `{original}`\n\n```{lang}\n{content}\n```")
+    return "\n\n---\n\n".join(sections)
+
+
+def build_markdown_from_items(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
+    """Build markdown from pre-read file items instead of re-reading from disk."""
+    parts = []
+    # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
+    if file_items:
+        if summary_only:
+            parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
+        else:
+            parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
+    if screenshots:
+        parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
+    # DYNAMIC SUFFIX: History changes every turn, must go last
+    if history:
+        parts.append("## Discussion History\n\n" + build_discussion_section(history))
+    return "\n\n---\n\n".join(parts)
+
+
 def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
     parts = []
     # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
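
For reference, the call pattern these two helpers enable looks roughly like the
sketch below. It is illustrative only: it assumes the module imports cleanly as
aggregate, and the config entry "src/*.py" is made up.

    from pathlib import Path
    import aggregate

    base_dir = Path(".")  # hypothetical project root
    items = aggregate.build_file_items(base_dir, ["src/*.py"])  # one disk pass, mtimes captured
    md = aggregate.build_markdown_from_items(items, base_dir, [], ["first question"])
    # Files/Screenshots render before Discussion History, so the long prompt
    # prefix stays byte-identical across turns and provider-side caches keep hitting.
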
@@ -141,7 +179,7 @@ def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
     parts.append("## Discussion History\n\n" + build_discussion_section(history))
     return "\n\n---\n\n".join(parts)
 
-def run(config: dict) -> tuple[str, Path]:
+def run(config: dict) -> tuple[str, Path, list[dict]]:
     namespace = config.get("project", {}).get("name")
     if not namespace:
         namespace = config.get("output", {}).get("namespace", "project")
@@ -155,11 +193,11 @@ def run(config: dict) -> tuple[str, Path]:
     output_dir.mkdir(parents=True, exist_ok=True)
     increment = find_next_increment(output_dir, namespace)
     output_file = output_dir / f"{namespace}_{increment:03d}.md"
-    # Provide full files to trigger Gemini's 32k cache threshold and give the AI immediate context
-    markdown = build_markdown(base_dir, files, screenshot_base_dir, screenshots, history,
-                              summary_only=False)
-    output_file.write_text(markdown, encoding="utf-8")
+    # Build file items once, then construct markdown from them (avoids double I/O)
     file_items = build_file_items(base_dir, files)
+    markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history,
+                                         summary_only=False)
+    output_file.write_text(markdown, encoding="utf-8")
     return markdown, output_file, file_items
 
 def main():
diff --git a/ai_client.py b/ai_client.py
index c32a4a1..437569d 100644
--- a/ai_client.py
+++ b/ai_client.py
@@ -13,6 +13,7 @@ during chat creation to avoid massive history bloat.
 # ai_client.py
 import tomllib
 import json
+import time
 import datetime
 from pathlib import Path
 import file_cache
@@ -34,6 +35,12 @@ def set_model_params(temp: float, max_tok: int, trunc_limit: int = 8000):
 _gemini_client = None
 _gemini_chat = None
 _gemini_cache = None
+_gemini_cache_md_hash: int | None = None
+_gemini_cache_created_at: float | None = None
+
+# Gemini cache TTL in seconds. Caches are created with this TTL and
+# proactively rebuilt at 90% of this value to avoid stale-reference errors.
+_GEMINI_CACHE_TTL = 3600
 
 _anthropic_client = None
 _anthropic_history: list[dict] = []
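
The 90-percent rule these globals set up can be checked in isolation with plain
standard library code. This sketch mirrors the constant and the comparison the
patch performs; the timestamps are made up.

    import time

    _GEMINI_CACHE_TTL = 3600  # seconds, same value as the module global

    def cache_needs_rebuild(created_at: float | None, now: float | None = None) -> bool:
        """True once 90% of the TTL has elapsed, so the client rebuilds
        before the server deletes the cache and requests start failing."""
        if created_at is None:
            return False  # no cache yet, nothing to rebuild
        now = time.time() if now is None else now
        return (now - created_at) > _GEMINI_CACHE_TTL * 0.9

    assert cache_needs_rebuild(created_at=0.0, now=3241.0)      # past the 3240s cutoff
    assert not cache_needs_rebuild(created_at=0.0, now=3000.0)  # still fresh
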
""" refreshed = [] + changed = [] for item in file_items: path = item.get("path") if path is None: @@ -397,11 +410,20 @@ def _reread_file_items(file_items: list[dict]) -> list[dict]: from pathlib import Path as _P p = _P(path) if not isinstance(path, _P) else path try: + current_mtime = p.stat().st_mtime + prev_mtime = item.get("mtime", 0.0) + if current_mtime == prev_mtime: + refreshed.append(item) # unchanged — skip re-read + continue content = p.read_text(encoding="utf-8") - refreshed.append({**item, "content": content, "error": False}) + new_item = {**item, "content": content, "error": False, "mtime": current_mtime} + refreshed.append(new_item) + changed.append(new_item) except Exception as e: - refreshed.append({**item, "content": f"ERROR re-reading {p}: {e}", "error": True}) - return refreshed + err_item = {**item, "content": f"ERROR re-reading {p}: {e}", "error": True, "mtime": 0.0} + refreshed.append(err_item) + changed.append(err_item) + return refreshed, changed def _build_file_context_text(file_items: list[dict]) -> str: @@ -466,25 +488,39 @@ def _get_gemini_history_list(chat): return [] def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str: - global _gemini_chat, _gemini_cache + global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at from google.genai import types try: _ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir]) sys_instr = f"{_get_combined_system_prompt()}\n\n\n{md_content}\n" tools_decl = [_gemini_tool_declaration()] - + # DYNAMIC CONTEXT: Check if files/context changed mid-session current_md_hash = hash(md_content) old_history = None - if _gemini_chat and getattr(_gemini_chat, "_last_md_hash", None) != current_md_hash: + if _gemini_chat and _gemini_cache_md_hash != current_md_hash: old_history = list(_get_gemini_history_list(_gemini_chat)) if _get_gemini_history_list(_gemini_chat) else [] if _gemini_cache: try: _gemini_client.caches.delete(name=_gemini_cache.name) except: pass _gemini_chat = None _gemini_cache = None + _gemini_cache_created_at = None _append_comms("OUT", "request", {"message": "[CONTEXT CHANGED] Rebuilding cache and chat session..."}) + # CACHE TTL: Proactively rebuild before the cache expires server-side. + # If we don't, send_message() will reference a deleted cache and fail. 
@@ -466,25 +488,39 @@ def _get_gemini_history_list(chat):
     return []
 
 def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
-    global _gemini_chat, _gemini_cache
+    global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
     from google.genai import types
     try:
         _ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir])
         sys_instr = f"{_get_combined_system_prompt()}\n\n\n{md_content}\n"
         tools_decl = [_gemini_tool_declaration()]
-
+
         # DYNAMIC CONTEXT: Check if files/context changed mid-session
         current_md_hash = hash(md_content)
         old_history = None
-        if _gemini_chat and getattr(_gemini_chat, "_last_md_hash", None) != current_md_hash:
+        if _gemini_chat and _gemini_cache_md_hash != current_md_hash:
            old_history = list(_get_gemini_history_list(_gemini_chat)) if _get_gemini_history_list(_gemini_chat) else []
             if _gemini_cache:
                 try: _gemini_client.caches.delete(name=_gemini_cache.name)
                 except: pass
             _gemini_chat = None
             _gemini_cache = None
+            _gemini_cache_created_at = None
             _append_comms("OUT", "request", {"message": "[CONTEXT CHANGED] Rebuilding cache and chat session..."})
 
+        # CACHE TTL: Proactively rebuild before the cache expires server-side.
+        # If we don't, send_message() will reference a deleted cache and fail.
+        if _gemini_chat and _gemini_cache and _gemini_cache_created_at:
+            elapsed = time.time() - _gemini_cache_created_at
+            if elapsed > _GEMINI_CACHE_TTL * 0.9:
+                old_history = list(_get_gemini_history_list(_gemini_chat)) if _get_gemini_history_list(_gemini_chat) else []
+                try: _gemini_client.caches.delete(name=_gemini_cache.name)
+                except: pass
+                _gemini_chat = None
+                _gemini_cache = None
+                _gemini_cache_created_at = None
+                _append_comms("OUT", "request", {"message": f"[CACHE TTL] Rebuilding cache (expired after {int(elapsed)}s)..."})
+
         if not _gemini_chat:
             chat_config = types.GenerateContentConfig(
                 system_instruction=sys_instr,
@@ -500,9 +536,10 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
                         config=types.CreateCachedContentConfig(
                             system_instruction=sys_instr,
                             tools=tools_decl,
-                            ttl="3600s",
+                            ttl=f"{_GEMINI_CACHE_TTL}s",
                         )
                     )
+                    _gemini_cache_created_at = time.time()
                     chat_config = types.GenerateContentConfig(
                         cached_content=_gemini_cache.name,
                         temperature=_temperature,
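
For context, the create-then-fall-back flow around this hunk maps onto the
google-genai SDK roughly as follows. This is a sketch, not the patch's code:
the model name, prompt text, and TTL are placeholders, and explicit caches
have a minimum context size (the 32k figure the removed aggregate.py comment
referred to), below which creation raises and the inline config is kept.

    from google import genai
    from google.genai import types

    client = genai.Client()  # reads GOOGLE_API_KEY from the environment
    sys_instr = "You are a code assistant.\n\n" + "...large file context..."

    config = types.GenerateContentConfig(system_instruction=sys_instr)  # inline fallback
    try:
        cache = client.caches.create(
            model="gemini-2.0-flash-001",
            config=types.CreateCachedContentConfig(system_instruction=sys_instr, ttl="3600s"),
        )
        config = types.GenerateContentConfig(cached_content=cache.name)
    except Exception:
        pass  # context below the cache minimum, or a transient API error

    chat = client.chats.create(model="gemini-2.0-flash-001", config=config)
    print(chat.send_message("hello").text)
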
@@ -511,35 +548,38 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
                 )
                 _append_comms("OUT", "request", {"message": f"[CACHE CREATED] {_gemini_cache.name}"})
             except Exception as e:
-                _gemini_cache = None # Ensure clean state on failure
-
+                _gemini_cache = None
+                _gemini_cache_created_at = None
+                _append_comms("OUT", "request", {"message": f"[CACHE FAILED] {type(e).__name__}: {e} — falling back to inline system_instruction"})
+
             kwargs = {"model": _model, "config": chat_config}
             if old_history:
                 kwargs["history"] = old_history
-
+
             _gemini_chat = _gemini_client.chats.create(**kwargs)
-            _gemini_chat._last_md_hash = current_md_hash
+            _gemini_cache_md_hash = current_md_hash
 
         _append_comms("OUT", "request", {"message": f"[ctx {len(md_content)} + msg {len(user_message)}]"})
         payload, all_text = user_message, []
-
-        for r_idx in range(MAX_TOOL_ROUNDS + 2):
-            # Strip stale file refreshes and truncate old tool outputs in Gemini history
-            if _gemini_chat and _get_gemini_history_list(_gemini_chat):
-                for msg in _get_gemini_history_list(_gemini_chat):
-                    if msg.role == "user" and hasattr(msg, "parts"):
-                        for p in msg.parts:
-                            if hasattr(p, "function_response") and p.function_response and hasattr(p.function_response, "response"):
-                                r = p.function_response.response
-                                if isinstance(r, dict) and "output" in r:
-                                    val = r["output"]
-                                    if isinstance(val, str):
-                                        if "[SYSTEM: FILES UPDATED]" in val:
-                                            val = val.split("[SYSTEM: FILES UPDATED]")[0].strip()
-                                        if _history_trunc_limit > 0 and len(val) > _history_trunc_limit:
-                                            val = val[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS.]"
-                                        r["output"] = val
+
+        # Strip stale file refreshes and truncate old tool outputs ONCE before
+        # entering the tool loop (not per-round — history entries don't change).
+        if _gemini_chat and _get_gemini_history_list(_gemini_chat):
+            for msg in _get_gemini_history_list(_gemini_chat):
+                if msg.role == "user" and hasattr(msg, "parts"):
+                    for p in msg.parts:
+                        if hasattr(p, "function_response") and p.function_response and hasattr(p.function_response, "response"):
+                            r = p.function_response.response
+                            if isinstance(r, dict) and "output" in r:
+                                val = r["output"]
+                                if isinstance(val, str):
+                                    if "[SYSTEM: FILES UPDATED]" in val:
+                                        val = val.split("[SYSTEM: FILES UPDATED]")[0].strip()
+                                    if _history_trunc_limit > 0 and len(val) > _history_trunc_limit:
+                                        val = val[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS.]"
+                                    r["output"] = val
+
+        for r_idx in range(MAX_TOOL_ROUNDS + 2):
             resp = _gemini_chat.send_message(payload)
             txt = "\n".join(p.text for c in resp.candidates if getattr(c, "content", None) for p in c.content.parts if hasattr(p, "text") and p.text)
             if txt: all_text.append(txt)
@@ -593,12 +633,12 @@ def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
 
                 if i == len(calls) - 1:
                     if file_items:
-                        file_items = _reread_file_items(file_items)
-                        ctx = _build_file_context_text(file_items)
+                        file_items, changed = _reread_file_items(file_items)
+                        ctx = _build_file_context_text(changed)
                         if ctx: out += f"\n\n[SYSTEM: FILES UPDATED]\n\n{ctx}"
                     if r_idx == MAX_TOOL_ROUNDS:
                         out += "\n\n[SYSTEM: MAX ROUNDS. PROVIDE FINAL ANSWER.]"
-
+
                 f_resps.append(types.Part.from_function_response(name=name, response={"output": out}))
                 log.append({"tool_use_id": name, "content": out})
@@ -630,7 +670,15 @@ _FILE_REFRESH_MARKER = "[FILES UPDATED"
 
 def _estimate_message_tokens(msg: dict) -> int:
-    """Rough token estimate for a single Anthropic message dict."""
+    """
+    Rough token estimate for a single Anthropic message dict.
+    Caches the result on the dict as '_est_tokens' so repeated calls
+    (e.g., from _trim_anthropic_history) don't re-scan unchanged messages.
+    Call _invalidate_token_estimate() when a message's content is modified.
+    """
+    cached = msg.get("_est_tokens")
+    if cached is not None:
+        return cached
     total_chars = 0
     content = msg.get("content", "")
     if isinstance(content, str):
@@ -648,7 +696,14 @@ def _estimate_message_tokens(msg: dict) -> int:
                     total_chars += len(_json.dumps(inp, ensure_ascii=False))
             elif isinstance(block, str):
                 total_chars += len(block)
-    return max(1, int(total_chars / _CHARS_PER_TOKEN))
+    est = max(1, int(total_chars / _CHARS_PER_TOKEN))
+    msg["_est_tokens"] = est
+    return est
+
+
+def _invalidate_token_estimate(msg: dict):
+    """Remove the cached token estimate so the next call recalculates."""
+    msg.pop("_est_tokens", None)
@@ -660,7 +715,7 @@ def _estimate_prompt_tokens(system_blocks: list[dict], history: list[dict]) -> int:
         total += max(1, int(len(text) / _CHARS_PER_TOKEN))
     # Tool definitions (rough fixed estimate — they're ~2k tokens for our set)
     total += 2500
-    # History messages
+    # History messages (uses cached estimates for unchanged messages)
     for msg in history:
         total += _estimate_message_tokens(msg)
     return total
@@ -695,6 +750,7 @@ def _strip_stale_file_refreshes(history: list[dict]):
                 cleaned.append(block)
             if len(cleaned) < len(content):
                 msg["content"] = cleaned
+                _invalidate_token_estimate(msg)
 
 
 def _trim_anthropic_history(system_blocks: list[dict], history: list[dict]):
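
The memoization above is self-contained enough to sanity-check on its own; the
sketch below mirrors the cache-on-the-dict idea with the module's rough
4-chars-per-token ratio. One caveat with keying the memo on the message dict
itself: the helper key rides along in the dict, so it has to be dropped before
the dict is serialized into a provider request (the same way
_strip_cache_controls scrubs cache_control), or the API may reject the unknown
field.

    import json

    _CHARS_PER_TOKEN = 4

    def estimate_tokens(msg: dict) -> int:
        cached = msg.get("_est_tokens")
        if cached is not None:
            return cached                 # O(1) for unchanged messages
        content = msg.get("content", "")
        chars = len(content) if isinstance(content, str) else len(json.dumps(content))
        est = max(1, chars // _CHARS_PER_TOKEN)
        msg["_est_tokens"] = est          # memoize on the dict itself
        return est

    msg = {"role": "user", "content": "x" * 400}
    assert estimate_tokens(msg) == 100
    msg["content"] = "x" * 800
    msg.pop("_est_tokens", None)          # invalidate after mutating content
    assert estimate_tokens(msg) == 200
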
+ """ + user_indices = [i for i, m in enumerate(history) if m.get("role") == "user"] + if len(user_indices) < 2: + return # Only one user message (the current turn) — nothing stable to cache + target_idx = user_indices[-2] + content = history[target_idx].get("content") + if isinstance(content, list) and content: + last_block = content[-1] + if isinstance(last_block, dict): + last_block["cache_control"] = {"type": "ephemeral"} + elif isinstance(content, str): + history[target_idx]["content"] = [ + {"type": "text", "text": content, "cache_control": {"type": "ephemeral"}} + ] + + def _repair_anthropic_history(history: list[dict]): """ If history ends with an assistant message that contains tool_use blocks @@ -823,23 +901,36 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item _ensure_anthropic_client() mcp_client.configure(file_items or [], [base_dir]) - system_text = _get_combined_system_prompt() + f"\n\n\n{md_content}\n" - system_blocks = _build_chunked_context_blocks(system_text) + # Split system into two cache breakpoints: + # 1. Stable system prompt (never changes — always a cache hit) + # 2. Dynamic file context (invalidated only when files change) + stable_prompt = _get_combined_system_prompt() + stable_blocks = [{"type": "text", "text": stable_prompt, "cache_control": {"type": "ephemeral"}}] + context_text = f"\n\n\n{md_content}\n" + context_blocks = _build_chunked_context_blocks(context_text) + system_blocks = stable_blocks + context_blocks user_content = [{"type": "text", "text": user_message}] # COMPRESS HISTORY: Truncate massive tool outputs from previous turns for msg in _anthropic_history: if msg.get("role") == "user" and isinstance(msg.get("content"), list): + modified = False for block in msg["content"]: if isinstance(block, dict) and block.get("type") == "tool_result": t_content = block.get("content", "") if _history_trunc_limit > 0 and isinstance(t_content, str) and len(t_content) > _history_trunc_limit: block["content"] = t_content[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS. Original output was too large.]" + modified = True + if modified: + _invalidate_token_estimate(msg) _strip_cache_controls(_anthropic_history) _repair_anthropic_history(_anthropic_history) _anthropic_history.append({"role": "user", "content": user_content}) + # Use the 4th cache breakpoint to cache the conversation history prefix. + # This is placed on the second-to-last user message (the last stable one). + _add_history_cache_breakpoint(_anthropic_history) n_chunks = len(system_blocks) _append_comms("OUT", "request", { @@ -953,10 +1044,10 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item "content": output, }) - # Refresh file context after tool calls and inject into tool result message + # Refresh file context after tool calls — only inject CHANGED files if file_items: - file_items = _reread_file_items(file_items) - refreshed_ctx = _build_file_context_text(file_items) + file_items, changed = _reread_file_items(file_items) + refreshed_ctx = _build_file_context_text(changed) if refreshed_ctx: tool_results.append({ "type": "text",