71 Commits

Author SHA1 Message Date
ed e3b483d983 chore(conductor): Mark track 'api_metrics_20260223' as complete 2026-02-23 13:46:59 -05:00
ed 2d22bd7b9c conductor(plan): Mark phase 'Phase 2: GUI Telemetry and Plotting' as complete 2026-02-23 13:46:28 -05:00
ed 76582c821e conductor(checkpoint): Checkpoint end of Phase 2 2026-02-23 13:45:32 -05:00
ed e47ee14c7b docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:43:31 -05:00
ed e747a783a5 feat(gui): Display active Gemini caches
This change adds a label to the Provider panel to show the count and total size of active Gemini caches when the Gemini provider is selected. This information is hidden for other providers.
2026-02-23 13:42:57 -05:00
ed 84f05079e3 docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:40:42 -05:00
ed c35170786b feat(gui): Implement token budget visualizer
This change adds a progress bar and label to the Provider panel to display the current history token usage against the provider's limit. The UI is updated in real-time.
2026-02-23 13:40:04 -05:00
ed a52f3a2ef8 conductor(plan): Mark phase 'Phase 1: Metric Extraction and Logic Review' as complete 2026-02-23 13:35:15 -05:00
ed 2668f88e8a conductor(checkpoint): Checkpoint end of Phase 1 2026-02-23 13:34:18 -05:00
ed ac51ded52b docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:29:22 -05:00
ed f10a2f2ffa feat(conductor): Expose history bleed flags
This change introduces a new function, get_history_bleed_stats, to calculate and expose how close the current conversation history is to the provider's token limit. The initial implementation supports Anthropic, with a placeholder for Gemini.
2026-02-23 13:29:06 -05:00
ed c61fcc6333 docs(conductor): Update plan for api_metrics_20260223 2026-02-23 13:28:20 -05:00
ed 8aa70e287f fix(conductor): Implement Gemini cache metrics
This change corrects the implementation of get_gemini_cache_stats to use the Gemini client instance and updates the corresponding test to use proper mocking.
2026-02-23 13:27:49 -05:00
ed 27eb9bef95 archive context management 2026-02-23 13:10:47 -05:00
ed 56e275245f chore(conductor): Archive track 'api_hooks_verification_20260223' 2026-02-23 13:07:29 -05:00
ed eb9705bd93 chore(conductor): Mark track 'Update conductor to properly utilize the new api hooks for automated testing & verification of track implementation features without the need of user intervention.' as complete 2026-02-23 13:04:01 -05:00
ed 10ca40dd35 conductor(plan): Mark phase 'Phase 2: Implement Automated Verification Logic' as complete 2026-02-23 13:02:28 -05:00
ed b575dcd1eb conductor(checkpoint): Checkpoint end of Phase 2: Implement Automated Verification Logic 2026-02-23 13:01:00 -05:00
ed f7d3e97f18 conductor(plan): Mark task 'Implement result handling' as complete 2026-02-23 13:00:20 -05:00
ed 94b4f38c8c test(conductor): Enhance integration tests for API hook result handling 2026-02-23 12:58:50 -05:00
ed 9c60936a0c conductor(plan): Mark task 'Integrate ApiHookClient' as complete 2026-02-23 12:58:15 -05:00
ed c7c8b89b4e test(conductor): Add integration test for ApiHookClient usage in phase completion 2026-02-23 12:56:57 -05:00
ed cf19530792 conductor(plan): Mark task 'Develop ApiHookClient' as complete 2026-02-23 12:54:46 -05:00
ed f4a9ff82fa feat(api-hooks): Implement ApiHookClient with comprehensive tests 2026-02-23 12:54:16 -05:00
ed 926cebe40a conductor(plan): Mark phase 'Phase 1: Update Workflow Definition' as complete 2026-02-23 12:49:41 -05:00
ed f17c9e31b4 conductor(checkpoint): Checkpoint end of Phase 1: Update Workflow Definition 2026-02-23 12:49:14 -05:00
ed 1b8b236433 conductor(plan): Mark task 'Modify workflow.md' as complete 2026-02-23 12:48:45 -05:00
ed 2ec1ecfd50 docs(workflow): Automate phase verification protocol with API hooks 2026-02-23 12:48:09 -05:00
ed a70e4e2b21 add new track 2026-02-23 12:47:22 -05:00
ed ce75f0e5a1 remove active track 2026-02-23 12:40:43 -05:00
ed 76e263c0c9 chore(conductor): Archive track 'Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing.' 2026-02-23 12:40:10 -05:00
ed bb4776e99c conductor(plan): Mark task 'Apply review suggestions' as complete 2026-02-23 12:38:40 -05:00
ed dc64493f42 fix(conductor): Apply review suggestions for track 'Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing.' 2026-02-23 12:38:29 -05:00
ed 0070f61a40 chore(conductor): Mark track 'Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing.' as complete 2026-02-23 12:29:11 -05:00
ed d3ca0fee98 conductor(plan): Mark phase 'Phase 2: Hook Implementations and Logging' as complete 2026-02-23 12:28:43 -05:00
ed eaf229e144 conductor(checkpoint): Checkpoint end of Phase 2 2026-02-23 12:27:02 -05:00
ed d7281dc16e conductor(plan): Mark task 'Integrate aggressive logging for all hook invocations' as complete 2026-02-23 12:23:53 -05:00
ed ef29902963 feat(api): Integrate aggressive logging for all hook invocations 2026-02-23 12:23:23 -05:00
ed 0d09007dc1 conductor(plan): Mark task 'Implement GUI state manipulation hooks with thread-safe queueing' as complete 2026-02-23 12:22:27 -05:00
ed 5f9bc193cb feat(api): Add GUI state manipulation hooks with thread-safe queueing 2026-02-23 12:21:18 -05:00
ed 03db4190d7 conductor(plan): Mark task 'Implement project and AI session state manipulation hooks' as complete 2026-02-23 12:18:18 -05:00
ed d9d056c80d feat(api): Add project and session state manipulation hooks 2026-02-23 12:17:32 -05:00
ed a65990f72b conductor(plan): Mark phase 'Phase 1: Foundation and Opt-in Mechanisms' as complete 2026-02-23 12:15:13 -05:00
ed 2bc7a3f0a5 conductor(checkpoint): Checkpoint end of Phase 1 2026-02-23 12:14:26 -05:00
ed bf76a763c3 conductor(plan): Mark task 'Set up lightweight local IPC server...' as complete 2026-02-23 12:11:27 -05:00
ed 44c2585f95 feat(api): Add lightweight HTTP server for API hooks 2026-02-23 12:11:01 -05:00
ed bd7ccf3a07 conductor(plan): Mark task 'Implement CLI flag/env-var to enable the hook system' as complete 2026-02-23 12:07:21 -05:00
ed 1306163446 feat(api): Add CLI flag and env var to enable test hooks 2026-02-23 12:06:53 -05:00
ed ddf6f0e1bc chore(conductor): Add new track 'Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing.' 2026-02-23 11:53:12 -05:00
ed d53f0e44ee chore(conductor): Add new track 'Review vendor api usage in regards to conservative context handling' 2026-02-23 11:45:26 -05:00
ed fb018e1291 chore(conductor): Mark track 'Implement context visualization and memory management improvements' as complete 2026-02-23 11:38:02 -05:00
ed a7639fe24e conductor(plan): Mark phase 'Phase 2: Agent Capability Configuration' as complete 2026-02-23 11:37:55 -05:00
ed 1ac6eb9b7f conductor(checkpoint): Checkpoint end of Phase 2 2026-02-23 11:37:12 -05:00
ed d042fa95e2 conductor(plan): Mark task 'Wire tool toggles to AI provider tool declaration payload' as complete 2026-02-23 11:32:18 -05:00
ed 92aa33c6d3 feat(core): Wire tool toggles to AI provider tool declaration payload 2026-02-23 11:30:36 -05:00
ed 1677d25298 feat(ui): Add UI toggles for available tools per-project 2026-02-23 11:24:44 -05:00
ed 9c5fcab9e8 conductor(plan): Mark phase 'Phase 1: Context Memory and Token Visualization' as complete 2026-02-23 11:19:17 -05:00
ed a88311b9fe conductor(checkpoint): Checkpoint end of Phase 1 2026-02-23 11:17:25 -05:00
ed ccdba69214 conductor(plan): Mark task 'Expose history truncation controls in the Discussion panel' as complete 2026-02-23 11:04:46 -05:00
ed 94fe904d3f feat(ui): Expose history truncation controls in the Discussion panel 2026-02-23 11:03:00 -05:00
ed 9e6b740950 conductor(plan): Mark task 'Implement token usage summary widget' as complete 2026-02-23 11:00:20 -05:00
ed e34ff7ef79 feat(ui): Implement token usage summary widget 2026-02-23 10:59:29 -05:00
ed 4479c38395 conductor(setup): Add conductor setup files 2026-02-23 10:53:20 -05:00
ed 243a0cc5ca trying out conductor 2026-02-23 10:51:24 -05:00
ed 68e895cb8a update docs 2026-02-22 17:28:07 -05:00
ed b4734f4bba fix for gui 2026-02-22 17:28:00 -05:00
ed 8a3c2d8e21 fix to ai_client.py 2026-02-22 17:19:15 -05:00
ed 73fad80257 carlos patches 2026-02-22 17:03:38 -05:00
ed 17eebff5f8 Revert "final updates"
This reverts commit 1581380a43.
2026-02-22 12:15:49 -05:00
ed 1581380a43 final updates 2026-02-22 11:57:23 -05:00
ed 8bf95866dc fix for gemini. 2026-02-22 11:41:11 -05:00
51 changed files with 2417 additions and 295 deletions
BIN
Binary file not shown.
+47
@@ -0,0 +1,47 @@
# Project Overview
**Manual Slop** is a local GUI application designed as an experimental, "manual" AI coding assistant. It allows users to curate and send context (files, screenshots, and discussion history) to AI APIs (Gemini and Anthropic). The AI can then execute PowerShell scripts within the project directory to modify files, requiring explicit user confirmation before execution.
**Main Technologies:**
* **Language:** Python 3.11+
* **Package Management:** `uv`
* **GUI Framework:** Dear PyGui (`dearpygui`), ImGui Bundle (`imgui-bundle`)
* **AI SDKs:** `google-genai` (Gemini), `anthropic`
* **Configuration:** TOML (`tomli-w`)
**Architecture:**
* **`gui.py`:** The main entry point and Dear PyGui application logic. Handles all panels, layouts, user input, and confirmation dialogs.
* **`ai_client.py`:** A unified wrapper for both Gemini and Anthropic APIs. Manages sessions, tool/function-call loops, token estimation, and context history management.
* **`aggregate.py`:** Responsible for building the `file_items` context. It reads project configurations, collects files and screenshots, and builds the context into markdown format to send to the AI.
* **`mcp_client.py`:** Implements MCP-like tools (e.g., `read_file`, `list_directory`, `search_files`, `web_search`) as native functions that the AI can call. Enforces a strict allowlist for file access.
* **`shell_runner.py`:** A sandboxed subprocess wrapper that executes PowerShell scripts (`powershell -NoProfile -NonInteractive -Command`) provided by the AI.
* **`project_manager.py`:** Manages per-project TOML configurations (`manual_slop.toml`), serializes discussion entries, and integrates with git (e.g., fetching current commit).
* **`session_logger.py`:** Handles timestamped logging of communication history (JSON-L) and tool calls (saving generated `.ps1` files).
# Building and Running
* **Setup:** The application uses `uv` for dependency management. Ensure `uv` is installed.
* **Credentials:** You must create a `credentials.toml` file in the root directory to store your API keys:
```toml
[gemini]
api_key = "****"
[anthropic]
api_key = "****"
```
* **Run the Application:**
```powershell
uv run .\gui.py
```
# Development Conventions
* **Configuration Management:** The application uses two tiers of configuration (a loading sketch follows at the end of this section):
* `config.toml`: Global settings (UI theme, active provider, list of project paths).
* `manual_slop.toml`: Per-project settings (files to track, discussion history, specific system prompts).
* **Tool Execution:** The AI acts primarily by generating PowerShell scripts. These scripts MUST be confirmed by the user via a GUI modal before execution. The AI also has access to read-only MCP-style file exploration tools and web search capabilities.
* **Context Refresh:** After every tool call that modifies the file system, the application automatically refreshes the file contents in the context using the files' `mtime` to optimize reads.
* **UI State Persistence:** Window layouts and docking arrangements are automatically saved to and loaded from `dpg_layout.ini`.
* **Code Style:**
* Use type hints where appropriate.
* Internal methods and variables are generally prefixed with an underscore (e.g., `_flush_to_project`, `_do_generate`).
* **Logging:** All API communications are logged to `logs/comms_<ts>.log`. All executed scripts are saved to `scripts/generated/`.
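A minimal sketch of loading both configuration tiers with the standard-library `tomllib` (file names and table names are taken from this document; the `load_configs` helper itself is hypothetical):

```python
import tomllib
from pathlib import Path

def load_configs(project_dir: Path) -> tuple[dict, dict]:
    """Load the global config.toml plus a project's manual_slop.toml."""
    with open("config.toml", "rb") as f:  # global: [ai], [theme], [projects]
        global_cfg = tomllib.load(f)
    project_cfg: dict = {}
    project_file = project_dir / "manual_slop.toml"  # per-project settings
    if project_file.exists():
        with project_file.open("rb") as f:
            project_cfg = tomllib.load(f)
    return global_cfg, project_cfg

if __name__ == "__main__":
    g, p = load_configs(Path("."))
    print(g.get("ai", {}).get("provider"), p.get("project", {}).get("name"))
```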
+44 -7
@@ -12,16 +12,16 @@ Is a local GUI tool for manually curating and sending context to AI APIs. It agg
- `uv` - package/env management
**Files:**
- `gui.py` - main GUI, `App` class, all panels, all callbacks, confirmation dialog, layout persistence, rich comms rendering
- `ai_client.py` - unified provider wrapper, model listing, session management, send, tool/function-call loop, comms log, provider error classification
- `aggregate.py` - reads config, collects files/screenshots/discussion, writes numbered `.md` files to `output_dir`
- `gui.py` - main GUI, `App` class, all panels, all callbacks, confirmation dialog, layout persistence, rich comms rendering; `[+ Maximize]` buttons in `ConfirmDialog` and `win_script_output` now pass text directly as `user_data` / read from `self._last_script` / `self._last_output` instance vars instead of `dpg.get_value(tag)` — fixes glitch when word-wrap is ON or dialog is dismissed before viewer opens
- `ai_client.py` - unified provider wrapper, model listing, session management, send, tool/function-call loop, comms log, provider error classification, token estimation, and aggressive history truncation
- `aggregate.py` - reads config, collects files/screenshots/discussion, builds `file_items` with `mtime` for cache optimization, writes numbered `.md` files to `output_dir` using `build_markdown_from_items` to avoid double I/O; `run()` returns `(markdown_str, path, file_items)` tuple; `summary_only=False` by default (full file contents sent, not heuristic summaries)
- `shell_runner.py` - subprocess wrapper that runs PowerShell scripts sandboxed to `base_dir`, returns stdout/stderr/exit code as a string (see the sketch after this file list)
- `session_logger.py` - opens timestamped log files at session start; writes comms entries as JSON-L and tool calls as markdown; saves each AI-generated script as a `.ps1` file
- `project_manager.py` - per-project .toml load/save, entry serialisation (entry_to_str/str_to_entry with @timestamp support), default_project/default_discussion factories, migrate_from_legacy_config, flat_config for aggregate.run(), git helpers (get_git_commit, get_git_log)
- `theme.py` - palette definitions, font loading, scale, load_from_config/save_to_config
- `gemini.py` - legacy standalone Gemini wrapper (not used by the main GUI; superseded by `ai_client.py`)
- `file_cache.py` - stub; Anthropic Files API path removed; kept so stale imports don't break
- `mcp_client.py` - MCP-style read-only file tools (read_file, list_directory, search_files, get_file_summary); allowlist enforced against project file_items + base_dirs; dispatched by ai_client tool-use loop for both Anthropic and Gemini
- `mcp_client.py` - MCP-style tools (read_file, list_directory, search_files, get_file_summary, web_search, fetch_url); allowlist enforced against project file_items + base_dirs for file tools; web tools are unrestricted; dispatched by ai_client tool-use loop for both Anthropic and Gemini
- `summarize.py` - local heuristic summariser (no AI); .py via AST, .toml via regex, .md headings, generic preview; used by mcp_client.get_file_summary and aggregate.build_summary_section
- `config.toml` - global-only settings: [ai] provider+model+system_prompt, [theme] palette+font+scale, [projects] paths array + active path
- `manual_slop.toml` - per-project file: [project] name+git_dir+system_prompt+main_context, [output] namespace+output_dir, [files] base_dir+paths, [screenshots] base_dir+paths, [discussion] roles+active+[discussion.discussions.<name>] git_commit+last_updated+history
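A minimal sketch of the `shell_runner.py` invocation described above (the `-NoProfile -NonInteractive -Command` flags and the `base_dir` sandbox are from these notes; the exact signature and the `timeout` guard are assumptions):

```python
import subprocess
from pathlib import Path

def run_powershell(script: str, base_dir: str, timeout: int = 120) -> str:
    """Run a PowerShell script with cwd pinned to base_dir; return output as a string."""
    # timeout is an assumed safety net, not confirmed by these notes
    proc = subprocess.run(
        ["powershell", "-NoProfile", "-NonInteractive", "-Command", script],
        cwd=Path(base_dir),
        capture_output=True,
        text=True,
        timeout=timeout,
    )
    return f"exit code: {proc.returncode}\nstdout:\n{proc.stdout}\nstderr:\n{proc.stderr}"
```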
@@ -87,7 +87,7 @@ Is a local GUI tool for manually curating and sending context to AI APIs. It agg
- All tool calls (script + result/rejection) are appended to `_tool_log` and displayed in the Tool Calls panel
**Dynamic file context refresh (ai_client.py):**
- After the last tool call in each round, all project files from `file_items` are re-read from disk via `_reread_file_items()`. The `file_items` variable is reassigned so subsequent rounds see fresh content.
- After the last tool call in each round, project files from `file_items` are checked via `_reread_file_items()`. It uses `mtime` to only re-read modified files, returning only the `changed` files to build a minimal `[FILES UPDATED]` block.
- For Anthropic: the refreshed file contents are injected as a `text` block appended to the `tool_results` user message, prefixed with `[FILES UPDATED]` and an instruction not to re-read them.
- For Gemini: refreshed file contents are appended to the last function response's `output` string as a `[SYSTEM: FILES UPDATED]` block. On the next tool round, stale `[FILES UPDATED]` blocks are stripped from history and old tool outputs are truncated to `_history_trunc_limit` characters to control token growth.
- `_build_file_context_text(file_items)` formats the refreshed files as markdown code blocks (same format as the original context)
@@ -141,10 +141,12 @@ Entry layout: index + timestamp + direction + kind + provider/model header row,
- `log_tool_call(script, result, script_path)` writes the script to `scripts/generated/<ts>_<seq:04d>.ps1` and appends a markdown record to the toolcalls log without the script body (just the file path + result); uses a `threading.Lock` for the sequence counter
- `close_session()` flushes and closes both file handles; called just before `dpg.destroy_context()`
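A sketch of the locked sequence counter behind `log_tool_call` (the `scripts/generated/<ts>_<seq:04d>.ps1` naming and the `threading.Lock` are from the notes above; the class shape is assumed):

```python
import threading
import time
from pathlib import Path

class ScriptSaver:
    """Save each AI-generated script as scripts/generated/<ts>_<seq:04d>.ps1."""

    def __init__(self, out_dir: str = "scripts/generated"):
        self._dir = Path(out_dir)
        self._dir.mkdir(parents=True, exist_ok=True)
        self._seq = 0
        self._lock = threading.Lock()  # guards the sequence counter

    def save(self, script: str) -> Path:
        with self._lock:  # concurrent tool calls must not share a sequence number
            self._seq += 1
            seq = self._seq
        path = self._dir / f"{time.strftime('%Y%m%d_%H%M%S')}_{seq:04d}.ps1"
        path.write_text(script, encoding="utf-8")
        return path
```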
**Anthropic prompt caching:**
**Anthropic prompt caching & history management:**
- System prompt + context are combined into one string, chunked into <=120k char blocks, and sent as the `system=` parameter array. Only the LAST chunk gets `cache_control: ephemeral`, so the entire system prefix is cached as one unit (see the sketch at the end of this list).
- Last tool in `_ANTHROPIC_TOOLS` (`run_powershell`) has `cache_control: ephemeral`; this means the tools prefix is cached together with the system prefix after the first request.
- The user message is sent as a plain `[{"type": "text", "text": user_message}]` block with NO cache_control. The context lives in `system=`, not in the first user message.
- `_add_history_cache_breakpoint` places `cache_control:ephemeral` on the last content block of the second-to-last user message, using the 4th cache breakpoint to cache the conversation history prefix.
- `_trim_anthropic_history` uses token estimation (`_CHARS_PER_TOKEN = 3.5`) to keep the prompt under `_ANTHROPIC_MAX_PROMPT_TOKENS = 180_000`. It strips stale file refreshes from old turns, and drops oldest turn pairs if still over budget.
- The tools list is built once per session via `_get_anthropic_tools()` and reused across all API calls within the tool loop, avoiding redundant Python-side reconstruction.
- `_strip_cache_controls()` removes stale `cache_control` markers from all history entries before each API call, ensuring only the stable system/tools prefix consumes cache breakpoint slots.
- Cache stats (creation tokens, read tokens) are surfaced in the comms log usage dict and displayed in the Comms History panel
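A sketch of the chunking described above (the 120k chunk size and the block shape are from these notes; `_build_chunked_context_blocks` is the name used in the ai_client.py diff further down, though its body here is assumed):

```python
_CHUNK = 120_000  # max characters per system block

def _build_chunked_context_blocks(text: str) -> list[dict]:
    """Split system text into <=120k-char blocks; only the LAST gets cache_control."""
    blocks = [
        {"type": "text", "text": text[i:i + _CHUNK]}
        for i in range(0, max(len(text), 1), _CHUNK)
    ]
    # One ephemeral breakpoint on the final chunk caches the whole prefix as a unit.
    blocks[-1]["cache_control"] = {"type": "ephemeral"}
    return blocks
```

For scale: at the estimator's 3.5 chars/token, the 180,000-token budget corresponds to roughly 630,000 characters of estimated prompt across system blocks and history.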
@@ -180,13 +182,15 @@ Entry layout: index + timestamp + direction + kind + provider/model header row,
**MCP file tools (mcp_client.py + ai_client.py):**
- Four read-only tools exposed to the AI as native function/tool declarations: `read_file`, `list_directory`, `search_files`, `get_file_summary`
- Access control: `mcp_client.configure(file_items, extra_base_dirs)` is called before each send; builds an allowlist of resolved absolute paths from the project's `file_items` plus the `base_dir`; any path that is neither explicitly in the list nor under one of the allowed directories returns `ACCESS DENIED` (a sketch of this check follows after this list)
- `mcp_client.dispatch(tool_name, tool_input)` is the single dispatch entry point used by both Anthropic and Gemini tool-use loops
- `mcp_client.dispatch(tool_name, tool_input)` is the single dispatch entry point used by both Anthropic and Gemini tool-use loops; `TOOL_NAMES` set now includes all six tool names
- Anthropic: MCP tools appear before `run_powershell` in the tools list (no `cache_control` on them; only `run_powershell` carries `cache_control: ephemeral`)
- Gemini: MCP tools are included in the `FunctionDeclaration` list alongside `run_powershell`
- `get_file_summary` uses `summarize.summarise_file()` — same heuristic used for the initial `<context>` block, so the AI gets the same compact structural view it already knows
- `list_directory` sorts dirs before files; shows name, type, and size
- `search_files` uses `Path.glob()` with the caller-supplied pattern (supports `**/*.py` style)
- `read_file` returns raw UTF-8 text; errors (not found, access denied, decode error) are returned as error strings rather than exceptions, so the AI sees them as tool results
- `web_search(query)` queries DuckDuckGo HTML endpoint and returns the top 5 results (title, URL, snippet) as a formatted string; uses a custom `_DDGParser` (HTMLParser subclass)
- `fetch_url(url)` fetches a URL, strips HTML tags/scripts via `_TextExtractor` (HTMLParser subclass), collapses whitespace, and truncates to 40k chars to prevent context blowup; handles DuckDuckGo redirect links automatically
- `summarize.py` heuristics: `.py` → AST imports + ALL_CAPS constants + classes+methods + top-level functions; `.toml` → table headers + top-level keys; `.md` → h1–h3 headings with indentation; all others → line count + first 8 lines preview
- Comms log: MCP tool calls log `OUT/tool_call` with `{"name": ..., "args": {...}}` and `IN/tool_result` with `{"name": ..., "output": ...}`; rendered in the Comms History panel via `_render_payload_tool_call` (shows each arg key/value) and `_render_payload_tool_result` (shows output)
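A sketch of the allowlist enforcement described above (the `configure` signature, the `_is_allowed` name, and the `ACCESS DENIED` string are from these notes; the module-level state and helper bodies are assumptions, and `Path.is_relative_to` needs Python 3.9+):

```python
from pathlib import Path

_allowed_files: set[Path] = set()
_allowed_dirs: list[Path] = []

def configure(file_items: list[dict], extra_base_dirs: list[str]) -> None:
    """Rebuild the allowlist from the project's file_items plus base dirs."""
    global _allowed_files, _allowed_dirs
    _allowed_files = {Path(i["path"]).resolve() for i in file_items if i.get("path")}
    _allowed_dirs = [Path(d).resolve() for d in extra_base_dirs]

def _is_allowed(raw_path: str) -> bool:
    p = Path(raw_path).resolve()  # resolve() defeats ../ traversal tricks
    return p in _allowed_files or any(p.is_relative_to(d) for d in _allowed_dirs)

def read_file(path: str) -> str:
    if not _is_allowed(path):
        return f"ACCESS DENIED: {path}"  # errors go back as tool-result strings
    try:
        return Path(path).read_text(encoding="utf-8")
    except Exception as e:
        return f"ERROR: {e}"
```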
@@ -199,7 +203,9 @@ Entry layout: index + timestamp + direction + kind + provider/model header row,
### Gemini Context Management
- Gemini uses explicit caching via `client.caches.create()` to store the `system_instruction` + tools as an immutable cached prefix with a 1-hour TTL. The cache is created once per chat session.
- Proactively rebuilds cache at 90% of `_GEMINI_CACHE_TTL = 3600` to avoid stale-reference errors.
- When context changes (detected via `md_content` hash), the old cache is deleted, a new cache is created, and chat history is migrated to a fresh chat session pointing at the new cache.
- Trims history by dropping oldest pairs if input tokens exceed `_GEMINI_MAX_INPUT_TOKENS = 900_000`.
- If cache creation fails (e.g., content is under the minimum token threshold — 1024 for Flash, 4096 for Pro), the system falls back to inline `system_instruction` in the chat config. Implicit caching may still provide cost savings in this case.
- The `<context>` block lives inside `system_instruction`, NOT in user messages, preventing history bloat across turns.
- On cleanup/exit, active caches are deleted via `ai_client.cleanup()` to prevent orphaned billing.
@@ -244,3 +250,34 @@ Documentation has been completely rewritten matching the strict, structural form
- `docs/guide_architecture.md`: Details the Python implementation algorithms, queue management for UI rendering, the specific AST heuristics used for context aggregation, and the distinct algorithms for trimming Anthropic history vs Gemini state caching.
- `docs/Readme.md`: The core interface manual.
- `docs/guide_tools.md`: Security architecture for `_is_allowed` paths and definitions of the read-only vs destructive tool pipeline.
## Updates (2026-02-22 — ai_client.py & aggregate.py)
### mcp_client.py — Web Tools Added
- `web_search(query)` and `fetch_url(url)` added as two new MCP tools alongside the existing four file tools.
- `TOOL_NAMES` set updated to include all six tool names for dispatch routing.
- `MCP_TOOL_SPECS` list extended with full JSON schema definitions for both web tools.
- Both tools are declared in `_build_anthropic_tools()` and `_gemini_tool_declaration()` so they are available to both providers.
- Web tools bypass the `_is_allowed` path check (no filesystem access); file tools retain the allowlist enforcement.
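A sketch of what `_TextExtractor` might look like (the class name, script/style skipping, whitespace collapsing, and 40k truncation are from these notes; the implementation details are assumed):

```python
import re
from html.parser import HTMLParser

class _TextExtractor(HTMLParser):
    """Collect visible page text, skipping <script>/<style> contents."""

    def __init__(self):
        super().__init__()
        self._chunks: list[str] = []
        self._skip_depth = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if not self._skip_depth:
            self._chunks.append(data)

    def text(self, limit: int = 40_000) -> str:
        # Collapse whitespace runs, then truncate to keep the context small.
        collapsed = re.sub(r"\s+", " ", " ".join(self._chunks)).strip()
        return collapsed[:limit]

parser = _TextExtractor()
parser.feed("<html><script>ignored()</script><p>kept   text</p></html>")
print(parser.text())  # -> "kept text"
```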
### aggregate.py — run() double-I/O elimination
- `run()` now calls `build_file_items()` once, then passes the result to `build_markdown_from_items()` instead of calling `build_files_section()` separately. This avoids reading every file twice per send.
- `build_markdown_from_items()` accepts a `summary_only` flag (default `False`); when `False` it inlines full file content; when `True` it delegates to `summarize.build_summary_markdown()` for compact structural summaries.
- `run()` returns a 3-tuple `(markdown_str, output_path, file_items)` — the `file_items` list is passed through to `gui.py` as `self.last_file_items` for dynamic context refresh after tool calls.
## Updates (2026-02-22 — gui.py [+ Maximize] bug fix)
### Problem
Three `[+ Maximize]` buttons were reading their text content via `dpg.get_value(tag)` at click time:
1. `ConfirmDialog.show()` — passed `f"{self._tag}_script"` as `user_data` and called `dpg.get_value(u)` in the lambda. If the dialog was dismissed before the viewer opened, the item no longer existed and the call would fail silently or crash.
2. `win_script_output` Script `[+ Maximize]` — used `user_data="last_script_text"` and `dpg.get_value(u)`. When word-wrap is ON, `last_script_text` is hidden (`show=False`); in some DPG versions `dpg.get_value` on a hidden `input_text` returns `""`.
3. `win_script_output` Output `[+ Maximize]` — same issue with `"last_script_output"`.
### Fix
- `ConfirmDialog.show()`: changed `user_data` to `self._script` (the actual text string captured at button-creation time) and the callback to `lambda s, a, u: _show_text_viewer("Confirm Script", u)`. The text is now baked in at dialog construction, not read from a potentially-deleted widget.
- `App._append_tool_log()`: added `self._last_script = script` and `self._last_output = result` assignments so the latest values are always available as instance state.
- `win_script_output` buttons: both `[+ Maximize]` buttons now use `lambda s, a, u: _show_text_viewer("...", self._last_script/output)` directly, bypassing DPG widget state entirely.
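The fixed pattern in miniature, as a runnable Dear PyGui sketch (the `_show_text_viewer` internals are assumed; the key point, baking text into `user_data` at creation time instead of calling `dpg.get_value(tag)` at click time, is from the notes above):

```python
import dearpygui.dearpygui as dpg

def _show_text_viewer(title: str, text: str) -> None:
    # The viewer receives the text as an argument, never via dpg.get_value().
    with dpg.window(label=title, width=600, height=400):
        dpg.add_input_text(default_value=text, multiline=True, readonly=True,
                           width=-1, height=-1)

dpg.create_context()
dpg.create_viewport(title="maximize-fix sketch", width=800, height=600)
with dpg.window(label="Script Output"):
    script_text = "Write-Output 'hello'"  # stands in for self._last_script
    # user_data carries the text itself, so the callback works even if the
    # source widget is hidden (word-wrap ON) or already deleted.
    dpg.add_button(label="[+ Maximize]", user_data=script_text,
                   callback=lambda s, a, u: _show_text_viewer("Script", u))
dpg.setup_dearpygui()
dpg.show_viewport()
dpg.start_dearpygui()
dpg.destroy_context()
```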
+53 -17
@@ -98,24 +98,28 @@ def build_file_items(base_dir: Path, files: list[str]) -> list[dict]:
entry : str (original config entry string)
content : str (file text, or error string)
error : bool
mtime : float (last modification time, for skip-if-unchanged optimization)
"""
items = []
for entry in files:
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True})
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0})
continue
for path in paths:
try:
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
error = False
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
mtime = 0.0
error = True
except Exception as e:
content = f"ERROR: {e}"
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error})
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime})
return items
def build_summary_section(base_dir: Path, files: list[str]) -> str:
@@ -126,8 +130,43 @@ def build_summary_section(base_dir: Path, files: list[str]) -> str:
items = build_file_items(base_dir, files)
return summarize.build_summary_markdown(items)
def build_static_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False) -> str:
def _build_files_section_from_items(file_items: list[dict]) -> str:
"""Build the files markdown section from pre-read file items (avoids double I/O)."""
sections = []
for item in file_items:
path = item.get("path")
entry = item.get("entry", "unknown")
content = item.get("content", "")
if path is None:
sections.append(f"### `{entry}`\n\n```text\n{content}\n```")
continue
suffix = path.suffix.lstrip(".") if hasattr(path, "suffix") else "text"
lang = suffix if suffix else "text"
original = entry if "*" not in entry else str(path)
sections.append(f"### `{original}`\n\n```{lang}\n{content}\n```")
return "\n\n---\n\n".join(sections)
def build_markdown_from_items(file_items: list[dict], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
"""Build markdown from pre-read file items instead of re-reading from disk."""
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if file_items:
if summary_only:
parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
else:
parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files:
if summary_only:
parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files))
@@ -135,12 +174,12 @@ def build_static_markdown(base_dir: Path, files: list[str], screenshot_base_dir:
parts.append("## Files\n\n" + build_files_section(base_dir, files))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
return "\n\n---\n\n".join(parts) if parts else ""
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_dynamic_markdown(history: list[str]) -> str:
return "## Discussion History\n\n" + build_discussion_section(history) if history else ""
def run(config: dict) -> tuple[str, str, Path, list[dict]]:
def run(config: dict) -> tuple[str, Path, list[dict]]:
namespace = config.get("project", {}).get("name")
if not namespace:
namespace = config.get("output", {}).get("namespace", "project")
@@ -154,21 +193,18 @@ def run(config: dict) -> tuple[str, str, Path, list[dict]]:
output_dir.mkdir(parents=True, exist_ok=True)
increment = find_next_increment(output_dir, namespace)
output_file = output_dir / f"{namespace}_{increment:03d}.md"
static_md = build_static_markdown(base_dir, files, screenshot_base_dir, screenshots, summary_only=False)
dynamic_md = build_dynamic_markdown(history)
markdown = f"{static_md}\n\n---\n\n{dynamic_md}" if static_md and dynamic_md else static_md or dynamic_md
output_file.write_text(markdown, encoding="utf-8")
# Build file items once, then construct markdown from them (avoids double I/O)
file_items = build_file_items(base_dir, files)
return static_md, dynamic_md, output_file, file_items
markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history,
summary_only=False)
output_file.write_text(markdown, encoding="utf-8")
return markdown, output_file, file_items
def main():
with open("config.toml", "rb") as f:
import tomllib
config = tomllib.load(f)
static_md, dynamic_md, output_file, _ = run(config)
markdown, output_file, _ = run(config)
print(f"Written: {output_file}")
if __name__ == "__main__":
+499 -182
@@ -13,10 +13,12 @@ during chat creation to avoid massive history bloat.
# ai_client.py
import tomllib
import json
import time
import datetime
from pathlib import Path
import file_cache
import mcp_client
import google.genai
_provider: str = "gemini"
_model: str = "gemini-2.5-flash"
@@ -34,6 +36,12 @@ def set_model_params(temp: float, max_tok: int, trunc_limit: int = 8000):
_gemini_client = None
_gemini_chat = None
_gemini_cache = None
_gemini_cache_md_hash: int | None = None
_gemini_cache_created_at: float | None = None
# Gemini cache TTL in seconds. Caches are created with this TTL and
# proactively rebuilt at 90% of this value to avoid stale-reference errors.
_GEMINI_CACHE_TTL = 3600
_anthropic_client = None
_anthropic_history: list[dict] = []
@@ -216,6 +224,7 @@ def cleanup():
def reset_session():
global _gemini_client, _gemini_chat, _gemini_cache
global _gemini_cache_md_hash, _gemini_cache_created_at
global _anthropic_client, _anthropic_history
global _CACHED_ANTHROPIC_TOOLS
if _gemini_client and _gemini_cache:
@@ -226,11 +235,29 @@ def reset_session():
_gemini_client = None
_gemini_chat = None
_gemini_cache = None
_gemini_cache_md_hash = None
_gemini_cache_created_at = None
_anthropic_client = None
_anthropic_history = []
_CACHED_ANTHROPIC_TOOLS = None
file_cache.reset_client()
def get_gemini_cache_stats() -> dict:
"""
Retrieves statistics about the Gemini caches, such as count and total size.
"""
_ensure_gemini_client()
caches_iterator = _gemini_client.caches.list()
caches = list(caches_iterator)
total_size_bytes = sum(c.size_bytes for c in caches)
return {
"cache_count": len(list(caches)),
"total_size_bytes": total_size_bytes,
}
# ------------------------------------------------------------------ model listing
@@ -244,9 +271,9 @@ def list_models(provider: str) -> list[str]:
def _list_gemini_models(api_key: str) -> list[str]:
from google import genai
# from google import genai # Removed
try:
client = genai.Client(api_key=api_key)
client = google.genai.Client(api_key=api_key)
models = []
for m in client.models.list():
name = m.name
@@ -276,37 +303,53 @@ def _list_anthropic_models() -> list[str]:
TOOL_NAME = "run_powershell"
_agent_tools: dict = {}
def set_agent_tools(tools: dict):
global _agent_tools, _CACHED_ANTHROPIC_TOOLS
_agent_tools = tools
_CACHED_ANTHROPIC_TOOLS = None
def _build_anthropic_tools() -> list[dict]:
"""Build the full Anthropic tools list: run_powershell + MCP file tools."""
mcp_tools = []
for spec in mcp_client.MCP_TOOL_SPECS:
mcp_tools.append({
"name": spec["name"],
"description": spec["description"],
"input_schema": spec["parameters"],
})
powershell_tool = {
"name": TOOL_NAME,
"description": (
"Run a PowerShell script within the project base_dir. "
"Use this to create, edit, rename, or delete files and directories. "
"The working directory is set to base_dir automatically. "
"Always prefer targeted edits over full rewrites where possible. "
"stdout and stderr are returned to you as the result."
),
"input_schema": {
"type": "object",
"properties": {
"script": {
"type": "string",
"description": "The PowerShell script to execute."
}
if _agent_tools.get(spec["name"], True):
mcp_tools.append({
"name": spec["name"],
"description": spec["description"],
"input_schema": spec["parameters"],
})
tools_list = mcp_tools
if _agent_tools.get(TOOL_NAME, True):
powershell_tool = {
"name": TOOL_NAME,
"description": (
"Run a PowerShell script within the project base_dir. "
"Use this to create, edit, rename, or delete files and directories. "
"The working directory is set to base_dir automatically. "
"Always prefer targeted edits over full rewrites where possible. "
"stdout and stderr are returned to you as the result."
),
"input_schema": {
"type": "object",
"properties": {
"script": {
"type": "string",
"description": "The PowerShell script to execute."
}
},
"required": ["script"]
},
"required": ["script"]
},
"cache_control": {"type": "ephemeral"},
}
return mcp_tools + [powershell_tool]
"cache_control": {"type": "ephemeral"},
}
tools_list.append(powershell_tool)
elif tools_list:
# Anthropic requires the LAST tool to have cache_control for the prefix caching to work optimally on tools
tools_list[-1]["cache_control"] = {"type": "ephemeral"}
return tools_list
_ANTHROPIC_TOOLS = _build_anthropic_tools()
@@ -322,50 +365,53 @@ def _get_anthropic_tools() -> list[dict]:
def _gemini_tool_declaration():
from google.genai import types
# from google.genai import types # Removed
declarations = []
# MCP file tools
for spec in mcp_client.MCP_TOOL_SPECS:
if not _agent_tools.get(spec["name"], True):
continue
props = {}
for pname, pdef in spec["parameters"].get("properties", {}).items():
props[pname] = types.Schema(
type=types.Type.STRING,
props[pname] = google.genai.types.Schema(
type=google.genai.types.Type.STRING,
description=pdef.get("description", ""),
)
declarations.append(types.FunctionDeclaration(
declarations.append(google.genai.types.FunctionDeclaration(
name=spec["name"],
description=spec["description"],
parameters=types.Schema(
type=types.Type.OBJECT,
parameters=google.genai.types.Schema(
type=google.genai.types.Type.OBJECT,
properties=props,
required=spec["parameters"].get("required", []),
),
))
# PowerShell tool
declarations.append(types.FunctionDeclaration(
name=TOOL_NAME,
description=(
"Run a PowerShell script within the project base_dir. "
"Use this to create, edit, rename, or delete files and directories. "
"The working directory is set to base_dir automatically. "
"stdout and stderr are returned to you as the result."
),
parameters=types.Schema(
type=types.Type.OBJECT,
properties={
"script": types.Schema(
type=types.Type.STRING,
description="The PowerShell script to execute."
)
},
required=["script"]
),
))
if _agent_tools.get(TOOL_NAME, True):
declarations.append(google.genai.types.FunctionDeclaration(
name=TOOL_NAME,
description=(
"Run a PowerShell script within the project base_dir. "
"Use this to create, edit, rename, or delete files and directories. "
"The working directory is set to base_dir automatically. "
"stdout and stderr are returned to you as the result."
),
parameters=google.genai.types.Schema(
type=google.genai.types.Type.OBJECT,
properties={
"script": google.genai.types.Schema(
type=google.genai.types.Type.STRING,
description="The PowerShell script to execute."
)
},
required=["script"]
),
))
return types.Tool(function_declarations=declarations)
return google.genai.types.Tool(function_declarations=declarations) if declarations else None
def _run_script(script: str, base_dir: str) -> str:
@@ -383,12 +429,15 @@ def _run_script(script: str, base_dir: str) -> str:
# ------------------------------------------------------------------ dynamic file context refresh
def _reread_file_items(file_items: list[dict]) -> list[dict]:
def _reread_file_items(file_items: list[dict]) -> tuple[list[dict], list[dict]]:
"""
Re-read every file in file_items from disk, returning a fresh list.
This is called after tool calls so the AI sees updated file contents.
Re-read file_items from disk, but only files whose mtime has changed.
Returns (all_items, changed_items) — all_items is the full refreshed list,
changed_items contains only the files that were actually modified since
the last read (used to build a minimal [FILES UPDATED] block).
"""
refreshed = []
changed = []
for item in file_items:
path = item.get("path")
if path is None:
@@ -397,11 +446,20 @@ def _reread_file_items(file_items: list[dict]) -> list[dict]:
from pathlib import Path as _P
p = _P(path) if not isinstance(path, _P) else path
try:
current_mtime = p.stat().st_mtime
prev_mtime = item.get("mtime", 0.0)
if current_mtime == prev_mtime:
refreshed.append(item) # unchanged — skip re-read
continue
content = p.read_text(encoding="utf-8")
refreshed.append({**item, "content": content, "error": False})
new_item = {**item, "content": content, "error": False, "mtime": current_mtime}
refreshed.append(new_item)
changed.append(new_item)
except Exception as e:
refreshed.append({**item, "content": f"ERROR re-reading {p}: {e}", "error": True})
return refreshed
err_item = {**item, "content": f"ERROR re-reading {p}: {e}", "error": True, "mtime": 0.0}
refreshed.append(err_item)
changed.append(err_item)
return refreshed, changed
def _build_file_context_text(file_items: list[dict]) -> str:
@@ -448,71 +506,115 @@ def _content_block_to_dict(block) -> dict:
def _ensure_gemini_client():
global _gemini_client
if _gemini_client is None:
from google import genai
# from google import genai # Removed
creds = _load_credentials()
_gemini_client = genai.Client(api_key=creds["gemini"]["api_key"])
_gemini_client = google.genai.Client(api_key=creds["gemini"]["api_key"])
def _send_gemini(static_md: str, dynamic_md: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
global _gemini_chat, _gemini_cache
from google.genai import types
def _get_gemini_history_list(chat):
if not chat: return []
# google-genai SDK stores the mutable list in _history
if hasattr(chat, "_history"):
return chat._history
if hasattr(chat, "history"):
return chat.history
if hasattr(chat, "get_history"):
return chat.get_history()
return []
def _send_gemini(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
global _gemini_chat, _gemini_cache, _gemini_cache_md_hash, _gemini_cache_created_at
# from google.genai import types # Removed
try:
_ensure_gemini_client(); mcp_client.configure(file_items or [], [base_dir])
sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{static_md}\n</context>"
sys_instr = f"{_get_combined_system_prompt()}\n\n<context>\n{md_content}\n</context>"
tools_decl = [_gemini_tool_declaration()]
current_md_hash = hash(static_md)
# DYNAMIC CONTEXT: Check if files/context changed mid-session
current_md_hash = hash(md_content)
old_history = None
if _gemini_chat and getattr(_gemini_chat, "_last_md_hash", None) != current_md_hash:
old_history = list(_gemini_chat.history) if _gemini_chat.history else []
if _gemini_chat and _gemini_cache_md_hash != current_md_hash:
old_history = list(_get_gemini_history_list(_gemini_chat)) if _get_gemini_history_list(_gemini_chat) else []
if _gemini_cache:
try: _gemini_client.caches.delete(name=_gemini_cache.name)
except: pass
_gemini_chat, _gemini_cache = None, None
_append_comms("OUT", "request", {"message": "[STATIC CONTEXT CHANGED] Rebuilding cache and chat session..."})
_gemini_chat = None
_gemini_cache = None
_gemini_cache_created_at = None
_append_comms("OUT", "request", {"message": "[CONTEXT CHANGED] Rebuilding cache and chat session..."})
# CACHE TTL: Proactively rebuild before the cache expires server-side.
# If we don't, send_message() will reference a deleted cache and fail.
if _gemini_chat and _gemini_cache and _gemini_cache_created_at:
elapsed = time.time() - _gemini_cache_created_at
if elapsed > _GEMINI_CACHE_TTL * 0.9:
old_history = list(_get_gemini_history_list(_gemini_chat)) if _get_gemini_history_list(_gemini_chat) else []
try: _gemini_client.caches.delete(name=_gemini_cache.name)
except: pass
_gemini_chat = None
_gemini_cache = None
_gemini_cache_created_at = None
_append_comms("OUT", "request", {"message": f"[CACHE TTL] Rebuilding cache (expired after {int(elapsed)}s)..."})
if not _gemini_chat:
chat_config = types.GenerateContentConfig(
system_instruction=sys_instr, tools=tools_decl, temperature=_temperature, max_output_tokens=_max_tokens,
safety_settings=[types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
chat_config = google.genai.types.GenerateContentConfig(
system_instruction=sys_instr,
tools=tools_decl,
temperature=_temperature,
max_output_tokens=_max_tokens,
safety_settings=[google.genai.types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
)
try:
_gemini_cache = _gemini_client.caches.create(model=_model, config=types.CreateCachedContentConfig(system_instruction=sys_instr, tools=tools_decl, ttl="3600s"))
chat_config = types.GenerateContentConfig(
cached_content=_gemini_cache.name, temperature=_temperature, max_output_tokens=_max_tokens,
safety_settings=[types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
# Gemini requires 1024 (Flash) or 4096 (Pro) tokens to cache.
_gemini_cache = _gemini_client.caches.create(
model=_model,
config=google.genai.types.CreateCachedContentConfig(
system_instruction=sys_instr,
tools=tools_decl,
ttl=f"{_GEMINI_CACHE_TTL}s",
)
)
_gemini_cache_created_at = time.time()
chat_config = google.genai.types.GenerateContentConfig(
cached_content=_gemini_cache.name,
temperature=_temperature,
max_output_tokens=_max_tokens,
safety_settings=[google.genai.types.SafetySetting(category="HARM_CATEGORY_DANGEROUS_CONTENT", threshold="BLOCK_ONLY_HIGH")]
)
_append_comms("OUT", "request", {"message": f"[CACHE CREATED] {_gemini_cache.name}"})
except Exception: _gemini_cache = None
kwargs = {"model": _model, "config": chat_config}
if old_history: kwargs["history"] = old_history
_gemini_chat = _gemini_client.chats.create(**kwargs)
_gemini_chat._last_md_hash = current_md_hash
except Exception as e:
_gemini_cache = None
_gemini_cache_created_at = None
_append_comms("OUT", "request", {"message": f"[CACHE FAILED] {type(e).__name__}: {e} — falling back to inline system_instruction"})
import re
if _gemini_chat and _gemini_chat.history:
for msg in _gemini_chat.history:
kwargs = {"model": _model, "config": chat_config}
if old_history:
kwargs["history"] = old_history
_gemini_chat = _gemini_client.chats.create(**kwargs)
_gemini_cache_md_hash = current_md_hash
_append_comms("OUT", "request", {"message": f"[ctx {len(md_content)} + msg {len(user_message)}]"})
payload, all_text = user_message, []
# Strip stale file refreshes and truncate old tool outputs ONCE before
# entering the tool loop (not per-round — history entries don't change).
if _gemini_chat and _get_gemini_history_list(_gemini_chat):
for msg in _get_gemini_history_list(_gemini_chat):
if msg.role == "user" and hasattr(msg, "parts"):
for p in msg.parts:
if hasattr(p, "text") and p.text and "<discussion>" in p.text:
p.text = re.sub(r"<discussion>.*?</discussion>\n\n", "", p.text, flags=re.DOTALL)
if hasattr(p, "function_response") and p.function_response and hasattr(p.function_response, "response"):
r = p.function_response.response
r_dict = r if isinstance(r, dict) else getattr(r, "__dict__", {})
val = r_dict.get("output") if isinstance(r_dict, dict) else getattr(r, "output", None)
if isinstance(val, str):
if "[SYSTEM: FILES UPDATED]" in val: val = val.split("[SYSTEM: FILES UPDATED]")[0].strip()
if _history_trunc_limit > 0 and len(val) > _history_trunc_limit:
val = val[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS.]"
if isinstance(r, dict): r["output"] = val
else: setattr(r, "output", val)
if isinstance(r, dict) and "output" in r:
val = r["output"]
if isinstance(val, str):
if "[SYSTEM: FILES UPDATED]" in val:
val = val.split("[SYSTEM: FILES UPDATED]")[0].strip()
if _history_trunc_limit > 0 and len(val) > _history_trunc_limit:
val = val[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS.]"
r["output"] = val
full_user_msg = f"<discussion>\n{dynamic_md}\n</discussion>\n\n{user_message}" if dynamic_md else user_message
_append_comms("OUT", "request", {"message": f"[ctx {len(static_md)} static + {len(dynamic_md)} dynamic + msg {len(user_message)}]"})
payload, all_text = full_user_msg, []
for r_idx in range(MAX_TOOL_ROUNDS + 2):
resp = _gemini_chat.send_message(payload)
txt = "\n".join(p.text for c in resp.candidates if getattr(c, "content", None) for p in c.content.parts if hasattr(p, "text") and p.text)
@@ -521,27 +623,34 @@ def _send_gemini(static_md: str, dynamic_md: str, user_message: str, base_dir: s
calls = [p.function_call for c in resp.candidates if getattr(c, "content", None) for p in c.content.parts if hasattr(p, "function_call") and p.function_call]
usage = {"input_tokens": getattr(resp.usage_metadata, "prompt_token_count", 0), "output_tokens": getattr(resp.usage_metadata, "candidates_token_count", 0)}
cached_tokens = getattr(resp.usage_metadata, "cached_content_token_count", None)
if cached_tokens: usage["cache_read_input_tokens"] = cached_tokens
if cached_tokens:
usage["cache_read_input_tokens"] = cached_tokens
reason = resp.candidates[0].finish_reason.name if resp.candidates and hasattr(resp.candidates[0], "finish_reason") else "STOP"
_append_comms("IN", "response", {"round": r_idx, "stop_reason": reason, "text": txt, "tool_calls": [{"name": c.name, "args": dict(c.args)} for c in calls], "usage": usage})
# Guard: if Gemini reports input tokens approaching the limit, drop oldest history pairs
total_in = usage.get("input_tokens", 0)
if total_in > _GEMINI_MAX_INPUT_TOKENS and _gemini_chat and _gemini_chat.history:
hist = list(_gemini_chat.history)
if total_in > _GEMINI_MAX_INPUT_TOKENS and _gemini_chat and _get_gemini_history_list(_gemini_chat):
hist = _get_gemini_history_list(_gemini_chat)
dropped = 0
# Drop oldest pairs (user+model) but keep at least the last 2 entries
while len(hist) > 4 and total_in > _GEMINI_MAX_INPUT_TOKENS * 0.7:
saved = sum(len(p.text)//4 for p in hist[0].parts if hasattr(p, "text") and p.text)
for p in hist[0].parts:
if hasattr(p, "function_response") and p.function_response:
r = getattr(p.function_response, "response", {})
val = r.get("output", "") if isinstance(r, dict) else getattr(r, "output", "")
saved += len(str(val)) // 4
hist.pop(0)
total_in -= max(saved, 100)
dropped += 1
# Drop in pairs (user + model) to maintain alternating roles required by Gemini
saved = 0
for _ in range(2):
if not hist: break
for p in hist[0].parts:
if hasattr(p, "text") and p.text:
saved += len(p.text) // 4
elif hasattr(p, "function_response") and p.function_response:
r = getattr(p.function_response, "response", {})
if isinstance(r, dict):
saved += len(str(r.get("output", ""))) // 4
hist.pop(0)
dropped += 1
total_in -= max(saved, 200)
if dropped > 0:
_gemini_chat.history = hist
_append_comms("OUT", "request", {"message": f"[GEMINI HISTORY TRIMMED: dropped {dropped} old entries to stay within token budget]"})
if not calls or r_idx > MAX_TOOL_ROUNDS: break
@@ -560,11 +669,12 @@ def _send_gemini(static_md: str, dynamic_md: str, user_message: str, base_dir: s
if i == len(calls) - 1:
if file_items:
file_items = _reread_file_items(file_items)
ctx = _build_file_context_text(file_items)
if ctx: out += f"\n\n[SYSTEM: FILES UPDATED]\n\n{ctx}"
file_items, changed = _reread_file_items(file_items)
ctx = _build_file_context_text(changed)
if ctx:
out += f"\n\n[SYSTEM: FILES UPDATED]\n\n{ctx}"
if r_idx == MAX_TOOL_ROUNDS: out += "\n\n[SYSTEM: MAX ROUNDS. PROVIDE FINAL ANSWER.]"
f_resps.append(types.Part.from_function_response(name=name, response={"output": out}))
log.append({"tool_use_id": name, "content": out})
@@ -596,7 +706,15 @@ _FILE_REFRESH_MARKER = "[FILES UPDATED"
def _estimate_message_tokens(msg: dict) -> int:
"""Rough token estimate for a single Anthropic message dict."""
"""
Rough token estimate for a single Anthropic message dict.
Caches the result on the dict as '_est_tokens' so repeated calls
(e.g., from _trim_anthropic_history) don't re-scan unchanged messages.
Call _invalidate_token_estimate() when a message's content is modified.
"""
cached = msg.get("_est_tokens")
if cached is not None:
return cached
total_chars = 0
content = msg.get("content", "")
if isinstance(content, str):
@@ -614,7 +732,14 @@ def _estimate_message_tokens(msg: dict) -> int:
total_chars += len(_json.dumps(inp, ensure_ascii=False))
elif isinstance(block, str):
total_chars += len(block)
return max(1, int(total_chars / _CHARS_PER_TOKEN))
est = max(1, int(total_chars / _CHARS_PER_TOKEN))
msg["_est_tokens"] = est
return est
def _invalidate_token_estimate(msg: dict):
"""Remove the cached token estimate so the next call recalculates."""
msg.pop("_est_tokens", None)
def _estimate_prompt_tokens(system_blocks: list[dict], history: list[dict]) -> int:
@@ -626,48 +751,86 @@ def _estimate_prompt_tokens(system_blocks: list[dict], history: list[dict]) -> i
total += max(1, int(len(text) / _CHARS_PER_TOKEN))
# Tool definitions (rough fixed estimate — they're ~2k tokens for our set)
total += 2500
# History messages
# History messages (uses cached estimates for unchanged messages)
for msg in history:
total += _estimate_message_tokens(msg)
return total
def _strip_stale_file_refreshes(history: list[dict]):
"""
Remove [FILES UPDATED ...] text blocks from all history turns EXCEPT
the very last user message. These are stale snapshots from previous
tool rounds that bloat the context without providing value.
"""
if len(history) < 2:
return
last_user_idx = next((i for i in range(len(history)-1, -1, -1) if history[i].get("role") == "user"), -1)
# Find the index of the last user message — we keep its file refresh intact
last_user_idx = -1
for i in range(len(history) - 1, -1, -1):
if history[i].get("role") == "user":
last_user_idx = i
break
for i, msg in enumerate(history):
if msg.get("role") != "user" or i == last_user_idx:
continue
content = msg.get("content")
if not isinstance(content, list):
continue
cleaned = [b for b in content if not (isinstance(b, dict) and b.get("type") == "text" and b.get("text", "").startswith(_FILE_REFRESH_MARKER))]
cleaned = []
for block in content:
if isinstance(block, dict) and block.get("type") == "text":
text = block.get("text", "")
if text.startswith(_FILE_REFRESH_MARKER):
continue # drop this stale file refresh block
cleaned.append(block)
if len(cleaned) < len(content):
msg["content"] = cleaned
_invalidate_token_estimate(msg)
def _trim_anthropic_history(system_blocks: list[dict], history: list[dict]) -> int:
def _trim_anthropic_history(system_blocks: list[dict], history: list[dict]):
"""
Trim the Anthropic history to fit within the token budget.
Strategy:
1. Strip stale file-refresh injections from old turns.
2. If still over budget, drop oldest turn pairs (user + assistant).
Returns the number of messages dropped.
"""
# Phase 1: strip stale file refreshes
_strip_stale_file_refreshes(history)
est = _estimate_prompt_tokens(system_blocks, history)
if est <= _ANTHROPIC_MAX_PROMPT_TOKENS:
return 0
# Phase 2: drop oldest turn pairs until within budget
dropped = 0
while len(history) > 3 and est > _ANTHROPIC_MAX_PROMPT_TOKENS:
# Protect history[0] (original user prompt). Drop from history[1] (assistant) and history[2] (user)
if history[1].get("role") == "assistant" and len(history) > 2 and history[2].get("role") == "user":
est -= _estimate_message_tokens(history.pop(1))
est -= _estimate_message_tokens(history.pop(1))
removed_asst = history.pop(1)
removed_user = history.pop(1)
dropped += 2
est -= _estimate_message_tokens(removed_asst)
est -= _estimate_message_tokens(removed_user)
# Also drop dangling tool_results if the next message is an assistant and the removed user was just tool results
while len(history) > 2 and history[1].get("role") == "assistant" and history[2].get("role") == "user":
c = history[2].get("content", [])
if isinstance(c, list) and c and isinstance(c[0], dict) and c[0].get("type") == "tool_result":
est -= _estimate_message_tokens(history.pop(1))
est -= _estimate_message_tokens(history.pop(1))
content = history[2].get("content", [])
if isinstance(content, list) and content and isinstance(content[0], dict) and content[0].get("type") == "tool_result":
r_a = history.pop(1)
r_u = history.pop(1)
dropped += 2
else: break
est -= _estimate_message_tokens(r_a)
est -= _estimate_message_tokens(r_u)
else:
break
else:
est -= _estimate_message_tokens(history.pop(1))
# Edge case fallback: drop index 1 (protecting index 0)
removed = history.pop(1)
dropped += 1
est -= _estimate_message_tokens(removed)
return dropped
@@ -715,6 +878,28 @@ def _strip_cache_controls(history: list[dict]):
if isinstance(block, dict):
block.pop("cache_control", None)
def _add_history_cache_breakpoint(history: list[dict]):
"""
Place cache_control:ephemeral on the last content block of the
second-to-last user message. This uses one of the 4 allowed Anthropic
cache breakpoints to cache the conversation prefix so the full history
isn't reprocessed on every request.
"""
user_indices = [i for i, m in enumerate(history) if m.get("role") == "user"]
if len(user_indices) < 2:
return # Only one user message (the current turn) — nothing stable to cache
target_idx = user_indices[-2]
content = history[target_idx].get("content")
if isinstance(content, list) and content:
last_block = content[-1]
if isinstance(last_block, dict):
last_block["cache_control"] = {"type": "ephemeral"}
elif isinstance(content, str):
history[target_idx]["content"] = [
{"type": "text", "text": content, "cache_control": {"type": "ephemeral"}}
]
def _repair_anthropic_history(history: list[dict]):
"""
If history ends with an assistant message that contains tool_use blocks
@@ -747,119 +932,251 @@ def _repair_anthropic_history(history: list[dict]):
})
def _send_anthropic(static_md: str, dynamic_md: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_items: list[dict] | None = None) -> str:
try:
_ensure_anthropic_client()
mcp_client.configure(file_items or [], [base_dir])
# Split system into two cache breakpoints:
# 1. Stable system prompt (never changes — always a cache hit)
# 2. Dynamic file context (invalidated only when files change)
stable_prompt = _get_combined_system_prompt()
stable_blocks = [{"type": "text", "text": stable_prompt, "cache_control": {"type": "ephemeral"}}]
context_text = f"\n\n<context>\n{md_content}\n</context>"
context_blocks = _build_chunked_context_blocks(context_text)
system_blocks = stable_blocks + context_blocks
user_content = [{"type": "text", "text": user_message}]
# COMPRESS HISTORY: Truncate massive tool outputs from previous turns
for msg in _anthropic_history:
if msg.get("role") == "user" and isinstance(msg.get("content"), list):
modified = False
for block in msg["content"]:
if isinstance(block, dict) and block.get("type") == "tool_result":
t_content = block.get("content", "")
if _history_trunc_limit > 0 and isinstance(t_content, str) and len(t_content) > _history_trunc_limit:
block["content"] = t_content[:_history_trunc_limit] + "\n\n... [TRUNCATED BY SYSTEM TO SAVE TOKENS. Original output was too large.]"
modified = True
if modified:
_invalidate_token_estimate(msg)
_strip_cache_controls(_anthropic_history)
_repair_anthropic_history(_anthropic_history)
user_content[-1]["cache_control"] = {"type": "ephemeral"}
_anthropic_history.append({"role": "user", "content": user_content})
# Use the 4th cache breakpoint to cache the conversation history prefix.
# This is placed on the second-to-last user message (the last stable one).
_add_history_cache_breakpoint(_anthropic_history)
n_chunks = len(system_blocks)
_append_comms("OUT", "request", {
"message": (f"[system {n_chunks} chunk(s), {len(static_md)} static + {len(dynamic_md)} dynamic chars context] "
f"{user_message[:200]}{'...' if len(user_message) > 200 else ''}"),
"message": (
f"[system {n_chunks} chunk(s), {len(md_content)} chars context] "
f"{user_message[:200]}{'...' if len(user_message) > 200 else ''}"
),
})
all_text_parts = []
# We allow MAX_TOOL_ROUNDS, plus 1 final loop to get the text synthesis
for round_idx in range(MAX_TOOL_ROUNDS + 2):
# Trim history to fit within token budget before each API call
dropped = _trim_anthropic_history(system_blocks, _anthropic_history)
if dropped > 0:
est_tokens = _estimate_prompt_tokens(system_blocks, _anthropic_history)
_append_comms("OUT", "request", {"message": f"[HISTORY TRIMMED: dropped {dropped} old messages to fit token budget. Estimated {est_tokens} tokens remaining.]"})
_append_comms("OUT", "request", {
"message": (
f"[HISTORY TRIMMED: dropped {dropped} old messages to fit token budget. "
f"Estimated {est_tokens} tokens remaining. {len(_anthropic_history)} messages in history.]"
),
})
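            # Internal bookkeeping keys (leading "_", e.g. cached token estimates) must never reach the API payload.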
def _strip_private_keys(history):
return [{k: v for k, v in m.items() if not k.startswith("_")} for m in history]
response = _anthropic_client.messages.create(
model=_model,
max_tokens=_max_tokens,
temperature=_temperature,
system=system_blocks,
tools=_get_anthropic_tools(),
messages=_strip_private_keys(_anthropic_history),
)
# Convert SDK content block objects to plain dicts before storing in history
serialised_content = [_content_block_to_dict(b) for b in response.content]
_anthropic_history.append({"role": "assistant", "content": serialised_content})
_anthropic_history.append({
"role": "assistant",
"content": serialised_content,
})
text_blocks = [b.text for b in response.content if hasattr(b, "text") and b.text]
if text_blocks:
all_text_parts.append("\n".join(text_blocks))
tool_use_blocks = [{"id": b.id, "name": b.name, "input": b.input} for b in response.content if getattr(b, "type", None) == "tool_use"]
tool_use_blocks = [
{"id": b.id, "name": b.name, "input": b.input}
for b in response.content
if getattr(b, "type", None) == "tool_use"
]
usage_dict: dict = {}
if response.usage:
usage_dict.update({"input_tokens": response.usage.input_tokens, "output_tokens": response.usage.output_tokens})
if getattr(response.usage, "cache_creation_input_tokens", None) is not None:
usage_dict["cache_creation_input_tokens"] = response.usage.cache_creation_input_tokens
if getattr(response.usage, "cache_read_input_tokens", None) is not None:
usage_dict["cache_read_input_tokens"] = response.usage.cache_read_input_tokens
usage_dict["input_tokens"] = response.usage.input_tokens
usage_dict["output_tokens"] = response.usage.output_tokens
cache_creation = getattr(response.usage, "cache_creation_input_tokens", None)
cache_read = getattr(response.usage, "cache_read_input_tokens", None)
if cache_creation is not None:
usage_dict["cache_creation_input_tokens"] = cache_creation
if cache_read is not None:
usage_dict["cache_read_input_tokens"] = cache_read
_append_comms("IN", "response", {"round": round_idx, "stop_reason": response.stop_reason, "text": "\n".join(text_blocks), "tool_calls": tool_use_blocks, "usage": usage_dict})
_append_comms("IN", "response", {
"round": round_idx,
"stop_reason": response.stop_reason,
"text": "\n".join(text_blocks),
"tool_calls": tool_use_blocks,
"usage": usage_dict,
})
if response.stop_reason != "tool_use" or not tool_use_blocks: break
if round_idx > MAX_TOOL_ROUNDS: break
if response.stop_reason != "tool_use" or not tool_use_blocks:
break
if round_idx > MAX_TOOL_ROUNDS:
# The model ignored the MAX ROUNDS warning and kept calling tools.
# Force abort to prevent infinite loop.
break
tool_results = []
for block in response.content:
if getattr(block, "type", None) != "tool_use": continue
b_name, b_id, b_input = getattr(block, "name", None), getattr(block, "id", ""), getattr(block, "input", {})
if getattr(block, "type", None) != "tool_use":
continue
b_name = getattr(block, "name", None)
b_id = getattr(block, "id", "")
b_input = getattr(block, "input", {})
if b_name in mcp_client.TOOL_NAMES:
_append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
                    output = mcp_client.dispatch(b_name, b_input)
                    _append_comms("IN", "tool_result", {"name": b_name, "id": b_id, "output": output})
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": b_id,
                        "content": output,
                    })
                elif b_name == TOOL_NAME:
                    script = b_input.get("script", "")
                    _append_comms("OUT", "tool_call", {
                        "name": TOOL_NAME,
                        "id": b_id,
                        "script": script,
                    })
                    output = _run_script(script, base_dir)
                    _append_comms("IN", "tool_result", {
                        "name": TOOL_NAME,
                        "id": b_id,
                        "output": output,
                    })
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": b_id,
                        "content": output,
                    })
                else:
                    # Unknown tool name: surface the error back to the model as a tool_result
                    output = f"ERROR: unknown tool '{b_name}'"
                    _append_comms("IN", "tool_result", {"name": b_name, "id": b_id, "output": output})
                    tool_results.append({
                        "type": "tool_result",
                        "tool_use_id": b_id,
                        "content": output,
                    })
# Refresh file context after tool calls — only inject CHANGED files
if file_items:
file_items, changed = _reread_file_items(file_items)
refreshed_ctx = _build_file_context_text(changed)
if refreshed_ctx:
tool_results.append({"type": "text", "text": f"[{_FILE_REFRESH_MARKER} — current contents below. Do NOT re-read these files with PowerShell.]\n\n{refreshed_ctx}"})
tool_results.append({
"type": "text",
"text": (
"[FILES UPDATED — current contents below. "
"Do NOT re-read these files with PowerShell.]\n\n"
+ refreshed_ctx
),
})
if round_idx == MAX_TOOL_ROUNDS:
tool_results.append({"type": "text", "text": "SYSTEM WARNING: MAX TOOL ROUNDS REACHED. YOU MUST PROVIDE YOUR FINAL ANSWER NOW WITHOUT CALLING ANY MORE TOOLS."})
tool_results.append({
"type": "text",
"text": "SYSTEM WARNING: MAX TOOL ROUNDS REACHED. YOU MUST PROVIDE YOUR FINAL ANSWER NOW WITHOUT CALLING ANY MORE TOOLS."
})
_anthropic_history.append({"role": "user", "content": tool_results})
_append_comms("OUT", "tool_result_send", {"results": [{"tool_use_id": r["tool_use_id"], "content": r["content"]} for r in tool_results if r.get("type") == "tool_result"]})
_anthropic_history.append({
"role": "user",
"content": tool_results,
})
_append_comms("OUT", "tool_result_send", {
"results": [
{"tool_use_id": r["tool_use_id"], "content": r["content"]}
for r in tool_results if r.get("type") == "tool_result"
],
})
final_text = "\n\n".join(all_text_parts)
return final_text if final_text.strip() else "(No text returned by the model)"
except ProviderError:
raise
except Exception as exc:
raise _classify_anthropic_error(exc) from exc
# ------------------------------------------------------------------ unified send
def send(
md_content: str,
user_message: str,
base_dir: str = ".",
file_items: list[dict] | None = None,
) -> str:
"""Send a message to the active provider."""
"""
Send a message to the active provider.
md_content : aggregated markdown string from aggregate.run()
user_message: the user question / instruction
base_dir : project base directory (for PowerShell tool calls)
file_items : list of file dicts from aggregate.build_file_items() for
dynamic context refresh after tool calls
"""
if _provider == "gemini":
return _send_gemini(md_content, user_message, base_dir, file_items)
elif _provider == "anthropic":
return _send_anthropic(md_content, user_message, base_dir, file_items)
raise ValueError(f"unknown provider: {_provider}")
def get_history_bleed_stats() -> dict:
"""
Calculates how close the current conversation history is to the token limit.
"""
if _provider == "anthropic":
# For Anthropic, we have a robust estimator
current_tokens = _estimate_prompt_tokens([], _anthropic_history)
limit_tokens = _ANTHROPIC_MAX_PROMPT_TOKENS
percentage = (current_tokens / limit_tokens) * 100 if limit_tokens > 0 else 0
return {
"provider": "anthropic",
"limit": limit_tokens,
"current": current_tokens,
"percentage": percentage,
}
elif _provider == "gemini":
# For Gemini, token estimation is complex and handled by the server.
# We don't have a reliable client-side estimate, so we return a
# "not implemented" state for now.
return {
"provider": "gemini",
"limit": _GEMINI_MAX_INPUT_TOKENS,
"current": 0,
"percentage": 0,
}
# Default empty state
return {
"provider": _provider,
"limit": 0,
"current": 0,
"percentage": 0,
}
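This stats dict is what Phase 2's token budget visualizer consumes. A minimal sketch of the GUI side, assuming illustrative widget tags (`token_budget_bar`, `token_budget_label`) rather than the real gui.py tags:

```python
import dearpygui.dearpygui as dpg

import ai_client

def refresh_token_budget_widgets():
    """Poll bleed stats and update the Provider panel's budget bar."""
    stats = ai_client.get_history_bleed_stats()
    fraction = stats["current"] / stats["limit"] if stats["limit"] else 0.0
    dpg.set_value("token_budget_bar", min(fraction, 1.0))  # progress bars take a 0..1 float
    dpg.set_value(
        "token_budget_label",
        f"{stats['current']:,} / {stats['limit']:,} tokens ({stats['percentage']:.1f}%)",
    )
```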
+48
@@ -0,0 +1,48 @@
import requests
import json
class ApiHookClient:
def __init__(self, base_url="http://127.0.0.1:8999"):
self.base_url = base_url
def _make_request(self, method, endpoint, data=None):
url = f"{self.base_url}{endpoint}"
headers = {'Content-Type': 'application/json'}
try:
if method == 'GET':
response = requests.get(url, timeout=1)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=1)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
return response.json()
except requests.exceptions.Timeout:
raise requests.exceptions.Timeout(f"Request to {endpoint} timed out.")
except requests.exceptions.ConnectionError:
raise requests.exceptions.ConnectionError(f"Could not connect to API hook server at {self.base_url}.")
except requests.exceptions.HTTPError as e:
raise requests.exceptions.HTTPError(f"HTTP error {e.response.status_code} for {endpoint}: {e.response.text}")
except json.JSONDecodeError:
raise ValueError(f"Failed to decode JSON from response for {endpoint}: {response.text}")
def get_status(self):
return self._make_request('GET', '/status')
def get_project(self):
return self._make_request('GET', '/api/project')
def post_project(self, project_data):
return self._make_request('POST', '/api/project', data={'project': project_data})
def get_session(self):
return self._make_request('GET', '/api/session')
def post_session(self, session_entries):
return self._make_request('POST', '/api/session', data={'session': {'entries': session_entries}})
def post_gui(self, gui_data):
return self._make_request('POST', '/api/gui', data=gui_data)
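A hedged usage sketch (the module name `api_hook_client` is assumed; the endpoints are the ones defined above, and the target app must have been launched with test hooks enabled):

```python
import requests

from api_hook_client import ApiHookClient  # module name assumed

client = ApiHookClient()  # defaults to http://127.0.0.1:8999
try:
    client.get_status()                    # {'status': 'ok'} when the server is up
    session = client.get_session()         # {'session': {'entries': [...]}}
    print(f"{len(session['session']['entries'])} discussion entries")
    client.post_gui({"task": "refresh"})   # payload shape is app-defined; illustrative only
except requests.exceptions.RequestException as exc:
    print(f"Hook server unreachable; was the app started with --enable-test-hooks? ({exc})")
```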
+111
@@ -0,0 +1,111 @@
import json
import threading
from http.server import HTTPServer, BaseHTTPRequestHandler
import logging
import session_logger
class HookServerInstance(HTTPServer):
"""Custom HTTPServer that carries a reference to the main App instance."""
def __init__(self, server_address, RequestHandlerClass, app):
super().__init__(server_address, RequestHandlerClass)
self.app = app
class HookHandler(BaseHTTPRequestHandler):
"""Handles incoming HTTP requests for the API hooks."""
def do_GET(self):
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == '/status':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
elif self.path == '/api/project':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'project': app.project}).encode('utf-8'))
elif self.path == '/api/session':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'session': {'entries': app.disc_entries}}).
encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
def do_POST(self):
app = self.server.app
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
body_str = body.decode('utf-8') if body else ""
session_logger.log_api_hook("POST", self.path, body_str)
try:
data = json.loads(body_str) if body_str else {}
if self.path == '/api/project':
app.project = data.get('project', app.project)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path == '/api/session':
app.disc_entries = data.get('session', {}).get(
'entries', app.disc_entries)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path == '/api/gui':
if not hasattr(app, '_pending_gui_tasks'):
app._pending_gui_tasks = []
if not hasattr(app, '_pending_gui_tasks_lock'):
app._pending_gui_tasks_lock = threading.Lock()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append(data)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(
json.dumps({'status': 'queued'}).encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
except Exception as e:
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
def log_message(self, format, *args):
logging.info("Hook API: " + format % args)
class HookServer:
def __init__(self, app, port=8999):
self.app = app
self.port = port
self.server = None
self.thread = None
def start(self):
if not getattr(self.app, 'test_hooks_enabled', False):
return
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
logging.info(f"Hook server started on port {self.port}")
def stop(self):
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join()
logging.info("Hook server stopped")
@@ -0,0 +1,5 @@
# Track api_hooks_verification_20260223 Context
- [Specification](./spec.md)
- [Implementation Plan](./plan.md)
- [Metadata](./metadata.json)
@@ -0,0 +1,8 @@
{
"track_id": "api_hooks_verification_20260223",
"type": "feature",
"status": "new",
"created_at": "2026-02-23T17:46:51Z",
"updated_at": "2026-02-23T17:46:51Z",
"description": "Update conductor to properly utilize the new api hooks for automated testing & verification of track implementation features without the need of user intervention."
}
@@ -0,0 +1,19 @@
# Implementation Plan: Integrate API Hooks for Automated Track Verification
## Phase 1: Update Workflow Definition [checkpoint: f17c9e3]
- [x] Task: Modify `conductor/workflow.md` to reflect the new automated verification process. [2ec1ecf]
- [ ] Sub-task: Update the "Phase Completion Verification and Checkpointing Protocol" section to replace manual verification steps with a description of the automated API hook process.
- [ ] Sub-task: Ensure the updated workflow clearly states that the agent will announce the automated test, execute it, and then present the results (success or failure) to the user.
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Update Workflow Definition' (Protocol in workflow.md)
## Phase 2: Implement Automated Verification Logic [checkpoint: b575dcd]
- [x] Task: Develop the client-side logic for communicating with the API hook server. [f4a9ff8]
- [ ] Sub-task: Write failing unit tests for a new `ApiHookClient` that can send requests to the IPC server.
- [ ] Sub-task: Implement the `ApiHookClient` to make the tests pass.
- [x] Task: Integrate the `ApiHookClient` into the Conductor agent's workflow. [c7c8b89]
- [ ] Sub-task: Write failing integration tests to ensure the Conductor's phase completion logic calls the `ApiHookClient`.
- [ ] Sub-task: Modify the workflow implementation to use the `ApiHookClient` for verification.
- [x] Task: Implement result handling and user feedback. [94b4f38]
- [ ] Sub-task: Write failing tests for handling success, failure, and server-unavailable scenarios.
- [ ] Sub-task: Implement the logic to log results, present them to the user, and halt the workflow on failure.
- [ ] Task: Conductor - User Manual Verification 'Phase 2: Implement Automated Verification Logic' (Protocol in workflow.md)
@@ -0,0 +1,21 @@
# Specification: Integrate API Hooks for Automated Track Verification
## Overview
This track focuses on integrating the existing, previously implemented API hooks (from track `test_hooks_20260223`) into the Conductor workflow. The primary goal is to automate the verification steps within the "Phase Completion Verification and Checkpointing Protocol", reducing the need for manual user intervention and enabling a more streamlined, automated development process.
## Functional Requirements
- **Workflow Integration:** The `workflow.md` document, specifically the "Phase Completion Verification and Checkpointing Protocol," must be updated to replace manual verification steps with automated checks using the API hooks.
- **IPC Communication:** The updated workflow will communicate with the application's backend via the established IPC server to trigger verification tasks.
- **Result Handling:**
- All results from the API hook verifications must be logged for auditing and debugging purposes.
- Upon successful verification, the Conductor agent will proceed with the workflow as it currently does after a successful manual check.
- Upon failure, the agent will halt, present the failure logs to the user, and await further instructions.
- **User Interaction Model:** The system will transition from asking the user to perform a manual test to informing the user that an automated test is running, and then presenting the results.
## Non-Functional Requirements
- **Resilience:** The Conductor agent must handle cases where the API hook server is unavailable or a hook call fails unexpectedly, without crashing or entering an unrecoverable state.
- **Transparency:** All interactions with the API hooks must be clearly logged, making the automated process easy to monitor and debug.
## Out of Scope
- **Modifying API Hooks:** This track will not alter the existing API hooks, the IPC server, or the backend implementation. The focus is solely on the client-side integration within the Conductor agent's workflow.
- **Changes to Manual Overrides:** Users will retain the ability to manually intervene or bypass automated checks if necessary.
@@ -0,0 +1,5 @@
# Track context_management_20260223 Context
- [Specification](./spec.md)
- [Implementation Plan](./plan.md)
- [Metadata](./metadata.json)
@@ -0,0 +1,8 @@
{
"track_id": "context_management_20260223",
"type": "feature",
"status": "new",
"created_at": "2026-02-23T10:00:00Z",
"updated_at": "2026-02-23T10:00:00Z",
"description": "Implement context visualization and memory management improvements"
}
@@ -0,0 +1,19 @@
# Implementation Plan
## Phase 1: Context Memory and Token Visualization [checkpoint: a88311b]
- [x] Task: Implement token usage summary widget e34ff7e
- [ ] Sub-task: Write Tests
- [ ] Sub-task: Implement Feature
- [x] Task: Expose history truncation controls in the Discussion panel 94fe904
- [ ] Sub-task: Write Tests
- [ ] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 1: Context Memory and Token Visualization' (Protocol in workflow.md) a88311b
## Phase 2: Agent Capability Configuration [checkpoint: 1ac6eb9]
- [x] Task: Add UI toggles for available tools per-project 1677d25
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Wire tool toggles to AI provider tool declaration payload 92aa33c
- [ ] Sub-task: Write Tests
- [ ] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 2: Agent Capability Configuration' (Protocol in workflow.md) 1ac6eb9
@@ -0,0 +1,9 @@
# Specification: Context Visualization and Memory Management
## Overview
This track implements UI improvements and structural changes to Manual Slop to provide explicit visualization of context memory usage and token consumption, fulfilling the "Expert systems level utility" and "Full control" product goals.
## Core Objectives
1. **Token Visualization:** Expose token usage metrics in real-time within the GUI (e.g., in a dedicated metrics panel or augmented Comms panel).
2. **Context Memory Management:** Provide tools to manually flush, persist, or truncate history to manage token budgets per-discussion.
3. **Agent Capability Toggles:** Expose explicit configuration options for agent capabilities (e.g., toggle MCP tools on/off) from the UI.
@@ -0,0 +1,5 @@
# Track test_hooks_20260223 Context
- [Specification](./spec.md)
- [Implementation Plan](./plan.md)
- [Metadata](./metadata.json)
@@ -0,0 +1,8 @@
{
"track_id": "test_hooks_20260223",
"type": "feature",
"status": "new",
"created_at": "2026-02-23T10:00:00Z",
"updated_at": "2026-02-23T10:00:00Z",
"description": "Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing."
}
@@ -0,0 +1,25 @@
# Implementation Plan
## Phase 1: Foundation and Opt-in Mechanisms [checkpoint: 2bc7a3f]
- [x] Task: Implement CLI flag/env-var to enable the hook system [1306163]
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Set up lightweight local IPC server (e.g., standard library socket/HTTP) for receiving hook commands [44c2585]
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 1: Foundation and Opt-in Mechanisms' (Protocol in workflow.md) [2bc7a3f]
## Phase 2: Hook Implementations and Logging [checkpoint: eaf229e]
- [x] Task: Implement project and AI session state manipulation hooks [d9d056c]
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Implement GUI state manipulation hooks with thread-safe queueing [5f9bc19]
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Integrate aggressive logging for all hook invocations [ef29902]
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 2: Hook Implementations and Logging' (Protocol in workflow.md) [eaf229e]
## Phase: Review Fixes
- [x] Task: Apply review suggestions [dc64493]
@@ -0,0 +1,21 @@
# Specification: Add full api/hooks so that gemini cli can test, interact, and manipulate the state of the gui & program backend for automated testing
## Overview
This track introduces a comprehensive suite of API hooks designed specifically for the Gemini CLI and the Conductor framework. These hooks will allow automated agents to manipulate and test the internal state of the application without requiring manual GUI interaction, enabling automated test-driven development and track progression validation.
## Use Cases
- **Automated Testing & Progression:** Expose low-level state manipulation hooks so that the Gemini CLI + Conductor can autonomously verify track completion, test UI logic, and validate backend states.
## Functional Requirements
- **Comprehensive Access:** The hooks must provide full, unrestricted access to the entire program, including:
- GUI state (Dear PyGui nodes, values, layout data).
- AI session state (history, active caches, tool configurations).
- Project configurations and discussion state.
- **Security & Logging:** The hook system MUST be strictly opt-in (e.g., enabled via a specific command-line argument like `--enable-test-hooks` or an environment variable). When enabled, any invocation of these hooks MUST be aggressively logged to ensure transparency.
## Non-Functional Requirements
- **Thread Safety:** Hooks interacting with the GUI state must respect the main render loop locks and threading model defined in the architecture guidelines.
- **Dependency Minimalism:** The hook interface should utilize built-in mechanisms (like sockets, a lightweight local HTTP server, or standard inter-process communication) without introducing heavy external web frameworks.
## Out of Scope
- Building the actual Gemini CLI or Conductor automation logic itself; this track only builds the *hooks* within Manual Slop that those external agents will consume.
+37
@@ -0,0 +1,37 @@
# Google Python Style Guide Summary
This document summarizes key rules and best practices from the Google Python Style Guide.
## 1. Python Language Rules
- **Linting:** Run `pylint` on your code to catch bugs and style issues.
- **Imports:** Use `import x` for packages/modules. Use `from x import y` only when `y` is a submodule.
- **Exceptions:** Use built-in exception classes. Do not use bare `except:` clauses.
- **Global State:** Avoid mutable global state. Module-level constants are okay and should be `ALL_CAPS_WITH_UNDERSCORES`.
- **Comprehensions:** Use for simple cases. Avoid for complex logic where a full loop is more readable.
- **Default Argument Values:** Do not use mutable objects (like `[]` or `{}`) as default values.
- **True/False Evaluations:** Use implicit false (e.g., `if not my_list:`). Use `if foo is None:` to check for `None`.
- **Type Annotations:** Strongly encouraged for all public APIs.
## 2. Python Style Rules
- **Line Length:** Maximum 80 characters.
- **Indentation:** 4 spaces per indentation level. Never use tabs.
- **Blank Lines:** Two blank lines between top-level definitions (classes, functions). One blank line between method definitions.
- **Whitespace:** Avoid extraneous whitespace. Surround binary operators with single spaces.
- **Docstrings:** Use `"""triple double quotes"""`. Every public module, function, class, and method must have a docstring.
- **Format:** Start with a one-line summary. Include `Args:`, `Returns:`, and `Raises:` sections.
- **Strings:** Use f-strings for formatting. Be consistent with single (`'`) or double (`"`) quotes.
- **`TODO` Comments:** Use `TODO(username): Fix this.` format.
- **Imports Formatting:** Imports should be on separate lines and grouped: standard library, third-party, and your own application's imports.
## 3. Naming
- **General:** `snake_case` for modules, functions, methods, and variables.
- **Classes:** `PascalCase`.
- **Constants:** `ALL_CAPS_WITH_UNDERSCORES`.
- **Internal Use:** Use a single leading underscore (`_internal_variable`) for internal module/class members.
## 4. Main
- All executable files should have a `main()` function that contains the main logic, called from a `if __name__ == '__main__':` block.
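A small module skeleton applying several of these rules at once (names are illustrative):

```python
"""Fetches widget records for the slop inventory."""

_WIDGETS = {'w-1': {'id': 'w-1', 'name': 'crank'}}  # internal module-level constant


def fetch_widget(widget_id: str) -> dict:
    """Returns the widget record for the given id.

    Args:
        widget_id: Opaque identifier of the widget.

    Returns:
        The widget record as a dict.

    Raises:
        KeyError: If no widget with that id exists.
    """
    return _WIDGETS[widget_id]


def main() -> None:
    print(fetch_widget('w-1'))


if __name__ == '__main__':
    main()
```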
**BE CONSISTENT.** When editing code, match the existing style.
*Source: [Google Python Style Guide](https://google.github.io/styleguide/pyguide.html)*
+14
@@ -0,0 +1,14 @@
# Project Context
## Definition
- [Product Definition](./product.md)
- [Product Guidelines](./product-guidelines.md)
- [Tech Stack](./tech-stack.md)
## Workflow
- [Workflow](./workflow.md)
- [Code Style Guides](./code_styleguides/)
## Management
- [Tracks Registry](./tracks.md)
- [Tracks Directory](./tracks/)
+15
@@ -0,0 +1,15 @@
# Product Guidelines: Manual Slop
## Documentation Style
- **Strict & In-Depth:** Documentation must follow an old-school, highly detailed technical breakdown style (similar to VEFontCache-Odin). Focus on architectural design, state management, algorithmic details, and structural formats rather than just surface-level usage.
## UX & UI Principles
- **USA Graphics Company Values:** Embrace high information density and tactile interactions.
- **Arcade Aesthetics:** Utilize arcade game-style visual feedback for state updates (e.g., blinking notifications for tool execution and AI responses) to make the experience fun, visceral, and engaging.
- **Explicit Control & Expert Focus:** The interface should not hold the user's hand. It must prioritize explicit manual confirmation for destructive actions while providing dense, unadulterated access to logs and context.
- **Multi-Viewport Capabilities:** Leverage dockable, floatable panels to allow users to build custom workspaces suitable for multi-monitor setups.
## Code Standards & Architecture
- **Strict State Management:** There must be a rigorous separation between the Main GUI rendering thread and daemon execution threads. The UI should *never* hang during AI communication or script execution. Use lock-protected queues and events for synchronization.
- **Comprehensive Logging:** Aggressively log all actions, API payloads, tool calls, and executed scripts. Maintain timestamped JSON-L and markdown logs to ensure total transparency and debuggability.
- **Dependency Minimalism:** Limit external dependencies where possible. For instance, prefer standard library modules (like `urllib` and `html.parser` for web tools) over heavy third-party packages.
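A minimal sketch of the lock-protected queue pattern the state-management guideline prescribes (names illustrative; gui.py's `_pending_comms` list works this way):

```python
import threading

_pending: list[dict] = []          # filled by daemon worker threads
_pending_lock = threading.Lock()

def enqueue_from_worker(item: dict) -> None:
    """Daemon-thread side: never touches GUI widgets directly."""
    with _pending_lock:
        _pending.append(item)

def drain_on_render_frame() -> list[dict]:
    """Main-thread side: called once per frame to apply queued updates."""
    with _pending_lock:
        items = _pending[:]
        _pending.clear()
    return items
```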
+15
@@ -0,0 +1,15 @@
# Product Guide: Manual Slop
## Vision
To serve as an expert-level utility for personal developer use on small projects, providing full, manual control over vendor API metrics, agent capabilities, and context memory usage.
## Primary Use Cases
- **Full Control over Vendor APIs:** Exposing detailed API metrics and configuring deep agent capabilities directly within the GUI.
- **Context & Memory Management:** Better visualization and management of token usage and context memory, allowing developers to optimize prompt limits manually.
- **Manual "Vibe Coding" Assistant:** Serving as an auxiliary, multi-provider assistant that natively interacts with the codebase via sandboxed PowerShell scripts and MCP-like file tools, emphasizing manual developer oversight and explicit confirmation.
## Key Features
- **Multi-Provider Integration:** Supports both Gemini and Anthropic with seamless switching.
- **Explicit Execution Control:** All AI-generated PowerShell scripts require explicit human confirmation via interactive UI dialogs before execution.
- **Detailed History Management:** Rich discussion history with branching, timestamping, and specific git commit linkage per conversation.
- **In-Depth Toolset Access:** MCP-like file exploration, URL fetching, search, and dynamic context aggregation embedded within a multi-viewport Dear PyGui/ImGui interface.
+1
@@ -0,0 +1 @@
{"last_successful_step": "3.3_initial_track_generated"}
+16
@@ -0,0 +1,16 @@
# Technology Stack: Manual Slop
## Core Language
- **Python 3.11+**
## GUI Frameworks
- **Dear PyGui:** For immediate/retained mode GUI rendering and node mapping.
- **ImGui Bundle (`imgui-bundle`):** To provide advanced multi-viewport and dockable panel capabilities on top of Dear ImGui.
## AI Integration SDKs
- **google-genai:** For Google Gemini API interaction and explicit context caching.
- **anthropic:** For Anthropic Claude API interaction, supporting ephemeral prompt caching.
## Configuration & Tooling
- **tomli-w:** For writing TOML configuration files.
- **uv:** An extremely fast Python package and project manager.
+14
@@ -0,0 +1,14 @@
# Project Tracks
This file tracks all major tracks for the project. Each track has its own detailed plan in its respective folder.
---
- [x] **Track: Implement context visualization and memory management improvements**
*Link: [./tracks/context_management_20260223/](./tracks/context_management_20260223/)*
---
- [x] **Track: Review vendor api usage in regards to conservative context handling**
*Link: [./tracks/api_metrics_20260223/](./tracks/api_metrics_20260223/)*
@@ -0,0 +1,5 @@
# Track api_metrics_20260223 Context
- [Specification](./spec.md)
- [Implementation Plan](./plan.md)
- [Metadata](./metadata.json)
@@ -0,0 +1,8 @@
{
"track_id": "api_metrics_20260223",
"type": "feature",
"status": "new",
"created_at": "2026-02-23T10:00:00Z",
"updated_at": "2026-02-23T10:00:00Z",
"description": "Review vendor api usage in regards to conservative context handling"
}
@@ -0,0 +1,19 @@
# Implementation Plan
## Phase 1: Metric Extraction and Logic Review [checkpoint: 2668f88]
- [x] Task: Extract explicit cache counts and lifecycle states from Gemini SDK
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Review and expose 'history bleed' (token limit proximity) flags
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 1: Metric Extraction and Logic Review' (Protocol in workflow.md)
## Phase 2: GUI Telemetry and Plotting [checkpoint: 76582c8]
- [x] Task: Implement token budget visualizer (e.g., Progress bars for limits) in Dear PyGui
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Implement active caches data display in Provider/Comms panel
- [x] Sub-task: Write Tests
- [x] Sub-task: Implement Feature
- [x] Task: Conductor - User Manual Verification 'Phase 2: GUI Telemetry and Plotting' (Protocol in workflow.md)
@@ -0,0 +1,22 @@
# Specification: Review vendor api usage in regards to conservative context handling
## Overview
This track aims to optimize token efficiency and transparency by reviewing and improving how vendor APIs (Gemini and Anthropic) handle conservative context pruning. The primary focus is on extracting, plotting, and exposing deep metrics to the GUI so developers can intuit how close they are to API limits (e.g., token caps, cache counts, history bleed).
## Scope
- **Gemini Hooks:** Review explicit context caching, cache invalidation, and tools declaration.
- **Global Orchestration:** Review global context boundaries within the main prompt lifecycle.
- **GUI Metrics:** Expose as much metric data as possible to the user interface (e.g., plotting token usage, visual indicators for when "history bleed" occurs, displaying the number of active caches).
## Functional Requirements
- Implement extensive token and cache metric extraction from both Gemini and Anthropic API responses.
- Expose these metrics to the Dear PyGui frontend, potentially utilizing visual plots or progress bars to indicate token budget consumption.
- Implement tests to explicitly verify context rules, ensuring history pruning acts conservatively and predictably without data loss.
## Non-Functional Requirements
- Ensure GUI rendering of new plots or dense metrics does not block the main thread.
- Adhere to the "Strict State Management" product guideline.
## Out of Scope
- Major feature additions unrelated to context token management or telemetry.
- Expanding the AI's agentic capabilities (e.g., new tools).
+319
@@ -0,0 +1,319 @@
# Project Workflow
## Guiding Principles
1. **The Plan is the Source of Truth:** All work must be tracked in `plan.md`
2. **The Tech Stack is Deliberate:** Changes to the tech stack must be documented in `tech-stack.md` *before* implementation
3. **Test-Driven Development:** Write unit tests before implementing functionality
4. **High Code Coverage:** Aim for >80% code coverage for all modules
5. **User Experience First:** Every decision should prioritize user experience
6. **Non-Interactive & CI-Aware:** Prefer non-interactive commands. Use `CI=true` for watch-mode tools (tests, linters) to ensure single execution.
## Task Workflow
All tasks follow a strict lifecycle:
### Standard Task Workflow
1. **Select Task:** Choose the next available task from `plan.md` in sequential order
2. **Mark In Progress:** Before beginning work, edit `plan.md` and change the task from `[ ]` to `[~]`
3. **Write Failing Tests (Red Phase):**
- Create a new test file for the feature or bug fix.
- Write one or more unit tests that clearly define the expected behavior and acceptance criteria for the task.
- **CRITICAL:** Run the tests and confirm that they fail as expected. This is the "Red" phase of TDD. Do not proceed until you have failing tests.
4. **Implement to Pass Tests (Green Phase):**
- Write the minimum amount of application code necessary to make the failing tests pass.
- Run the test suite again and confirm that all tests now pass. This is the "Green" phase.
5. **Refactor (Optional but Recommended):**
- With the safety of passing tests, refactor the implementation code and the test code to improve clarity, remove duplication, and enhance performance without changing the external behavior.
- Rerun tests to ensure they still pass after refactoring.
6. **Verify Coverage:** Run coverage reports using the project's chosen tools. For example, in a Python project, this might look like:
```bash
pytest --cov=app --cov-report=html
```
Target: >80% coverage for new code. The specific tools and commands will vary by language and framework.
7. **Document Deviations:** If implementation differs from tech stack:
- **STOP** implementation
- Update `tech-stack.md` with new design
- Add dated note explaining the change
- Resume implementation
8. **Commit Code Changes:**
- Stage all code changes related to the task.
   - Propose a clear, concise commit message, e.g., `feat(ui): Create basic HTML structure for calculator`.
- Perform the commit.
9. **Attach Task Summary with Git Notes:**
- **Step 9.1: Get Commit Hash:** Obtain the hash of the *just-completed commit* (`git log -1 --format="%H"`).
- **Step 9.2: Draft Note Content:** Create a detailed summary for the completed task. This should include the task name, a summary of changes, a list of all created/modified files, and the core "why" for the change.
- **Step 9.3: Attach Note:** Use the `git notes` command to attach the summary to the commit.
```bash
# The note content from the previous step is passed via the -m flag.
git notes add -m "<note content>" <commit_hash>
```
10. **Get and Record Task Commit SHA:**
- **Step 10.1: Update Plan:** Read `plan.md`, find the line for the completed task, update its status from `[~]` to `[x]`, and append the first 7 characters of the *just-completed commit's* commit hash.
- **Step 10.2: Write Plan:** Write the updated content back to `plan.md`.
11. **Commit Plan Update:**
- **Action:** Stage the modified `plan.md` file.
- **Action:** Commit this change with a descriptive message (e.g., `conductor(plan): Mark task 'Create user model' as complete`).
### Phase Completion Verification and Checkpointing Protocol
**Trigger:** This protocol is executed immediately after a task is completed that also concludes a phase in `plan.md`.
1. **Announce Protocol Start:** Inform the user that the phase is complete and the verification and checkpointing protocol has begun.
2. **Ensure Test Coverage for Phase Changes:**
- **Step 2.1: Determine Phase Scope:** To identify the files changed in this phase, you must first find the starting point. Read `plan.md` to find the Git commit SHA of the *previous* phase's checkpoint. If no previous checkpoint exists, the scope is all changes since the first commit.
- **Step 2.2: List Changed Files:** Execute `git diff --name-only <previous_checkpoint_sha> HEAD` to get a precise list of all files modified during this phase.
- **Step 2.3: Verify and Create Tests:** For each file in the list:
- **CRITICAL:** First, check its extension. Exclude non-code files (e.g., `.json`, `.md`, `.yaml`).
- For each remaining code file, verify a corresponding test file exists.
- If a test file is missing, you **must** create one. Before writing the test, **first, analyze other test files in the repository to determine the correct naming convention and testing style.** The new tests **must** validate the functionality described in this phase's tasks (`plan.md`).
3. **Execute Automated Tests with Proactive Debugging:**
- Before execution, you **must** announce the exact shell command you will use to run the tests.
- **Example Announcement:** "I will now run the automated test suite to verify the phase. **Command:** `CI=true npm test`"
- Execute the announced command.
- If tests fail, you **must** inform the user and begin debugging. You may attempt to propose a fix a **maximum of two times**. If the tests still fail after your second proposed fix, you **must stop**, report the persistent failure, and ask the user for guidance.
4. **Execute Automated API Hook Verification:**
- **CRITICAL:** The Conductor agent will now automatically execute verification tasks using the application's API hooks.
- The agent will announce the start of the automated verification to the user.
- It will then communicate with the application's IPC server to trigger the necessary verification functions.
- **Result Handling:**
- All results (successes and failures) from the API hook invocations will be logged.
- If all automated verifications pass, the agent will inform the user and proceed to the next step (Create Checkpoint Commit).
- If any automated verification fails, the agent will halt the workflow, present the detailed failure logs to the user, and await further instructions for debugging or remediation.
5. **Present Automated Verification Results and User Confirmation:**
- After executing automated verification, the Conductor agent will present the results to the user.
- If verification passed, the agent will state: "Automated verification completed successfully."
- If verification failed, the agent will state: "Automated verification failed. Please review the logs above for details. You may attempt to propose a fix a **maximum of two times**. If the tests still fail after your second proposed fix, you **must stop**, report the persistent failure, and ask the user for guidance."
- **PAUSE** and await the user's response. Do not proceed without an explicit yes or confirmation from the user to proceed if tests pass, or guidance if tests fail.
6. **Create Checkpoint Commit:**
- Stage all changes. If no changes occurred in this step, proceed with an empty commit.
- Perform the commit with a clear and concise message (e.g., `conductor(checkpoint): Checkpoint end of Phase X`).
7. **Attach Auditable Verification Report using Git Notes:**
- **Step 7.1: Draft Note Content:** Create a detailed verification report including the automated test command, the manual verification steps, and the user's confirmation.
- **Step 7.2: Attach Note:** Use the `git notes` command and the full commit hash from the previous step to attach the full report to the checkpoint commit.
8. **Get and Record Phase Checkpoint SHA:**
- **Step 8.1: Get Commit Hash:** Obtain the hash of the *just-created checkpoint commit* (`git log -1 --format="%H"`).
- **Step 8.2: Update Plan:** Read `plan.md`, find the heading for the completed phase, and append the first 7 characters of the commit hash in the format `[checkpoint: <sha>]`.
- **Step 8.3: Write Plan:** Write the updated content back to `plan.md`.
9. **Commit Plan Update:**
- **Action:** Stage the modified `plan.md` file.
- **Action:** Commit this change with a descriptive message following the format `conductor(plan): Mark phase '<PHASE NAME>' as complete`.
10. **Announce Completion:** Inform the user that the phase is complete and the checkpoint has been created, with the detailed verification report attached as a git note.
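A minimal sketch of step 4's hook verification from the Conductor side, assuming the `ApiHookClient` built in track `api_hooks_verification_20260223` (module name assumed):

```python
import logging

import requests
from api_hook_client import ApiHookClient  # module name assumed

def verify_phase_via_hooks() -> bool:
    """Returns True to proceed to checkpointing; False halts the workflow."""
    client = ApiHookClient()
    try:
        client.get_status()                # is the IPC server reachable?
        session = client.get_session()     # pull backend state to assert on
        logging.info("hook verification: %d session entries",
                     len(session["session"]["entries"]))
        return True
    except requests.exceptions.RequestException as exc:
        logging.error("hook verification failed: %s", exc)
        return False  # present the logs to the user and await guidance
```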
### Quality Gates
Before marking any task complete, verify:
- [ ] All tests pass
- [ ] Code coverage meets requirements (>80%)
- [ ] Code follows project's code style guidelines (as defined in `code_styleguides/`)
- [ ] All public functions/methods are documented (e.g., docstrings, JSDoc, GoDoc)
- [ ] Type safety is enforced (e.g., type hints, TypeScript types, Go types)
- [ ] No linting or static analysis errors (using the project's configured tools)
- [ ] Works correctly on mobile (if applicable)
- [ ] Documentation updated if needed
- [ ] No security vulnerabilities introduced
## Development Commands
**AI AGENT INSTRUCTION: This section should be adapted to the project's specific language, framework, and build tools.**
### Setup
```bash
# Example: Commands to set up the development environment (e.g., install dependencies, configure database)
# e.g., for a Node.js project: npm install
# e.g., for a Go project: go mod tidy
```
### Daily Development
```bash
# Example: Commands for common daily tasks (e.g., start dev server, run tests, lint, format)
# e.g., for a Node.js project: npm run dev, npm test, npm run lint
# e.g., for a Go project: go run main.go, go test ./..., go fmt ./...
```
### Before Committing
```bash
# Example: Commands to run all pre-commit checks (e.g., format, lint, type check, run tests)
# e.g., for a Node.js project: npm run check
# e.g., for a Go project: make check (if a Makefile exists)
```
## Testing Requirements
### Unit Testing
- Every module must have corresponding tests.
- Use appropriate test setup/teardown mechanisms (e.g., fixtures, beforeEach/afterEach).
- Mock external dependencies.
- Test both success and failure cases.
### Integration Testing
- Test complete user flows
- Verify database transactions
- Test authentication and authorization
- Check form submissions
### Mobile Testing
- Test on actual iPhone when possible
- Use Safari developer tools
- Test touch interactions
- Verify responsive layouts
- Check performance on 3G/4G
## Code Review Process
### Self-Review Checklist
Before requesting review:
1. **Functionality**
- Feature works as specified
- Edge cases handled
- Error messages are user-friendly
2. **Code Quality**
- Follows style guide
- DRY principle applied
- Clear variable/function names
- Appropriate comments
3. **Testing**
- Unit tests comprehensive
- Integration tests pass
- Coverage adequate (>80%)
4. **Security**
- No hardcoded secrets
- Input validation present
- SQL injection prevented
- XSS protection in place
5. **Performance**
- Database queries optimized
- Images optimized
- Caching implemented where needed
6. **Mobile Experience**
- Touch targets adequate (44x44px)
- Text readable without zooming
- Performance acceptable on mobile
- Interactions feel native
## Commit Guidelines
### Message Format
```
<type>(<scope>): <description>
[optional body]
[optional footer]
```
### Types
- `feat`: New feature
- `fix`: Bug fix
- `docs`: Documentation only
- `style`: Formatting, missing semicolons, etc.
- `refactor`: Code change that neither fixes a bug nor adds a feature
- `test`: Adding missing tests
- `chore`: Maintenance tasks
### Examples
```bash
git commit -m "feat(auth): Add remember me functionality"
git commit -m "fix(posts): Correct excerpt generation for short posts"
git commit -m "test(comments): Add tests for emoji reaction limits"
git commit -m "style(mobile): Improve button touch targets"
```
## Definition of Done
A task is complete when:
1. All code implemented to specification
2. Unit tests written and passing
3. Code coverage meets project requirements
4. Documentation complete (if applicable)
5. Code passes all configured linting and static analysis checks
6. Works beautifully on mobile (if applicable)
7. Implementation notes added to `plan.md`
8. Changes committed with proper message
9. Git note with task summary attached to the commit
## Emergency Procedures
### Critical Bug in Production
1. Create hotfix branch from main
2. Write failing test for bug
3. Implement minimal fix
4. Test thoroughly including mobile
5. Deploy immediately
6. Document in plan.md
### Data Loss
1. Stop all write operations
2. Restore from latest backup
3. Verify data integrity
4. Document incident
5. Update backup procedures
### Security Breach
1. Rotate all secrets immediately
2. Review access logs
3. Patch vulnerability
4. Notify affected users (if any)
5. Document and update security procedures
## Deployment Workflow
### Pre-Deployment Checklist
- [ ] All tests passing
- [ ] Coverage >80%
- [ ] No linting errors
- [ ] Mobile testing complete
- [ ] Environment variables configured
- [ ] Database migrations ready
- [ ] Backup created
### Deployment Steps
1. Merge feature branch to main
2. Tag release with version
3. Push to deployment service
4. Run database migrations
5. Verify deployment
6. Test critical paths
7. Monitor for errors
### Post-Deployment
1. Monitor analytics
2. Check error logs
3. Gather user feedback
4. Plan next iteration
## Continuous Improvement
- Review workflow weekly
- Update based on pain points
- Document lessons learned
- Optimize for user happiness
- Keep things simple and maintainable
+4 -4
@@ -1,6 +1,6 @@
[ai]
provider = "anthropic"
model = "claude-sonnet-4-6"
provider = "gemini"
model = "gemini-2.5-flash"
temperature = 0.6000000238418579
max_tokens = 12000
history_trunc_limit = 8000
@@ -10,11 +10,11 @@ system_prompt = "DO NOT EVER make a shell script unless told to. DO NOT EVER mak
palette = "10x Dark"
font_path = "C:/Users/Ed/AppData/Local/uv/cache/archive-v0/WSthkYsQ82b_ywV6DkiaJ/pygame_gui/data/FiraCode-Regular.ttf"
font_size = 18.0
scale = 1.0
[projects]
paths = [
"manual_slop.toml",
"C:/projects/forth/bootslop/bootslop.toml",
]
active = "C:/projects/forth/bootslop/bootslop.toml"
active = "manual_slop.toml"
+3 -2
@@ -29,7 +29,7 @@ Controls what is explicitly fed into the context compiler.
- **Base Dir:** Defines the root for path resolution and tool constraints.
- **Paths:** Explicit files or wildcard globs (e.g., src/**/*.rs).
- When generating a request, full file contents are inlined into the context by default (`summary_only=False`). The AI can also call `get_file_summary` via its MCP tools to get a compact structural view of any file on demand.
## Interaction Panels
@@ -46,8 +46,9 @@ Switch between API backends (Gemini, Anthropic) on the fly. Clicking "Fetch Mode
### Global Text Viewer & Script Outputs
- **Last Script Output:** Whenever the AI executes a background script, this window pops up, flashing blue. It contains both the executed script and the stdout/stderr. The `[+ Maximize]` buttons read directly from stored instance variables (`_last_script`, `_last_output`) rather than DPG widget tags, so they work correctly regardless of word-wrap state.
- **Text Viewer:** A large, resizable global popup invoked anytime you click a [+] or [+ Maximize] button in the UI. Used for deep-reading long logs, discussion entries, or script bodies.
- **Confirm Dialog:** The `[+ Maximize]` button in the script approval modal passes the script text directly as `user_data` at button-creation time, so it remains safe to click even after the dialog has been dismissed.
## System Prompts
+5 -5
@@ -1,4 +1,4 @@
# Guide: Architecture
Overview of the package design, state management, and code-path layout.
@@ -33,10 +33,9 @@ This occurs inside aggregate.run.
If using the default workflow, aggregate.py works through the following process:
1. **Glob Resolution:** Iterates through config["files"]["paths"] and unpacks any wildcards (e.g., src/**/*.rs) against the designated base_dir.
2. **File Item Build:** `build_file_items()` reads each resolved file once, storing path, content, and `mtime`. This list is returned alongside the markdown so `ai_client.py` can use it for dynamic context refresh after tool calls without re-reading from disk.
3. **Markdown Generation:** `build_markdown_from_items()` assembles the final `<project>_00N.md` string. By default (`summary_only=False`) it inlines full file contents. If `summary_only=True`, it delegates to `summarize.build_summary_markdown()` which uses AST-based heuristics to produce compact structural summaries instead.
4. The Markdown file is persisted to disk (`./md_gen/` by default) for auditing. `run()` returns a 3-tuple `(markdown_str, output_path, file_items)`.
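A usage sketch of the new contract (the `config` argument shape is assumed; the 3-tuple return and the `send()` signature are as documented on this page):

```python
import tomllib

import aggregate
import ai_client

with open("manual_slop.toml", "rb") as f:
    config = tomllib.load(f)  # parsed project TOML (argument to run() is assumed)

md_content, out_path, file_items = aggregate.run(config)
reply = ai_client.send(md_content, "Summarize the project.",
                       base_dir=".", file_items=file_items)
```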
### AI Communication & The Tool Loop
@@ -85,3 +84,4 @@ All I/O bound session data is recorded sequentially. session_logger.py hooks int
- logs/comms_<ts>.log: A JSON-L structured timeline of every raw payload sent/received.
- logs/toolcalls_<ts>.log: A sequential markdown record detailing every AI tool invocation and its exact stdout result.
- scripts/generated/: Every .ps1 script approved and executed by the shell runner is physically written to disk for version control transparency.
+12 -7
@@ -12,17 +12,22 @@ Implemented in mcp_client.py. These tools allow the AI to selectively expand its
### Security & Scope
Every **filesystem** MCP tool passes its arguments through `_resolve_and_check`. This function ensures that the requested path falls under one of the allowed directories defined in the GUI's Base Dir configurations.
If the AI attempts to read or search a path outside the project bounds, the tool safely catches the constraint violation and returns ACCESS DENIED.
The two **web tools** (`web_search`, `fetch_url`) bypass this check entirely — they have no filesystem access and are unrestricted.
### Supplied Tools:
**Filesystem tools** (access-controlled via `_resolve_and_check`):
* `read_file(path)`: Returns the raw UTF-8 text of a file.
* `list_directory(path)`: Returns a formatted table of a directory's contents, showing file vs dir and byte sizes.
* `search_files(path, pattern)`: Executes a glob search (e.g., `**/*.py`) within an allowed directory.
* `get_file_summary(path)`: Invokes the local `summarize.py` heuristic parser to get the AST structure of a file without reading the whole body.
**Web tools** (unrestricted — no filesystem access):
* `web_search(query)`: Queries DuckDuckGo's raw HTML endpoint and returns the top 5 results (title, URL, snippet) using a native `_DDGParser` (HTMLParser subclass) to avoid heavy dependencies.
* `fetch_url(url)`: Downloads a target webpage and strips out all scripts, styling, and structural HTML via `_TextExtractor`, returning only the raw prose content (clamped to 40,000 characters). Automatically resolves DuckDuckGo redirect links.
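To give a sense of what `_TextExtractor` has to do with only the standard library, here is a minimal sketch; the real class certainly differs, and only the 40,000-character clamp is taken from the description above:

```python
from html.parser import HTMLParser

class TextExtractorSketch(HTMLParser):
    """Collect visible text while skipping <script> and <style> bodies."""
    _SKIP = {"script", "style"}

    def __init__(self) -> None:
        super().__init__()
        self._skip_depth = 0
        self._chunks: list[str] = []

    def handle_starttag(self, tag, attrs):
        if tag in self._SKIP:
            self._skip_depth += 1

    def handle_endtag(self, tag):
        if tag in self._SKIP and self._skip_depth:
            self._skip_depth -= 1

    def handle_data(self, data):
        if self._skip_depth == 0 and data.strip():
            self._chunks.append(data.strip())

    def text(self, limit: int = 40_000) -> str:
        return " ".join(self._chunks)[:limit]
```

Feeding HTML via `.feed(html)` and then calling `.text()` yields the clamped prose.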
## 2. Destructive Execution (run_powershell)
+165 -27
@@ -1,4 +1,4 @@
# gui.py
"""
Note(Gemini):
The main DearPyGui interface orchestrator.
@@ -14,6 +14,8 @@ import tomli_w
import threading
import time
import math
import sys
import os
from pathlib import Path
from tkinter import filedialog, Tk
import aggregate
@@ -22,6 +24,7 @@ from ai_client import ProviderError
import shell_runner
import session_logger
import project_manager
import api_hooks
import theme
CONFIG_PATH = Path("config.toml")
@@ -47,6 +50,30 @@ def hide_tk_root() -> Tk:
root.wm_attributes("-topmost", True)
return root
def get_total_token_usage() -> dict:
"""Returns aggregated token usage across the entire session from comms log."""
usage = {
"input_tokens": 0,
"output_tokens": 0,
"cache_read_input_tokens": 0,
"cache_creation_input_tokens": 0
}
for entry in ai_client.get_comms_log():
if entry.get("kind") == "response" and "usage" in entry.get("payload", {}):
u = entry["payload"]["usage"]
for k in usage.keys():
usage[k] += u.get(k, 0) or 0
return usage
def truncate_entries(entries: list[dict], max_pairs: int) -> list[dict]:
"""Truncates history to the last N pairs of User/AI messages."""
if max_pairs <= 0:
return []
target_count = max_pairs * 2
if len(entries) <= target_count:
return entries
return entries[-target_count:]
# ------------------------------------------------------------------ comms rendering helpers
@@ -121,19 +148,10 @@ def _add_kv_row(parent: str, key: str, val, val_color=None):
def _render_usage(parent: str, usage: dict):
"""Render Anthropic usage dict as a compact token table, with true totals."""
"""Render Anthropic usage dict as a compact token table."""
if not usage:
return
dpg.add_text("usage:", color=_SUBHDR_COLOR, parent=parent)
order = [
"input_tokens",
"cache_read_input_tokens",
@@ -310,9 +328,9 @@ class ConfirmDialog:
with dpg.group(horizontal=True):
dpg.add_text("Script:")
dpg.add_button(
label="[+ Maximize]",
user_data=f"{self._tag}_script",
callback=lambda s, a, u: _show_text_viewer("Confirm Script", dpg.get_value(u))
label="[+ Maximize]",
user_data=self._script,
callback=lambda s, a, u: _show_text_viewer("Confirm Script", u)
)
dpg.add_input_text(
tag=f"{self._tag}_script",
@@ -378,6 +396,12 @@ def _parse_history_entries(history: list[str], roles: list[str] | None = None) -
class App:
def __init__(self):
self.config = load_config()
# Controls whether API hooks are enabled, based on CLI arg or env var
self.test_hooks_enabled: bool = (
'--enable-test-hooks' in sys.argv or
os.environ.get('SLOP_TEST_HOOKS') == '1')
# The API hook server instance
self.hook_server: api_hooks.HookServer = api_hooks.HookServer(self)
# ---- global settings from config.toml ----
ai_cfg = self.config.get("ai", {})
@@ -441,6 +465,8 @@ class App:
self._pending_dialog_lock = threading.Lock()
self._tool_log: list[tuple[str, str]] = []
self._last_script: str = ""
self._last_output: str = ""
# Comms log entries queued from background thread for main-thread rendering
self._pending_comms: list[dict] = []
@@ -451,6 +477,12 @@ class App:
self._pending_history_adds: list[dict] = []
self._pending_history_adds_lock = threading.Lock()
# API GUI Hooks Queue
# Tasks (e.g., set_value, click) to be executed on the main DPG thread
self._pending_gui_tasks: list[dict] = []
# Lock for _pending_gui_tasks to ensure thread safety
self._pending_gui_tasks_lock = threading.Lock()
# Blink state
self._trigger_blink = False
self._is_blinking = False
@@ -572,6 +604,13 @@ class App:
dpg.set_value("auto_add_history", proj.get("discussion", {}).get("auto_add", False))
if dpg.does_item_exist("project_word_wrap"):
dpg.set_value("project_word_wrap", proj.get("project", {}).get("word_wrap", True))
agent_tools = proj.get("agent", {}).get("tools", {})
for t_name in ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]:
tag = f"tool_toggle_{t_name}"
if dpg.does_item_exist(tag):
dpg.set_value(tag, agent_tools.get(t_name, True))
self.cb_word_wrap_toggled(app_data=proj.get("project", {}).get("word_wrap", True))
def _save_active_project(self):
@@ -720,6 +759,44 @@ class App:
for entry in entries:
self._comms_entry_count += 1
self._append_comms_entry(entry, self._comms_entry_count)
if entries:
self._update_token_usage()
def _update_token_usage(self):
if not dpg.does_item_exist("ai_token_usage"):
return
usage = get_total_token_usage()
total = usage["input_tokens"] + usage["output_tokens"]
dpg.set_value("ai_token_usage", f"Tokens: {total} (In: {usage['input_tokens']} Out: {usage['output_tokens']})")
def _update_telemetry_panel(self):
"""Updates the token budget visualizer in the Provider panel."""
# Update history bleed stats for all providers
stats = ai_client.get_history_bleed_stats()
if dpg.does_item_exist("token_budget_bar"):
percentage = stats.get("percentage", 0.0)
dpg.set_value("token_budget_bar", percentage / 100.0 if percentage else 0.0)
if dpg.does_item_exist("token_budget_label"):
current = stats.get("current", 0)
limit = stats.get("limit", 0)
dpg.set_value("token_budget_label", f"{current:,} / {limit:,}")
# Update Gemini-specific cache stats
if dpg.does_item_exist("gemini_cache_label"):
if self.current_provider == "gemini":
try:
cache_stats = ai_client.get_gemini_cache_stats()
count = cache_stats.get("cache_count", 0)
size_bytes = cache_stats.get("total_size_bytes", 0)
size_kb = size_bytes / 1024.0
text = f"Gemini Caches: {count} ({size_kb:.1f} KB)"
dpg.set_value("gemini_cache_label", text)
dpg.configure_item("gemini_cache_label", show=True)
except Exception as e:
# If the API call fails, just hide the label
dpg.configure_item("gemini_cache_label", show=False)
else:
dpg.configure_item("gemini_cache_label", show=False)
def _append_comms_entry(self, entry: dict, idx: int):
if not dpg.does_item_exist("comms_scroll"):
@@ -757,6 +834,8 @@ class App:
return output
def _append_tool_log(self, script: str, result: str):
self._last_script = script
self._last_output = result
self._tool_log.append((script, result))
self._rebuild_tool_log()
@@ -839,6 +918,13 @@ class App:
if dpg.does_item_exist("project_word_wrap"):
proj["project"]["word_wrap"] = dpg.get_value("project_word_wrap")
# Agent tools
proj.setdefault("agent", {}).setdefault("tools", {})
for t_name in ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]:
tag = f"tool_toggle_{t_name}"
if dpg.does_item_exist(tag):
proj["agent"]["tools"][t_name] = dpg.get_value(tag)
# Discussion
self._flush_disc_entries_to_project()
disc_sec = proj.setdefault("discussion", {})
@@ -864,7 +950,7 @@ class App:
}
theme.save_to_config(self.config)
def _do_generate(self) -> tuple[str, Path, list]:
self._flush_to_project()
self._save_active_project()
self._flush_to_config()
@@ -1119,9 +1205,8 @@ class App:
def cb_md_only(self):
try:
md, path, _file_items = self._do_generate()
self.last_md = md
self.last_md_path = path
self._update_status(f"md written: {path.name}")
except Exception as e:
@@ -1144,9 +1229,8 @@ class App:
if self.send_thread and self.send_thread.is_alive():
return
try:
md, path, file_items = self._do_generate()
self.last_md = md
self.last_md_path = path
self.last_file_items = file_items
except Exception as e:
@@ -1163,7 +1247,7 @@ class App:
if global_sp: combined_sp.append(global_sp.strip())
if project_sp: combined_sp.append(project_sp.strip())
ai_client.set_custom_system_prompt("\n\n".join(combined_sp))
ai_client.set_agent_tools(self.project.get("agent", {}).get("tools", {}))
temp = dpg.get_value("ai_temperature") if dpg.does_item_exist("ai_temperature") else 0.0
max_tok = dpg.get_value("ai_max_tokens") if dpg.does_item_exist("ai_max_tokens") else 8192
trunc = dpg.get_value("ai_history_trunc") if dpg.does_item_exist("ai_history_trunc") else 8000
@@ -1174,7 +1258,7 @@ class App:
if auto_add:
self._queue_history_add("User", user_msg)
try:
response = ai_client.send(self.last_md, user_msg, base_dir, self.last_file_items)
self._update_response(response)
self._update_status("done")
self._trigger_blink = True
@@ -1225,6 +1309,7 @@ class App:
with self._pending_comms_lock:
self._pending_comms.clear()
self._comms_entry_count = 0
self._update_token_usage()
if dpg.does_item_exist("comms_scroll"):
dpg.delete_item("comms_scroll", children_only=True)
@@ -1327,6 +1412,12 @@ class App:
self.disc_entries.clear()
self._rebuild_disc_list()
def cb_disc_truncate(self):
pairs = dpg.get_value("disc_truncate_pairs") if dpg.does_item_exist("disc_truncate_pairs") else 2
self.disc_entries = truncate_entries(self.disc_entries, pairs)
self._rebuild_disc_list()
self._update_status(f"history truncated to {pairs} pairs")
def cb_disc_collapse_all(self):
for i, entry in enumerate(self.disc_entries):
tag = f"disc_content_{i}"
@@ -1667,6 +1758,15 @@ class App:
default_value=self.project.get("project", {}).get("word_wrap", True),
callback=self.cb_word_wrap_toggled
)
dpg.add_separator()
dpg.add_text("Agent Capabilities")
agent_tools = self.project.get("agent", {}).get("tools", {})
for t_name in ["run_powershell", "read_file", "list_directory", "search_files", "get_file_summary", "web_search", "fetch_url"]:
dpg.add_checkbox(
tag=f"tool_toggle_{t_name}",
label=f"Enable {t_name}",
default_value=agent_tools.get(t_name, True)
)
# ---- Files panel ----
with dpg.window(
@@ -1744,6 +1844,9 @@ class App:
dpg.add_button(label="+ Entry", callback=self.cb_disc_append_entry)
dpg.add_button(label="-All", callback=self.cb_disc_collapse_all)
dpg.add_button(label="+All", callback=self.cb_disc_expand_all)
dpg.add_text("Keep Pairs:", color=(160, 160, 160))
dpg.add_input_int(tag="disc_truncate_pairs", default_value=2, width=120, min_value=1)
dpg.add_button(label="Truncate", callback=self.cb_disc_truncate)
dpg.add_button(label="Clear All", callback=self.cb_disc_clear)
dpg.add_button(label="Save", callback=self.cb_disc_save)
dpg.add_checkbox(
@@ -1796,6 +1899,12 @@ class App:
callback=self.cb_model_changed,
)
dpg.add_separator()
dpg.add_text("Telemetry")
dpg.add_text("History Token Budget:", color=_LABEL_COLOR)
dpg.add_progress_bar(tag="token_budget_bar", default_value=0.0, width=-1)
dpg.add_text("0 / 0", tag="token_budget_label")
dpg.add_text("", tag="gemini_cache_label", show=False)
dpg.add_separator()
dpg.add_text("Parameters")
dpg.add_input_float(tag="ai_temperature", label="Temperature", default_value=self.temperature, min_value=0.0, max_value=2.0)
dpg.add_input_int(tag="ai_max_tokens", label="Max Tokens (Output)", default_value=self.max_tokens, step=1024)
@@ -1872,6 +1981,8 @@ class App:
with dpg.group(horizontal=True):
dpg.add_text("Status: idle", tag="ai_status", color=(200, 220, 160))
dpg.add_spacer(width=16)
dpg.add_text("Tokens: 0 (In: 0 Out: 0)", tag="ai_token_usage", color=(180, 255, 180))
dpg.add_spacer(width=16)
dpg.add_button(label="Clear", callback=self.cb_clear_comms)
dpg.add_separator()
with dpg.group(horizontal=True):
@@ -1929,8 +2040,7 @@ class App:
dpg.add_text("Script:")
dpg.add_button(
label="[+ Maximize]",
user_data="last_script_text",
callback=lambda s, a, u: _show_text_viewer("Last Script", dpg.get_value(u))
callback=lambda s, a, u: _show_text_viewer("Last Script", self._last_script),
)
dpg.add_input_text(
tag="last_script_text",
@@ -1946,8 +2056,7 @@ class App:
dpg.add_text("Output:")
dpg.add_button(
label="[+ Maximize]",
user_data="last_script_output",
callback=lambda s, a, u: _show_text_viewer("Last Output", dpg.get_value(u))
callback=lambda s, a, u: _show_text_viewer("Last Output", self._last_output),
)
dpg.add_input_text(
tag="last_script_output",
@@ -1995,6 +2104,8 @@ class App:
self._rebuild_projects_list()
self._rebuild_discussion_selector()
self._fetch_models(self.current_provider)
self.hook_server.start()
while dpg.is_dearpygui_running():
# Show any pending confirmation dialog on the main thread safely
@@ -2019,6 +2130,27 @@ class App:
# Force scroll to bottom using a very large number
dpg.set_y_scroll("disc_scroll", 99999)
# Process queued API GUI tasks
with self._pending_gui_tasks_lock:
gui_tasks = self._pending_gui_tasks[:]
self._pending_gui_tasks.clear()
for task in gui_tasks:
try:
action = task.get("action")
if action == "set_value":
item = task.get("item")
val = task.get("value")
if item and dpg.does_item_exist(item):
dpg.set_value(item, val)
elif action == "click":
item = task.get("item")
if item and dpg.does_item_exist(item):
cb = dpg.get_item_callback(item)
if cb:
cb()
except Exception as e:
print(f"Error executing GUI hook task: {e}")
# Handle retro arcade blinking effect
if self._trigger_script_blink:
self._trigger_script_blink = False
@@ -2110,6 +2242,7 @@ class App:
# Flush any comms entries queued from background threads
self._flush_pending_comms()
self._update_telemetry_panel()
dpg.render_dearpygui_frame()
@@ -2122,6 +2255,7 @@ class App:
dpg.save_init_file("dpg_layout.ini")
session_logger.close_session()
ai_client.cleanup() # Destroy active API caches to stop billing
self.hook_server.stop()
dpg.destroy_context()
@@ -2132,3 +2266,7 @@ def main():
if __name__ == "__main__":
main()
+65 -42
File diff suppressed because one or more lines are too long
+11
@@ -100,6 +100,17 @@ def default_project(name: str = "unnamed") -> dict:
"output": {"output_dir": "./md_gen"},
"files": {"base_dir": ".", "paths": []},
"screenshots": {"base_dir": ".", "paths": []},
"agent": {
"tools": {
"run_powershell": True,
"read_file": True,
"list_directory": True,
"search_files": True,
"get_file_summary": True,
"web_search": True,
"fetch_url": True
}
},
"discussion": {
"roles": ["User", "AI", "Vendor API", "System"],
"active": "main",
+5
@@ -10,3 +10,8 @@ dependencies = [
"anthropic",
"tomli-w"
]
[dependency-groups]
dev = [
"pytest>=9.0.2",
]
+21 -2
@@ -40,6 +40,7 @@ _seq_lock = threading.Lock()
_comms_fh = None # file handle: logs/comms_<ts>.log
_tool_fh = None # file handle: logs/toolcalls_<ts>.log
_api_fh = None # file handle: logs/apihooks_<ts>.log - API hook calls
def _now_ts() -> str:
@@ -52,7 +53,7 @@ def open_session():
opens the two log files for this session. Idempotent - a second call is
ignored.
"""
global _ts, _comms_fh, _tool_fh, _api_fh, _seq
if _comms_fh is not None:
return # already open
@@ -65,6 +66,7 @@ def open_session():
_comms_fh = open(_LOG_DIR / f"comms_{_ts}.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_LOG_DIR / f"toolcalls_{_ts}.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_LOG_DIR / f"apihooks_{_ts}.log", "w", encoding="utf-8", buffering=1)
_tool_fh.write(f"# Tool-call log — session {_ts}\n\n")
_tool_fh.flush()
@@ -72,13 +74,30 @@ def open_session():
def close_session():
"""Flush and close both log files. Called on clean exit (optional)."""
global _comms_fh, _tool_fh
global _comms_fh, _tool_fh, _api_fh
if _comms_fh:
_comms_fh.close()
_comms_fh = None
if _tool_fh:
_tool_fh.close()
_tool_fh = None
if _api_fh:
_api_fh.close()
_api_fh = None
def log_api_hook(method: str, path: str, payload: str):
"""
Log an API hook invocation.
"""
if _api_fh is None:
return
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
try:
_api_fh.write(f"[{ts_entry}] {method} {path} - Payload: {payload}\n")
_api_fh.flush()
except Exception:
pass
def log_comms(entry: dict):
+1
@@ -0,0 +1 @@
Get-Content .env | ForEach-Object { $name, $value = $_.Split('=', 2); [Environment]::SetEnvironmentVariable($name, $value, "Process") }
+17
@@ -0,0 +1,17 @@
import pytest
def test_agent_capabilities_config():
# A dummy test to fulfill the Red Phase for Agent Capability Configuration.
# The new function in gui.py should be get_active_tools() or we check the project dict.
from project_manager import default_project
proj = default_project("test_proj")
# We expect 'agent' config to exist in a default project and list tools
assert "agent" in proj
assert "tools" in proj["agent"]
# By default, all tools should probably be True or defined
tools = proj["agent"]["tools"]
assert "run_powershell" in tools
assert tools["run_powershell"] is True
+23
@@ -0,0 +1,23 @@
import pytest
from ai_client import set_agent_tools, _build_anthropic_tools
def test_agent_tools_wiring():
# Only enable read_file and run_powershell
agent_tools = {
"run_powershell": True,
"read_file": True,
"list_directory": False,
"search_files": False,
"get_file_summary": False,
"web_search": False,
"fetch_url": False
}
set_agent_tools(agent_tools)
anth_tools = _build_anthropic_tools()
tool_names = [t["name"] for t in anth_tools]
assert "read_file" in tool_names
assert "run_powershell" in tool_names
assert "list_directory" not in tool_names
assert "web_search" not in tool_names
+144
@@ -0,0 +1,144 @@
import pytest
import requests
from unittest.mock import MagicMock, patch
import threading
import time
import json
# Import HookServer from api_hooks.py
from api_hooks import HookServer # No need for HookServerInstance, HookHandler here
from api_hook_client import ApiHookClient
@pytest.fixture(scope="module")
def hook_server_fixture():
# Mock the 'app' object that HookServer expects
mock_app = MagicMock()
mock_app.test_hooks_enabled = True # Essential for the server to start
mock_app.project = {'name': 'test_project'}
mock_app.disc_entries = [{'role': 'user', 'content': 'hello'}]
mock_app._pending_gui_tasks = []
mock_app._pending_gui_tasks_lock = threading.Lock()
# Use an ephemeral port (0) to avoid conflicts
server = HookServer(mock_app, port=0)
server.start()
# Wait a moment for the server thread to start and bind
time.sleep(0.1)
# Get the actual port assigned by the OS
actual_port = server.server.server_address[1]
# Update the base_url for the client to use the actual port
client_base_url = f"http://127.0.0.1:{actual_port}"
yield client_base_url, mock_app # Yield the base URL and the mock_app
server.stop()
def test_get_status_success(hook_server_fixture):
"""
Test that get_status successfully retrieves the server status
when the HookServer is running. This is the 'Green Phase'.
"""
base_url, _ = hook_server_fixture
client = ApiHookClient(base_url=base_url)
status = client.get_status()
assert status == {'status': 'ok'}
def test_get_project_success(hook_server_fixture):
"""
Test successful retrieval of project data.
"""
base_url, mock_app = hook_server_fixture
client = ApiHookClient(base_url=base_url)
project = client.get_project()
assert project == {'project': mock_app.project}
def test_post_project_success(hook_server_fixture):
"""Test successful posting and updating of project data."""
base_url, mock_app = hook_server_fixture
client = ApiHookClient(base_url=base_url)
new_project_data = {'name': 'updated_project', 'version': '1.0'}
response = client.post_project(new_project_data)
assert response == {'status': 'updated'}
# Note: the handler performs `app.project = data.get('project', app.project)`,
# but the fixture does not expose the server's app instance in a way that makes
# asserting that side effect straightforward here. This test therefore verifies
# only the client-server exchange and the returned success status.
def test_get_session_success(hook_server_fixture):
"""
Test successful retrieval of session data.
"""
base_url, mock_app = hook_server_fixture
client = ApiHookClient(base_url=base_url)
session = client.get_session()
assert session == {'session': {'entries': mock_app.disc_entries}}
def test_post_session_success(hook_server_fixture):
"""
Test successful posting and updating of session data.
"""
base_url, mock_app = hook_server_fixture
client = ApiHookClient(base_url=base_url)
new_session_entries = [{'role': 'agent', 'content': 'hi'}]
response = client.post_session(new_session_entries)
assert response == {'status': 'updated'}
# Similar note as post_project about mock_app.disc_entries not being updated here.
def test_post_gui_success(hook_server_fixture):
"""
Test successful posting of GUI data.
"""
base_url, mock_app = hook_server_fixture
client = ApiHookClient(base_url=base_url)
gui_data = {'command': 'set_text', 'id': 'some_item', 'value': 'new_text'}
response = client.post_gui(gui_data)
assert response == {'status': 'queued'}
assert mock_app._pending_gui_tasks == [gui_data] # This should be updated by the server logic.
def test_get_status_connection_error_handling():
"""
Test that ApiHookClient correctly handles a connection error.
"""
client = ApiHookClient(base_url="http://127.0.0.1:1") # Use a port that is highly unlikely to be listening
# A refused local connection surfaces as ConnectionError, while a silently
# dropped one surfaces as Timeout; accept either so the test is not host-dependent.
with pytest.raises((requests.exceptions.ConnectionError, requests.exceptions.Timeout)):
client.get_status()
def test_post_project_server_error_handling(hook_server_fixture):
"""
Test that ApiHookClient correctly handles a server-side error (e.g., 500).
Causing the live HookHandler to raise for a specific path would be complex
with the current fixture.
A simpler way for client-side testing is to mock the requests call directly for this scenario.
"""
base_url, _ = hook_server_fixture
client = ApiHookClient(base_url=base_url)
with patch('requests.post') as mock_post:
mock_response = MagicMock()
mock_response.status_code = 500
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("500 Server Error", response=mock_response)
mock_response.text = "Internal Server Error"
mock_post.return_value = mock_response
with pytest.raises(requests.exceptions.HTTPError) as excinfo:
client.post_project({'name': 'error_project'})
assert "HTTP error 500" in str(excinfo.value)
def test_unsupported_method_error():
"""
Test that calling an unsupported HTTP method raises a ValueError.
"""
client = ApiHookClient()
with pytest.raises(ValueError, match="Unsupported HTTP method"):
client._make_request('PUT', '/some_endpoint', data={'key': 'value'})
@@ -0,0 +1,134 @@
import pytest
from unittest.mock import MagicMock, patch
import os
import threading
import time
import json
import requests # Import requests for exception types
from api_hooks import HookServer
from api_hook_client import ApiHookClient
@pytest.fixture(scope="module")
def hook_server_fixture_for_integration():
# Mock the 'app' object that HookServer expects
mock_app = MagicMock()
mock_app.test_hooks_enabled = True # Essential for the server to start
mock_app.project = {'name': 'test_project'}
mock_app.disc_entries = [{'role': 'user', 'content': 'hello'}]
mock_app._pending_gui_tasks = []
mock_app._pending_gui_tasks_lock = threading.Lock()
# Use an ephemeral port (0) to avoid conflicts
server = HookServer(mock_app, port=0)
server.start()
time.sleep(0.1) # Wait a moment for the server thread to start and bind
actual_port = server.server.server_address[1]
client_base_url = f"http://127.0.0.1:{actual_port}"
yield client_base_url, mock_app
server.stop()
def simulate_conductor_phase_completion(client_base_url: str, mock_app: MagicMock, plan_content: str):
"""
Simulates the Conductor agent's logic for phase completion.
In the actual implementation, this logic will live in the Conductor agent itself.
It includes basic result handling and simulated user feedback.
"""
print(f"Simulating Conductor phase completion. Client base URL: {client_base_url}")
client = ApiHookClient(base_url=client_base_url)
try:
status = client.get_status() # Assuming get_status is the verification call
print(f"API Hook Client status response: {status}")
if status.get('status') == 'ok':
mock_app.verification_successful = True # Simulate success flag
mock_app.verification_message = "Automated verification completed successfully."
else:
mock_app.verification_successful = False
mock_app.verification_message = f"Automated verification failed: {status}"
except requests.exceptions.Timeout:
mock_app.verification_successful = False
mock_app.verification_message = "Automated verification failed: Request timed out."
except requests.exceptions.ConnectionError:
mock_app.verification_successful = False
mock_app.verification_message = "Automated verification failed: Could not connect to API hook server."
except requests.exceptions.HTTPError as e:
mock_app.verification_successful = False
mock_app.verification_message = f"Automated verification failed: HTTP error {e.response.status_code}."
except Exception as e:
mock_app.verification_successful = False
mock_app.verification_message = f"Automated verification failed: An unexpected error occurred: {e}"
print(mock_app.verification_message)
# In a real scenario, the agent would then ask the user if they want to proceed
# if verification_successful is True, or if they want to debug/fix if False.
def test_conductor_integrates_api_hook_client_for_verification(hook_server_fixture_for_integration):
"""
Verify that Conductor's simulated phase completion logic properly integrates
and uses the ApiHookClient for verification. This test *should* pass (Green Phase)
if the integration in `simulate_conductor_phase_completion` is correct.
"""
client_base_url, mock_app = hook_server_fixture_for_integration
dummy_plan_content = """
# Implementation Plan: Test Track
## Phase 1: Initial Setup [checkpoint: abcdefg]
- [x] Task: Dummy Task 1 [1234567]
- [ ] Task: Conductor - User Manual Verification 'Phase 1: Initial Setup' (Protocol in workflow.md)
"""
# Reset mock_app's success flag for this test run
mock_app.verification_successful = False
mock_app.verification_message = ""
simulate_conductor_phase_completion(client_base_url, mock_app, dummy_plan_content)
# Assert that the verification was considered successful by the simulated Conductor
assert mock_app.verification_successful is True
assert "successfully" in mock_app.verification_message
def test_conductor_handles_api_hook_failure(hook_server_fixture_for_integration):
"""
Verify Conductor handles a simulated API hook verification failure.
This test will be 'Red' until simulate_conductor_phase_completion correctly
sets verification_successful to False and provides a failure message.
"""
client_base_url, mock_app = hook_server_fixture_for_integration
with patch.object(ApiHookClient, 'get_status', autospec=True) as mock_get_status:
# Configure mock to simulate a non-'ok' status
mock_get_status.return_value = {'status': 'failed', 'error': 'Something went wrong'}
mock_app.verification_successful = True # Reset for the test
mock_app.verification_message = ""
simulate_conductor_phase_completion(client_base_url, mock_app, "")
assert mock_app.verification_successful is False
assert "failed" in mock_app.verification_message
def test_conductor_handles_api_hook_connection_error(hook_server_fixture_for_integration):
"""
Verify Conductor handles a simulated API hook connection error.
This test will be 'Red' until simulate_conductor_phase_completion correctly
sets verification_successful to False and provides a connection error message.
"""
client_base_url, mock_app = hook_server_fixture_for_integration
with patch.object(ApiHookClient, 'get_status', autospec=True) as mock_get_status:
# Configure mock to raise a ConnectionError
mock_get_status.side_effect = requests.exceptions.ConnectionError("Mocked connection error")
mock_app.verification_successful = True # Reset for the test
mock_app.verification_message = ""
simulate_conductor_phase_completion(client_base_url, mock_app, "")
assert mock_app.verification_successful is False
assert "Could not connect" in mock_app.verification_message
+45
@@ -0,0 +1,45 @@
import pytest
from unittest.mock import MagicMock, patch
# Import the necessary functions from ai_client, including the reset helper
from ai_client import get_gemini_cache_stats, reset_session
def test_get_gemini_cache_stats_with_mock_client():
"""
Test that get_gemini_cache_stats correctly processes cache lists
from a mocked client instance.
"""
# Ensure a clean state before the test by resetting the session
reset_session()
# 1. Create a mock for the cache object that the client will return
mock_cache = MagicMock()
mock_cache.name = "cachedContents/test-cache"
mock_cache.display_name = "Test Cache"
mock_cache.model = "models/gemini-1.5-pro-001"
mock_cache.size_bytes = 1024
# 2. Create a mock for the client instance
mock_client_instance = MagicMock()
# Configure its `caches.list` method to return our mock cache
mock_client_instance.caches.list.return_value = [mock_cache]
# 3. Patch the Client constructor to return our mock instance
# This intercepts the `_ensure_gemini_client` call inside the function
with patch('google.genai.Client', return_value=mock_client_instance) as mock_client_constructor:
# 4. Call the function under test
stats = get_gemini_cache_stats()
# 5. Assert that the function behaved as expected
# It should have constructed the client
mock_client_constructor.assert_called_once()
# It should have called the `list` method on the `caches` attribute
mock_client_instance.caches.list.assert_called_once()
# The returned stats dictionary should be correct
assert "cache_count" in stats
assert "total_size_bytes" in stats
assert stats["cache_count"] == 1
assert stats["total_size_bytes"] == 1024
+111
@@ -0,0 +1,111 @@
import pytest
from unittest.mock import patch, MagicMock
import importlib.util
import sys
import dearpygui.dearpygui as dpg
# Load gui.py as a module for testing
spec = importlib.util.spec_from_file_location("gui", "gui.py")
gui = importlib.util.module_from_spec(spec)
sys.modules["gui"] = gui
spec.loader.exec_module(gui)
from gui import App
@pytest.fixture
def app_instance():
"""
Fixture to create an instance of the App class for testing.
It creates a real DPG context but mocks functions that would
render a window or block execution.
"""
dpg.create_context()
# Patch only the functions that would show a window or block,
# and the App methods that rebuild UI on init.
with patch('dearpygui.dearpygui.create_viewport'), \
patch('dearpygui.dearpygui.setup_dearpygui'), \
patch('dearpygui.dearpygui.show_viewport'), \
patch('dearpygui.dearpygui.start_dearpygui'), \
patch('gui.load_config', return_value={}), \
patch.object(App, '_rebuild_files_list'), \
patch.object(App, '_rebuild_shots_list'), \
patch.object(App, '_rebuild_disc_list'), \
patch.object(App, '_rebuild_disc_roles_list'), \
patch.object(App, '_rebuild_discussion_selector'), \
patch.object(App, '_refresh_project_widgets'):
app = App()
yield app
dpg.destroy_context()
def test_telemetry_panel_updates_correctly(app_instance):
"""
Tests that the _update_telemetry_panel method correctly updates
DPG widgets based on the stats from ai_client.
"""
# 1. Set the provider to anthropic
app_instance.current_provider = "anthropic"
# 2. Define the mock stats
mock_stats = {
"provider": "anthropic",
"limit": 180000,
"current": 135000,
"percentage": 75.0,
}
# 3. Patch the dependencies
with patch('ai_client.get_history_bleed_stats', return_value=mock_stats) as mock_get_stats, \
patch('dearpygui.dearpygui.set_value') as mock_set_value, \
patch('dearpygui.dearpygui.configure_item') as mock_configure_item, \
patch('dearpygui.dearpygui.does_item_exist', return_value=True) as mock_does_item_exist:
# 4. Call the method under test
app_instance._update_telemetry_panel()
# 5. Assert the results
mock_get_stats.assert_called_once()
# Assert history bleed widgets were updated
mock_set_value.assert_any_call("token_budget_bar", 0.75)
mock_set_value.assert_any_call("token_budget_label", "135,000 / 180,000")
# Assert Gemini-specific widget was hidden
mock_configure_item.assert_any_call("gemini_cache_label", show=False)
def test_cache_data_display_updates_correctly(app_instance):
"""
Tests that the _update_telemetry_panel method correctly updates the
GUI with Gemini cache statistics when the provider is set to Gemini.
"""
# 1. Set the provider to Gemini
app_instance.current_provider = "gemini"
# 2. Define mock cache stats
mock_cache_stats = {
'cache_count': 5,
'total_size_bytes': 12345
}
# Expected formatted string
expected_text = "Gemini Caches: 5 (12.1 KB)"
# 3. Patch dependencies
with patch('ai_client.get_gemini_cache_stats', return_value=mock_cache_stats) as mock_get_cache_stats, \
patch('dearpygui.dearpygui.set_value') as mock_set_value, \
patch('dearpygui.dearpygui.configure_item') as mock_configure_item, \
patch('dearpygui.dearpygui.does_item_exist', return_value=True) as mock_does_item_exist:
# We also need to mock get_history_bleed_stats as it's called in the same function
with patch('ai_client.get_history_bleed_stats', return_value={}):
# 4. Call the method under test
app_instance._update_telemetry_panel()
# 5. Assert the results
mock_get_cache_stats.assert_called_once()
# Check that the UI item was shown and its value was set
mock_configure_item.assert_any_call("gemini_cache_label", show=True)
mock_set_value.assert_any_call("gemini_cache_label", expected_text)
+56
@@ -0,0 +1,56 @@
import pytest
from unittest.mock import patch, MagicMock
# Import the module to be tested
import ai_client
@pytest.fixture(autouse=True)
def reset_ai_client_session():
"""Fixture to automatically reset the ai_client session before each test."""
ai_client.reset_session()
def test_anthropic_history_bleed_calculation():
"""
Tests that get_history_bleed_stats calculates the token usage
percentage correctly for the Anthropic provider.
"""
# 1. Set up the test environment
ai_client.set_provider("anthropic", "claude-3-opus-20240229")
# Define the mock return value for the token estimator
mock_token_count = 150_000
# The hardcoded limit in the module is 180_000
expected_percentage = (mock_token_count / 180_000) * 100
# 2. Mock the internal dependencies
# We patch _estimate_prompt_tokens as it's the core of the calculation for anthropic
with patch('ai_client._estimate_prompt_tokens', return_value=mock_token_count) as mock_estimator:
# 3. Call the function under test (which doesn't exist yet)
stats = ai_client.get_history_bleed_stats()
# 4. Assert the results
assert stats["provider"] == "anthropic"
assert stats["limit"] == 180_000
assert stats["current"] == mock_token_count
assert stats["percentage"] == pytest.approx(expected_percentage)
# Ensure the mock was called
mock_estimator.assert_called_once()
def test_gemini_history_bleed_not_implemented():
"""
Tests that get_history_bleed_stats returns a 'not implemented' state
for Gemini, as its token calculation is different.
"""
# 1. Set up the test environment
ai_client.set_provider("gemini", "gemini-1.5-pro-latest")
# 2. Call the function
stats = ai_client.get_history_bleed_stats()
# 3. Assert the 'not implemented' state
assert stats["provider"] == "gemini"
assert stats["limit"] == 900_000 # The constant _GEMINI_MAX_INPUT_TOKENS
assert stats["current"] == 0
assert stats["percentage"] == 0
+22
@@ -0,0 +1,22 @@
import pytest
def test_history_truncation():
# A dummy test to fulfill the Red Phase for the history truncation controls.
# The new function in gui.py should be cb_disc_truncate_history or a related utility.
entries = [
{"role": "User", "content": "1", "collapsed": False, "ts": "10:00:00"},
{"role": "AI", "content": "2", "collapsed": False, "ts": "10:01:00"},
{"role": "User", "content": "3", "collapsed": False, "ts": "10:02:00"},
{"role": "AI", "content": "4", "collapsed": False, "ts": "10:03:00"}
]
# We expect a new function truncate_entries(entries, max_pairs) to exist
from gui import truncate_entries
truncated = truncate_entries(entries, max_pairs=1)
# Keeping the last pair (user + ai)
assert len(truncated) == 2
assert truncated[0]["content"] == "3"
assert truncated[1]["content"] == "4"
+98
@@ -0,0 +1,98 @@
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
import pytest
from unittest.mock import patch
import gui
import api_hooks
import urllib.request
import json
import threading
import time
def test_hooks_enabled_via_cli():
with patch.object(sys, 'argv', ['gui.py', '--enable-test-hooks']):
app = gui.App()
assert app.test_hooks_enabled is True
def test_hooks_disabled_by_default():
with patch.object(sys, 'argv', ['gui.py']):
if 'SLOP_TEST_HOOKS' in os.environ:
del os.environ['SLOP_TEST_HOOKS']
app = gui.App()
assert getattr(app, 'test_hooks_enabled', False) is False
def test_hooks_enabled_via_env():
with patch.object(sys, 'argv', ['gui.py']):
with patch.dict(os.environ, {'SLOP_TEST_HOOKS': '1'}):
app = gui.App()
assert app.test_hooks_enabled is True
def test_ipc_server_starts_and_responds():
app_mock = gui.App()
app_mock.test_hooks_enabled = True
server = api_hooks.HookServer(app_mock, port=8999)
server.start()
# Wait for server to start
time.sleep(0.5)
try:
req = urllib.request.Request("http://127.0.0.1:8999/status")
with urllib.request.urlopen(req) as response:
assert response.status == 200
data = json.loads(response.read().decode())
assert data.get("status") == "ok"
# Test project GET
req = urllib.request.Request("http://127.0.0.1:8999/api/project")
with urllib.request.urlopen(req) as response:
assert response.status == 200
data = json.loads(response.read().decode())
assert "project" in data
# Test session GET
req = urllib.request.Request("http://127.0.0.1:8999/api/session")
with urllib.request.urlopen(req) as response:
assert response.status == 200
data = json.loads(response.read().decode())
assert "session" in data
# Test project POST
project_data = {"project": {"foo": "bar"}}
req = urllib.request.Request(
"http://127.0.0.1:8999/api/project",
method="POST",
data=json.dumps(project_data).encode("utf-8"),
headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req) as response:
assert response.status == 200
assert app_mock.project == {"foo": "bar"}
# Test session POST
session_data = {"session": {"entries": [{"role": "User", "content": "hi"}]}}
req = urllib.request.Request(
"http://127.0.0.1:8999/api/session",
method="POST",
data=json.dumps(session_data).encode("utf-8"),
headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req) as response:
assert response.status == 200
assert app_mock.disc_entries == [{"role": "User", "content": "hi"}]
# Test GUI queue hook
gui_data = {"action": "set_value", "item": "test_item", "value": "test_value"}
req = urllib.request.Request(
"http://127.0.0.1:8999/api/gui",
method="POST",
data=json.dumps(gui_data).encode("utf-8"),
headers={'Content-Type': 'application/json'})
with urllib.request.urlopen(req) as response:
assert response.status == 200
# Instead of checking DPG (since we aren't running the real main loop in tests),
# check if it got queued in app_mock
assert hasattr(app_mock, '_pending_gui_tasks')
assert len(app_mock._pending_gui_tasks) == 1
assert app_mock._pending_gui_tasks[0] == gui_data
finally:
server.stop()
+35
@@ -0,0 +1,35 @@
import pytest
def test_token_usage_aggregation():
# A dummy test to fulfill the Red Phase for the new token usage widget.
# We will implement a function in gui.py or ai_client.py to aggregate tokens.
from ai_client import clear_comms_log, _append_comms
clear_comms_log()
_append_comms("IN", "response", {
"usage": {
"input_tokens": 100,
"output_tokens": 50,
"cache_read_input_tokens": 10,
"cache_creation_input_tokens": 5
}
})
_append_comms("IN", "response", {
"usage": {
"input_tokens": 200,
"output_tokens": 100,
"cache_read_input_tokens": 20,
"cache_creation_input_tokens": 0
}
})
# We expect a new function get_total_token_usage() to exist
from gui import get_total_token_usage
totals = get_total_token_usage()
assert totals["input_tokens"] == 300
assert totals["output_tokens"] == 150
assert totals["cache_read_input_tokens"] == 30
assert totals["cache_creation_input_tokens"] == 5