feat(src): Move core implementation files to src/ directory

This commit is contained in:
2026-03-04 09:55:44 -05:00
parent 30f2ec6689
commit a0276e0894
30 changed files with 146 additions and 11 deletions

0
src/__init__.py Normal file
View File

336
src/aggregate.py Normal file
View File

@@ -0,0 +1,336 @@
# aggregate.py
from __future__ import annotations
"""
Note(Gemini):
This module orchestrates the construction of the final Markdown context string.
Instead of sending every file to the AI raw (which blows up tokens), this uses a pipeline:
1. Resolve paths (handles globs and absolute paths).
2. Build file items (raw content).
3. If 'summary_only' is true (which is the default behavior now), it pipes the files through
summarize.py to generate a compacted view.
This is essential for keeping prompt tokens low while giving the AI enough structural info
to use the MCP tools to fetch only what it needs.
"""
import tomllib
import re
import glob
from pathlib import Path, PureWindowsPath
from typing import Any, cast
import summarize
import project_manager
from file_cache import ASTParser
def find_next_increment(output_dir: Path, namespace: str) -> int:
pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
max_num = 0
for f in output_dir.iterdir():
if f.is_file():
match = pattern.match(f.name)
if match:
max_num = max(max_num, int(match.group(1)))
return max_num + 1
def is_absolute_with_drive(entry: str) -> bool:
try:
p = PureWindowsPath(entry)
return p.drive != ""
except Exception:
return False
def resolve_paths(base_dir: Path, entry: str) -> list[Path]:
has_drive = is_absolute_with_drive(entry)
is_wildcard = "*" in entry
matches = []
if is_wildcard:
root = Path(entry) if has_drive else base_dir / entry
matches = [Path(p) for p in glob.glob(str(root), recursive=True) if Path(p).is_file()]
else:
p = Path(entry) if has_drive else (base_dir / entry).resolve()
matches = [p]
# Blacklist filter
filtered = []
for p in matches:
name = p.name.lower()
if name == "history.toml" or name.endswith("_history.toml"):
continue
filtered.append(p)
return sorted(filtered)
def build_discussion_section(history: list[str]) -> str:
sections = []
for i, paste in enumerate(history, start=1):
sections.append(f"### Discussion Excerpt {i}\n\n{paste.strip()}")
return "\n\n---\n\n".join(sections)
def build_files_section(base_dir: Path, files: list[str | dict[str, Any]]) -> str:
sections = []
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = cast(str, entry_raw.get("path", ""))
else:
entry = entry_raw
if not entry or not isinstance(entry, str):
continue
paths = resolve_paths(base_dir, entry)
if not paths:
sections.append(f"### `{entry}`\n\n```text\nERROR: no files matched: {entry}\n```")
continue
for path in paths:
suffix = path.suffix.lstrip(".")
lang = suffix if suffix else "text"
try:
content = path.read_text(encoding="utf-8")
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
except Exception as e:
content = f"ERROR: {e}"
original = entry if "*" not in entry else str(path)
sections.append(f"### `{original}`\n\n```{lang}\n{content}\n```")
return "\n\n---\n\n".join(sections)
def build_screenshots_section(base_dir: Path, screenshots: list[str]) -> str:
sections = []
for entry in screenshots:
if not entry or not isinstance(entry, str):
continue
paths = resolve_paths(base_dir, entry)
if not paths:
sections.append(f"### `{entry}`\n\n_ERROR: no files matched: {entry}_")
continue
for path in paths:
original = entry if "*" not in entry else str(path)
if not path.exists():
sections.append(f"### `{original}`\n\n_ERROR: file not found: {path}_")
continue
sections.append(f"### `{original}`\n\n![{path.name}]({path.as_posix()})")
return "\n\n---\n\n".join(sections)
def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[dict[str, Any]]:
"""
Return a list of dicts describing each file, for use by ai_client when it
wants to upload individual files rather than inline everything as markdown.
Each dict has:
path : Path (resolved absolute path)
entry : str (original config entry string)
content : str (file text, or error string)
error : bool
mtime : float (last modification time, for skip-if-unchanged optimization)
tier : int | None (optional tier for context management)
"""
items: list[dict[str, Any]] = []
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = cast(str, entry_raw.get("path", ""))
tier = entry_raw.get("tier")
else:
entry = entry_raw
tier = None
if not entry or not isinstance(entry, str):
continue
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier})
continue
for path in paths:
try:
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
error = False
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
mtime = 0.0
error = True
except Exception as e:
content = f"ERROR: {e}"
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier})
return items
def build_summary_section(base_dir: Path, files: list[str | dict[str, Any]]) -> str:
"""
Build a compact summary section using summarize.py — one short block per file.
Used as the initial <context> block instead of full file contents.
"""
items = build_file_items(base_dir, files)
return summarize.build_summary_markdown(items)
def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str:
"""Build the files markdown section from pre-read file items (avoids double I/O)."""
sections = []
for item in file_items:
path = item.get("path")
entry = cast(str, item.get("entry", "unknown"))
content = cast(str, item.get("content", ""))
if path is None:
sections.append(f"### `{entry}`\n\n```text\n{content}\n```")
continue
p = cast(Path, path)
suffix = p.suffix.lstrip(".") if hasattr(p, "suffix") else "text"
lang = suffix if suffix else "text"
original = entry if "*" not in entry else str(p)
sections.append(f"### `{original}`\n\n```{lang}\n{content}\n```")
return "\n\n---\n\n".join(sections)
def build_markdown_from_items(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
"""Build markdown from pre-read file items instead of re-reading from disk."""
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if file_items:
if summary_only:
parts.append("## Files (Summary)\n\n" + summarize.build_summary_markdown(file_items))
else:
parts.append("## Files\n\n" + _build_files_section_from_items(file_items))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown_no_history(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], summary_only: bool = False) -> str:
"""Build markdown with only files + screenshots (no history). Used for stable caching."""
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history=[], summary_only=summary_only)
def build_discussion_text(history: list[str]) -> str:
"""Build just the discussion history section text. Returns empty string if no history."""
if not history:
return ""
return "## Discussion History\n\n" + build_discussion_section(history)
def build_tier1_context(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 1 Context: Strategic/Orchestration.
Full content for core conductor files and files with tier=1, summaries for others.
"""
core_files = {"product.md", "tech-stack.md", "workflow.md", "tracks.md"}
parts = []
# Files section
if file_items:
sections = []
for item in file_items:
path = item.get("path")
name = path.name if path and isinstance(path, Path) else ""
if name in core_files or item.get("tier") == 1:
# Include in full
sections.append("### `" + (cast(str, item.get("entry")) or str(path)) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and isinstance(path, Path) and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
# Summarize
if path and isinstance(path, Path):
sections.append("### `" + (cast(str, item.get("entry")) or str(path)) + "`\n\n" +
summarize.summarise_file(path, cast(str, item.get("content", ""))))
parts.append("## Files (Tier 1 - Mixed)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_tier2_context(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str]) -> str:
"""
Tier 2 Context: Architectural/Tech Lead.
Full content for all files (standard behavior).
"""
return build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history, summary_only=False)
def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], focus_files: list[str]) -> str:
"""
Tier 3 Context: Execution/Worker.
Full content for focus_files and files with tier=3, summaries/skeletons for others.
"""
parts = []
if file_items:
sections = []
for item in file_items:
path = cast(Path, item.get("path"))
entry = cast(str, item.get("entry", ""))
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or (path_str and focus in path_str):
is_focus = True
break
if is_focus or item.get("tier") == 3:
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
content = cast(str, item.get("content", ""))
if path and path.suffix == ".py" and not item.get("error"):
try:
parser = ASTParser("python")
skeleton = parser.get_skeleton(content)
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
except Exception:
# Fallback to summary if AST parsing fails
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
else:
if path:
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False) -> str:
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files:
if summary_only:
parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files))
else:
parts.append("## Files\n\n" + build_files_section(base_dir, files))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def run(config: dict[str, Any]) -> tuple[str, Path, list[dict[str, Any]]]:
namespace = config.get("project", {}).get("name")
if not namespace:
namespace = config.get("output", {}).get("namespace", "project")
output_dir = Path(config["output"]["output_dir"])
base_dir = Path(config["files"]["base_dir"])
files = config["files"].get("paths", [])
screenshot_base_dir = Path(config.get("screenshots", {}).get("base_dir", "."))
screenshots = config.get("screenshots", {}).get("paths", [])
history = config.get("discussion", {}).get("history", [])
output_dir.mkdir(parents=True, exist_ok=True)
increment = find_next_increment(output_dir, namespace)
output_file = output_dir / f"{namespace}_{increment:03d}.md"
# Build file items once, then construct markdown from them (avoids double I/O)
file_items = build_file_items(base_dir, files)
summary_only = config.get("project", {}).get("summary_only", False)
markdown = build_markdown_from_items(file_items, screenshot_base_dir, screenshots, history,
summary_only=summary_only)
output_file.write_text(markdown, encoding="utf-8")
return markdown, output_file, file_items
def main() -> None:
# Load global config to find active project
config_path = Path("config.toml")
if not config_path.exists():
print("config.toml not found.")
return
with open(config_path, "rb") as f:
global_cfg = tomllib.load(f)
active_path = global_cfg.get("projects", {}).get("active")
if not active_path:
print("No active project found in config.toml.")
return
# Use project_manager to load project (handles history segregation)
proj = project_manager.load_project(active_path)
# Use flat_config to make it compatible with aggregate.run()
config = project_manager.flat_config(proj)
markdown, output_file, _ = run(config)
print(f"Written: {output_file}")
if __name__ == "__main__":
main()

1713
src/ai_client.py Normal file

File diff suppressed because it is too large Load Diff

249
src/api_hook_client.py Normal file
View File

@@ -0,0 +1,249 @@
from __future__ import annotations
import requests # type: ignore[import-untyped]
import json
import time
from typing import Any
class ApiHookClient:
def __init__(self, base_url: str = "http://127.0.0.1:8999", max_retries: int = 5, retry_delay: float = 0.2) -> None:
self.base_url = base_url
self.max_retries = max_retries
self.retry_delay = retry_delay
def wait_for_server(self, timeout: float = 3) -> bool:
"""
Polls the /status endpoint until the server is ready or timeout is reached.
"""
start_time = time.time()
while time.time() - start_time < timeout:
try:
if self.get_status().get('status') == 'ok':
return True
except (requests.exceptions.ConnectionError, requests.exceptions.Timeout):
time.sleep(0.1)
return False
def _make_request(self, method: str, endpoint: str, data: dict[str, Any] | None = None, timeout: float | None = None) -> dict[str, Any] | None:
url = f"{self.base_url}{endpoint}"
headers = {'Content-Type': 'application/json'}
last_exception = None
# Increase default request timeout for local server
req_timeout = timeout if timeout is not None else 10.0
for attempt in range(self.max_retries + 1):
try:
if method == 'GET':
response = requests.get(url, timeout=req_timeout)
elif method == 'POST':
response = requests.post(url, json=data, headers=headers, timeout=req_timeout)
else:
raise ValueError(f"Unsupported HTTP method: {method}")
response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
res_json = response.json()
return res_json if isinstance(res_json, dict) else None
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as e:
last_exception = e
if attempt < self.max_retries:
time.sleep(self.retry_delay)
continue
else:
if isinstance(e, requests.exceptions.Timeout):
raise requests.exceptions.Timeout(f"Request to {endpoint} timed out after {self.max_retries} retries.") from e
else:
raise requests.exceptions.ConnectionError(f"Could not connect to API hook server at {self.base_url} after {self.max_retries} retries.") from e
except requests.exceptions.HTTPError as e:
raise requests.exceptions.HTTPError(f"HTTP error {e.response.status_code} for {endpoint}: {e.response.text}") from e
except json.JSONDecodeError as e:
raise ValueError(f"Failed to decode JSON from response for {endpoint}: {response.text}") from e
if last_exception:
raise last_exception
return None
def get_status(self) -> dict[str, Any]:
"""Checks the health of the hook server."""
url = f"{self.base_url}/status"
try:
response = requests.get(url, timeout=5.0)
response.raise_for_status()
res = response.json()
return res if isinstance(res, dict) else {}
except Exception:
raise requests.exceptions.ConnectionError(f"Could not reach /status at {self.base_url}")
def get_project(self) -> dict[str, Any] | None:
return self._make_request('GET', '/api/project')
def post_project(self, project_data: dict[str, Any]) -> dict[str, Any] | None:
return self._make_request('POST', '/api/project', data={'project': project_data})
def get_session(self) -> dict[str, Any] | None:
res = self._make_request('GET', '/api/session')
return res
def get_mma_status(self) -> dict[str, Any] | None:
"""Retrieves current MMA status (track, tickets, tier, etc.)"""
return self._make_request('GET', '/api/gui/mma_status')
def push_event(self, event_type: str, payload: dict[str, Any]) -> dict[str, Any] | None:
"""Pushes an event to the GUI's AsyncEventQueue via the /api/gui endpoint."""
return self.post_gui({
"action": event_type,
"payload": payload
})
def get_performance(self) -> dict[str, Any] | None:
"""Retrieves UI performance metrics."""
return self._make_request('GET', '/api/performance')
def post_session(self, session_entries: list[Any]) -> dict[str, Any] | None:
return self._make_request('POST', '/api/session', data={'session': {'entries': session_entries}})
def post_gui(self, gui_data: dict[str, Any]) -> dict[str, Any] | None:
return self._make_request('POST', '/api/gui', data=gui_data)
def select_tab(self, tab_bar: str, tab: str) -> dict[str, Any] | None:
"""Tells the GUI to switch to a specific tab in a tab bar."""
return self.post_gui({
"action": "select_tab",
"tab_bar": tab_bar,
"tab": tab
})
def select_list_item(self, listbox: str, item_value: str) -> dict[str, Any] | None:
"""Tells the GUI to select an item in a listbox by its value."""
return self.post_gui({
"action": "select_list_item",
"listbox": listbox,
"item_value": item_value
})
def set_value(self, item: str, value: Any) -> dict[str, Any] | None:
"""Sets the value of a GUI item."""
return self.post_gui({
"action": "set_value",
"item": item,
"value": value
})
def get_value(self, item: str) -> Any:
"""Gets the value of a GUI item via its mapped field."""
try:
# First try direct field querying via POST
res = self._make_request('POST', '/api/gui/value', data={"field": item})
if res and "value" in res:
v = res.get("value")
if v is not None:
return v
except Exception:
pass
try:
# Try GET fallback
res = self._make_request('GET', f'/api/gui/value/{item}')
if res and "value" in res:
v = res.get("value")
if v is not None:
return v
except Exception:
pass
try:
# Fallback for thinking/live/prior which are in diagnostics
diag = self._make_request('GET', '/api/gui/diagnostics')
if diag and item in diag:
return diag[item]
# Map common indicator tags to diagnostics keys
mapping = {
"thinking_indicator": "thinking",
"operations_live_indicator": "live",
"prior_session_indicator": "prior"
}
key = mapping.get(item)
if diag and key and key in diag:
return diag[key]
except Exception:
pass
return None
def get_text_value(self, item_tag: str) -> str | None:
"""Wraps get_value and returns its string representation, or None."""
val = self.get_value(item_tag)
return str(val) if val is not None else None
def get_node_status(self, node_tag: str) -> Any:
"""Wraps get_value for a DAG node or queries the diagnostic endpoint for its status."""
val = self.get_value(node_tag)
if val is not None:
return val
try:
diag = self._make_request('GET', '/api/gui/diagnostics')
if diag and 'nodes' in diag and node_tag in diag['nodes']:
return diag['nodes'][node_tag]
if diag and node_tag in diag:
return diag[node_tag]
except Exception:
pass
return None
def click(self, item: str, *args: Any, **kwargs: Any) -> dict[str, Any] | None:
"""Simulates a click on a GUI button or item."""
user_data = kwargs.pop('user_data', None)
return self.post_gui({
"action": "click",
"item": item,
"args": args,
"kwargs": kwargs,
"user_data": user_data
})
def get_indicator_state(self, tag: str) -> dict[str, Any]:
"""Checks if an indicator is shown using the diagnostics endpoint."""
# Mapping tag to the keys used in diagnostics endpoint
mapping = {
"thinking_indicator": "thinking",
"operations_live_indicator": "live",
"prior_session_indicator": "prior"
}
key = mapping.get(tag, tag)
try:
diag = self._make_request('GET', '/api/gui/diagnostics')
return {"tag": tag, "shown": diag.get(key, False) if diag else False}
except Exception as e:
return {"tag": tag, "shown": False, "error": str(e)}
def get_events(self) -> list[Any]:
"""Fetches and clears the event queue from the server."""
try:
res = self._make_request('GET', '/api/events')
return res.get("events", []) if res else []
except Exception:
return []
def wait_for_event(self, event_type: str, timeout: float = 5) -> dict[str, Any] | None:
"""Polls for a specific event type."""
start = time.time()
while time.time() - start < timeout:
events = self.get_events()
for ev in events:
if isinstance(ev, dict) and ev.get("type") == event_type:
return ev
time.sleep(0.1) # Fast poll
return None
def wait_for_value(self, item: str, expected: Any, timeout: float = 5) -> bool:
"""Polls until get_value(item) == expected."""
start = time.time()
while time.time() - start < timeout:
if self.get_value(item) == expected:
return True
time.sleep(0.1) # Fast poll
return False
def reset_session(self) -> dict[str, Any] | None:
"""Simulates clicking the 'Reset Session' button in the GUI."""
return self.click("btn_reset")
def request_confirmation(self, tool_name: str, args: dict[str, Any]) -> Any:
"""Asks the user for confirmation via the GUI (blocking call)."""
# Using a long timeout as this waits for human input (60 seconds)
res = self._make_request('POST', '/api/ask',
data={'type': 'tool_approval', 'tool': tool_name, 'args': args},
timeout=60.0)
return res.get('response') if res else None

310
src/api_hooks.py Normal file
View File

@@ -0,0 +1,310 @@
from __future__ import annotations
import json
import threading
import uuid
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
from typing import Any
import logging
import session_logger
class HookServerInstance(ThreadingHTTPServer):
"""Custom HTTPServer that carries a reference to the main App instance."""
def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
super().__init__(server_address, RequestHandlerClass)
self.app = app
class HookHandler(BaseHTTPRequestHandler):
"""Handles incoming HTTP requests for the API hooks."""
def do_GET(self) -> None:
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == '/status':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
elif self.path == '/api/project':
import project_manager
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
flat = project_manager.flat_config(app.project)
self.wfile.write(json.dumps({'project': flat}).encode('utf-8'))
elif self.path == '/api/session':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
with app._disc_entries_lock:
entries_snapshot = list(app.disc_entries)
self.wfile.write(
json.dumps({'session': {'entries': entries_snapshot}}).
encode('utf-8'))
elif self.path == '/api/performance':
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
metrics = {}
if hasattr(app, 'perf_monitor'):
metrics = app.perf_monitor.get_metrics()
self.wfile.write(json.dumps({'performance': metrics}).encode('utf-8'))
elif self.path == '/api/events':
# Long-poll or return current event queue
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
events = []
if hasattr(app, '_api_event_queue'):
with app._api_event_queue_lock:
events = list(app._api_event_queue)
app._api_event_queue.clear()
self.wfile.write(json.dumps({'events': events}).encode('utf-8'))
elif self.path == '/api/gui/value':
# POST with {"field": "field_tag"} to get value
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
data = json.loads(body.decode('utf-8'))
field_tag = data.get("field")
event = threading.Event()
result = {"value": None}
def get_val():
try:
if field_tag in app._settable_fields:
attr = app._settable_fields[field_tag]
val = getattr(app, attr, None)
result["value"] = val
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_val
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(result).encode('utf-8'))
else:
self.send_response(504)
self.end_headers()
elif self.path.startswith('/api/gui/value/'):
# Generic endpoint to get the value of any settable field
field_tag = self.path.split('/')[-1]
event = threading.Event()
result = {"value": None}
def get_val():
try:
if field_tag in app._settable_fields:
attr = app._settable_fields[field_tag]
result["value"] = getattr(app, attr, None)
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_val
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(result).encode('utf-8'))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/mma_status':
event = threading.Event()
result = {}
def get_mma():
try:
result["mma_status"] = getattr(app, "mma_status", "idle")
result["ai_status"] = getattr(app, "ai_status", "idle")
result["active_tier"] = getattr(app, "active_tier", None)
at = getattr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = getattr(app, "active_tickets", [])
result["mma_step_mode"] = getattr(app, "mma_step_mode", False)
result["pending_tool_approval"] = getattr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = getattr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = getattr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = getattr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = getattr(app, "tracks", [])
result["proposed_tracks"] = getattr(app, "proposed_tracks", [])
result["mma_streams"] = getattr(app, "mma_streams", {})
result["mma_tier_usage"] = getattr(app, "mma_tier_usage", {})
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": get_mma
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(result).encode('utf-8'))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/diagnostics':
event = threading.Event()
result = {}
def check_all():
try:
status = getattr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = getattr(app, "is_viewing_prior_session", False)
finally:
event.set()
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({
"action": "custom_callback",
"callback": check_all
})
if event.wait(timeout=60):
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps(result).encode('utf-8'))
else:
self.send_response(504)
self.end_headers()
self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
def do_POST(self) -> None:
app = self.server.app
content_length = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(content_length)
body_str = body.decode('utf-8') if body else ""
session_logger.log_api_hook("POST", self.path, body_str)
try:
data = json.loads(body_str) if body_str else {}
if self.path == '/api/project':
app.project = data.get('project', app.project)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path.startswith('/api/confirm/'):
action_id = self.path.split('/')[-1]
approved = data.get('approved', False)
if hasattr(app, 'resolve_pending_action'):
success = app.resolve_pending_action(action_id, approved)
if success:
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
else:
self.send_response(500)
self.end_headers()
elif self.path == '/api/session':
with app._disc_entries_lock:
app.disc_entries = data.get('session', {}).get('entries', app.disc_entries)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'updated'}).encode('utf-8'))
elif self.path == '/api/gui':
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append(data)
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'queued'}).encode('utf-8'))
elif self.path == '/api/ask':
request_id = str(uuid.uuid4())
event = threading.Event()
if not hasattr(app, '_pending_asks'): app._pending_asks = {}
if not hasattr(app, '_ask_responses'): app._ask_responses = {}
app._pending_asks[request_id] = event
with app._api_event_queue_lock:
app._api_event_queue.append({"type": "ask_received", "request_id": request_id, "data": data})
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({"type": "ask", "request_id": request_id, "data": data})
if event.wait(timeout=60.0):
response_data = app._ask_responses.get(request_id)
if request_id in app._ask_responses: del app._ask_responses[request_id]
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok', 'response': response_data}).encode('utf-8'))
else:
if request_id in app._pending_asks: del app._pending_asks[request_id]
self.send_response(504)
self.end_headers()
self.wfile.write(json.dumps({'error': 'timeout'}).encode('utf-8'))
elif self.path == '/api/ask/respond':
request_id = data.get('request_id')
response_data = data.get('response')
if request_id and hasattr(app, '_pending_asks') and request_id in app._pending_asks:
app._ask_responses[request_id] = response_data
event = app._pending_asks[request_id]
event.set()
del app._pending_asks[request_id]
with app._pending_gui_tasks_lock:
app._pending_gui_tasks.append({"action": "clear_ask", "request_id": request_id})
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'status': 'ok'}).encode('utf-8'))
else:
self.send_response(404)
self.end_headers()
else:
self.send_response(404)
self.end_headers()
except Exception as e:
self.send_response(500)
self.send_header('Content-Type', 'application/json')
self.end_headers()
self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
def log_message(self, format: str, *args: Any) -> None:
logging.info("Hook API: " + format % args)
class HookServer:
def __init__(self, app: Any, port: int = 8999) -> None:
self.app = app
self.port = port
self.server = None
self.thread = None
def start(self) -> None:
if self.thread and self.thread.is_alive():
return
is_gemini_cli = getattr(self.app, 'current_provider', '') == 'gemini_cli'
if not getattr(self.app, 'test_hooks_enabled', False) and not is_gemini_cli:
return
if not hasattr(self.app, '_pending_gui_tasks'): self.app._pending_gui_tasks = []
if not hasattr(self.app, '_pending_gui_tasks_lock'): self.app._pending_gui_tasks_lock = threading.Lock()
if not hasattr(self.app, '_pending_asks'): self.app._pending_asks = {}
if not hasattr(self.app, '_ask_responses'): self.app._ask_responses = {}
if not hasattr(self.app, '_api_event_queue'): self.app._api_event_queue = []
if not hasattr(self.app, '_api_event_queue_lock'): self.app._api_event_queue_lock = threading.Lock()
self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, self.app)
self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
self.thread.start()
logging.info(f"Hook server started on port {self.port}")
def stop(self) -> None:
if self.server:
self.server.shutdown()
self.server.server_close()
if self.thread:
self.thread.join()
logging.info("Hook server stopped")

View File

@@ -0,0 +1,79 @@
import json
import ai_client
import mma_prompts
import re
from typing import Any
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, Any]]:
"""
Tier 2 (Tech Lead) call.
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
"""
# 1. Set Tier 2 Model (Tech Lead - Flash)
# 2. Construct Prompt
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n"
f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
"Please generate the implementation tickets for this track."
)
# Set custom system prompt for this call
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt or "")
ai_client.current_tier = "Tier 2"
try:
# 3. Call Tier 2 Model
response = ai_client.send(
md_content="",
user_message=user_message
)
# 4. Parse JSON Output
# Extract JSON array from markdown code blocks if present
json_match = response.strip()
if "```json" in json_match:
json_match = json_match.split("```json")[1].split("```")[0].strip()
elif "```" in json_match:
json_match = json_match.split("```")[1].split("```")[0].strip()
# If it's still not valid JSON, try to find a [ ... ] block
if not (json_match.startswith('[') and json_match.endswith(']')):
match = re.search(r'\[\s*\{.*\}\s*\]', json_match, re.DOTALL)
if match:
json_match = match.group(0)
tickets: list[dict[str, Any]] = json.loads(json_match)
return tickets
except Exception as e:
print(f"Error parsing Tier 2 response: {e}")
return []
finally:
# Restore old system prompt and clear tier tag
ai_client.set_custom_system_prompt(old_system_prompt or "")
ai_client.current_tier = None
from dag_engine import TrackDAG
from models import Ticket
def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Sorts a list of tickets based on their 'depends_on' field.
Raises ValueError if a circular dependency or missing internal dependency is detected.
"""
# 1. Convert to Ticket objects for TrackDAG
ticket_objs = []
for t_data in tickets:
ticket_objs.append(Ticket.from_dict(t_data))
# 2. Use TrackDAG for validation and sorting
dag = TrackDAG(ticket_objs)
try:
sorted_ids = dag.topological_sort()
except ValueError as e:
raise ValueError(f"DAG Validation Error: {e}")
# 3. Return sorted dictionaries
ticket_map = {t['id']: t for t in tickets}
return [ticket_map[tid] for tid in sorted_ids]
if __name__ == "__main__":
# Quick test if run directly
test_brief = "Implement a new feature."
test_skeletons = "class NewFeature: pass"
tickets = generate_tickets(test_brief, test_skeletons)
print(json.dumps(tickets, indent=2))

28
src/cost_tracker.py Normal file
View File

@@ -0,0 +1,28 @@
import re
# Pricing per 1M tokens in USD
MODEL_PRICING = [
(r"gemini-2\.5-flash-lite", {"input_per_mtok": 0.075, "output_per_mtok": 0.30}),
(r"gemini-2\.5-flash", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3-flash-preview", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3\.1-pro-preview", {"input_per_mtok": 3.50, "output_per_mtok": 10.50}),
(r"claude-.*-sonnet", {"input_per_mtok": 3.0, "output_per_mtok": 15.0}),
(r"claude-.*-opus", {"input_per_mtok": 15.0, "output_per_mtok": 75.0}),
(r"deepseek-v3", {"input_per_mtok": 0.27, "output_per_mtok": 1.10}),
]
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""
Estimate the cost of a model call based on input and output tokens.
Returns the total cost in USD.
"""
if not model:
return 0.0
for pattern, rates in MODEL_PRICING:
if re.search(pattern, model, re.IGNORECASE):
input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"]
output_cost = (output_tokens / 1_000_000) * rates["output_per_mtok"]
return input_cost + output_cost
return 0.0

171
src/dag_engine.py Normal file
View File

@@ -0,0 +1,171 @@
from typing import List
from models import Ticket
class TrackDAG:
"""
Manages a Directed Acyclic Graph of implementation tickets.
Provides methods for dependency resolution, cycle detection, and topological sorting.
"""
def __init__(self, tickets: List[Ticket]) -> None:
"""
Initializes the TrackDAG with a list of Ticket objects.
Args:
tickets: A list of Ticket instances defining the graph nodes and edges.
"""
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets}
def cascade_blocks(self) -> None:
"""
Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.
Runs until stable (handles multi-hop chains: A→B→C where A blocked cascades to B then C).
"""
changed = True
while changed:
changed = False
for ticket in self.tickets:
if ticket.status == 'todo':
for dep_id in ticket.depends_on:
dep = self.ticket_map.get(dep_id)
if dep and dep.status == 'blocked':
ticket.status = 'blocked'
changed = True
break
def get_ready_tasks(self) -> List[Ticket]:
"""
Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.
Returns:
A list of Ticket objects ready for execution.
"""
ready = []
for ticket in self.tickets:
if ticket.status == 'todo':
# Check if all dependencies exist and are completed
all_done = True
for dep_id in ticket.depends_on:
dep = self.ticket_map.get(dep_id)
if not dep or dep.status != 'completed':
all_done = False
break
if all_done:
ready.append(ticket)
return ready
def has_cycle(self) -> bool:
"""
Performs a Depth-First Search to detect cycles in the dependency graph.
Returns:
True if a cycle is detected, False otherwise.
"""
visited = set()
rec_stack = set()
def is_cyclic(ticket_id: str) -> bool:
"""Internal recursive helper for cycle detection."""
if ticket_id in rec_stack:
return True
if ticket_id in visited:
return False
visited.add(ticket_id)
rec_stack.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for neighbor in ticket.depends_on:
if is_cyclic(neighbor):
return True
rec_stack.remove(ticket_id)
return False
for ticket in self.tickets:
if ticket.id not in visited:
if is_cyclic(ticket.id):
return True
return False
def topological_sort(self) -> List[str]:
"""
Returns a list of ticket IDs in topological order (dependencies before dependents).
Returns:
A list of ticket ID strings.
Raises:
ValueError: If a dependency cycle is detected.
"""
if self.has_cycle():
raise ValueError("Dependency cycle detected")
visited = set()
stack = []
def visit(ticket_id: str) -> None:
"""Internal recursive helper for topological sorting."""
if ticket_id in visited:
return
visited.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for dep_id in ticket.depends_on:
visit(dep_id)
stack.append(ticket_id)
for ticket in self.tickets:
visit(ticket.id)
return stack
class ExecutionEngine:
"""
A state machine that governs the progression of tasks within a TrackDAG.
Handles automatic queueing and manual task approval.
"""
def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
"""
Initializes the ExecutionEngine.
Args:
dag: The TrackDAG instance to manage.
auto_queue: If True, ready tasks will automatically move to 'in_progress'.
"""
self.dag = dag
self.auto_queue = auto_queue
def tick(self) -> List[Ticket]:
"""
Evaluates the DAG and returns a list of tasks that are currently 'ready' for execution.
If auto_queue is enabled, tasks without 'step_mode' will be marked as 'in_progress'.
Returns:
A list of ready Ticket objects.
"""
self.dag.cascade_blocks()
ready = self.dag.get_ready_tasks()
if self.auto_queue:
for ticket in ready:
if not ticket.step_mode:
ticket.status = "in_progress"
return ready
def approve_task(self, task_id: str) -> None:
"""
Manually transitions a task from 'todo' to 'in_progress' if its dependencies are met.
Args:
task_id: The ID of the task to approve.
"""
ticket = self.dag.ticket_map.get(task_id)
if ticket and ticket.status == "todo":
# Check if dependencies are met first
all_done = True
for dep_id in ticket.depends_on:
dep = self.dag.ticket_map.get(dep_id)
if not dep or dep.status != "completed":
all_done = False
break
if all_done:
ticket.status = "in_progress"
def update_task_status(self, task_id: str, status: str) -> None:
"""
Force-updates the status of a specific task.
Args:
task_id: The ID of the task.
status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked').
"""
ticket = self.dag.ticket_map.get(task_id)
if ticket:
ticket.status = status

88
src/events.py Normal file
View File

@@ -0,0 +1,88 @@
"""
Decoupled event emission system for cross-module communication.
"""
import asyncio
from typing import Callable, Any, Dict, List, Tuple
class EventEmitter:
"""
Simple event emitter for decoupled communication between modules.
"""
def __init__(self) -> None:
"""Initializes the EventEmitter with an empty listener map."""
self._listeners: Dict[str, List[Callable[..., Any]]] = {}
def on(self, event_name: str, callback: Callable[..., Any]) -> None:
"""
Registers a callback for a specific event.
Args:
event_name: The name of the event to listen for.
callback: The function to call when the event is emitted.
"""
if event_name not in self._listeners:
self._listeners[event_name] = []
self._listeners[event_name].append(callback)
def emit(self, event_name: str, *args: Any, **kwargs: Any) -> None:
"""
Emits an event, calling all registered callbacks.
Args:
event_name: The name of the event to emit.
*args: Positional arguments to pass to callbacks.
**kwargs: Keyword arguments to pass to callbacks.
"""
if event_name in self._listeners:
for callback in self._listeners[event_name]:
callback(*args, **kwargs)
class AsyncEventQueue:
"""
Asynchronous event queue for decoupled communication using asyncio.Queue.
"""
def __init__(self) -> None:
"""Initializes the AsyncEventQueue with an internal asyncio.Queue."""
self._queue: asyncio.Queue[Tuple[str, Any]] = asyncio.Queue()
async def put(self, event_name: str, payload: Any = None) -> None:
"""
Puts an event into the queue.
Args:
event_name: The name of the event.
payload: Optional data associated with the event.
"""
await self._queue.put((event_name, payload))
async def get(self) -> Tuple[str, Any]:
"""
Gets an event from the queue.
Returns:
A tuple containing (event_name, payload).
"""
return await self._queue.get()
class UserRequestEvent:
"""
Payload for a user request event.
"""
def __init__(self, prompt: str, stable_md: str, file_items: List[Any], disc_text: str, base_dir: str) -> None:
self.prompt = prompt
self.stable_md = stable_md
self.file_items = file_items
self.disc_text = disc_text
self.base_dir = base_dir
def to_dict(self) -> Dict[str, Any]:
return {
"prompt": self.prompt,
"stable_md": self.stable_md,
"file_items": self.file_items,
"disc_text": self.disc_text,
"base_dir": self.base_dir
}

158
src/file_cache.py Normal file
View File

@@ -0,0 +1,158 @@
# file_cache.py
"""
Stub — the Anthropic Files API path has been removed.
All context is now sent as inline chunked text via _send_anthropic_chunked.
This file is kept so that any stale imports do not break.
"""
from pathlib import Path
from typing import Optional, Any, List, Tuple, Dict
import tree_sitter
import tree_sitter_python
class ASTParser:
"""
Parser for extracting AST-based views of source code.
Currently supports Python.
"""
def __init__(self, language: str) -> None:
if language != "python":
raise ValueError(f"Language '{language}' not supported yet.")
self.language_name = language
# Load the tree-sitter language grammar
self.language = tree_sitter.Language(tree_sitter_python.language())
self.parser = tree_sitter.Parser(self.language)
def parse(self, code: str) -> tree_sitter.Tree:
"""Parse the given code and return the tree-sitter Tree."""
return self.parser.parse(bytes(code, "utf8"))
def get_skeleton(self, code: str) -> str:
"""
Returns a skeleton of a Python file (preserving docstrings, stripping function bodies).
"""
tree = self.parse(code)
edits: List[Tuple[int, int, str]] = []
def is_docstring(node: tree_sitter.Node) -> bool:
if node.type == "expression_statement" and node.child_count > 0:
if node.children[0].type == "string":
return True
return False
def walk(node: tree_sitter.Node) -> None:
if node.type == "function_definition":
body = node.child_by_field_name("body")
if body and body.type == "block":
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
first_stmt = child
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
for child in node.children:
walk(child)
walk(tree.root_node)
# Apply edits in reverse to maintain byte offsets
edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8")
for start, end, replacement in edits:
code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
def get_curated_view(self, code: str) -> str:
"""
Returns a curated skeleton of a Python file.
Preserves function bodies if they have @core_logic decorator or # [HOT] comment.
Otherwise strips bodies but preserves docstrings.
"""
tree = self.parse(code)
edits: List[Tuple[int, int, str]] = []
def is_docstring(node: tree_sitter.Node) -> bool:
if node.type == "expression_statement" and node.child_count > 0:
if node.children[0].type == "string":
return True
return False
def has_core_logic_decorator(node: tree_sitter.Node) -> bool:
# Check if parent is decorated_definition
parent = node.parent
if parent and parent.type == "decorated_definition":
for child in parent.children:
if child.type == "decorator":
# decorator -> ( '@', identifier ) or ( '@', call )
if "@core_logic" in code[child.start_byte:child.end_byte]:
return True
return False
def has_hot_comment(func_node: tree_sitter.Node) -> bool:
# Check all descendants of the function_definition for a [HOT] comment
stack = [func_node]
while stack:
curr = stack.pop()
if curr.type == "comment":
comment_text = code[curr.start_byte:curr.end_byte]
if "[HOT]" in comment_text:
return True
for child in curr.children:
stack.append(child)
return False
def walk(node: tree_sitter.Node) -> None:
if node.type == "function_definition":
body = node.child_by_field_name("body")
if body and body.type == "block":
# Check if we should preserve it
preserve = has_core_logic_decorator(node) or has_hot_comment(node)
if not preserve:
indent = " " * body.start_point.column
first_stmt = None
for child in body.children:
if child.type != "comment":
first_stmt = child
break
if first_stmt and is_docstring(first_stmt):
start_byte = first_stmt.end_byte
end_byte = body.end_byte
if end_byte > start_byte:
edits.append((start_byte, end_byte, f"\n{indent}..."))
else:
start_byte = body.start_byte
end_byte = body.end_byte
edits.append((start_byte, end_byte, "..."))
for child in node.children:
walk(child)
walk(tree.root_node)
# Apply edits in reverse to maintain byte offsets
edits.sort(key=lambda x: x[0], reverse=True)
code_bytes = bytearray(code, "utf8")
for start, end, replacement in edits:
code_bytes[start:end] = bytes(replacement, "utf8")
return code_bytes.decode("utf8")
def reset_client() -> None:
pass
def content_block_type(path: Path) -> str:
return "unsupported"
def get_file_id(path: Path) -> Optional[str]:
return None
def evict(path: Path) -> None:
pass
def list_cached() -> List[Dict[str, Any]]:
return []

137
src/gemini_cli_adapter.py Normal file
View File

@@ -0,0 +1,137 @@
import subprocess
import json
import os
import time
import session_logger
from typing import Optional, Callable, Any
class GeminiCliAdapter:
"""
Adapter for the Gemini CLI that parses streaming JSON output.
"""
def __init__(self, binary_path: str = "gemini"):
self.binary_path = binary_path
self.session_id: Optional[str] = None
self.last_usage: Optional[dict[str, Any]] = None
self.last_latency: float = 0.0
def send(self, message: str, safety_settings: list[Any] | None = None, system_instruction: str | None = None,
model: str | None = None, stream_callback: Optional[Callable[[str], None]] = None) -> dict[str, Any]:
"""
Sends a message to the Gemini CLI and processes the streaming JSON output.
Uses non-blocking line-by-line reading to allow stream_callback.
"""
start_time = time.time()
command_parts = [self.binary_path]
if model:
command_parts.extend(['-m', f'"{model}"'])
command_parts.extend(['--prompt', '""'])
if self.session_id:
command_parts.extend(['--resume', self.session_id])
command_parts.extend(['--output-format', 'stream-json'])
command = " ".join(command_parts)
prompt_text = message
if system_instruction:
prompt_text = f"{system_instruction}\n\n{message}"
accumulated_text = ""
tool_calls = []
stdout_content = []
env = os.environ.copy()
env["GEMINI_CLI_HOOK_CONTEXT"] = "manual_slop"
import shlex
# shlex.split handles quotes correctly even on Windows if we are careful.
# We want to split the entire binary_path into its components.
if os.name == 'nt':
# On Windows, shlex.split with default posix=True might swallow backslashes.
# Using posix=False is better for Windows paths.
cmd_list = shlex.split(self.binary_path, posix=False)
else:
cmd_list = shlex.split(self.binary_path)
if model:
cmd_list.extend(['-m', model])
cmd_list.extend(['--prompt', '""'])
if self.session_id:
cmd_list.extend(['--resume', self.session_id])
cmd_list.extend(['--output-format', 'stream-json'])
# Filter out empty strings and strip quotes (Popen doesn't want them in cmd_list elements)
cmd_list = [c.strip('"') for c in cmd_list if c]
process = subprocess.Popen(
cmd_list,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
encoding="utf-8",
shell=False,
env=env
)
# Use communicate to avoid pipe deadlocks with large input/output.
# This blocks until the process exits, so we lose real-time streaming,
# but it's much more robust. We then simulate streaming by processing the output.
stdout_final, stderr_final = process.communicate(input=prompt_text)
for line in stdout_final.splitlines():
line = line.strip()
if not line: continue
stdout_content.append(line)
try:
data = json.loads(line)
msg_type = data.get("type")
if msg_type == "init":
if "session_id" in data:
self.session_id = data.get("session_id")
elif msg_type == "message" or msg_type == "chunk":
role = data.get("role", "")
if role in ["assistant", "model"] or not role:
content = data.get("content", data.get("text"))
if content:
accumulated_text += content
if stream_callback:
stream_callback(content)
elif msg_type == "result":
self.last_usage = data.get("stats") or data.get("usage")
if "session_id" in data:
self.session_id = data.get("session_id")
elif msg_type == "tool_use":
tc = {
"name": data.get("tool_name", data.get("name")),
"args": data.get("parameters", data.get("args", {})),
"id": data.get("tool_id", data.get("id"))
}
if tc["name"]:
tool_calls.append(tc)
except json.JSONDecodeError:
continue
current_latency = time.time() - start_time
session_logger.open_session()
session_logger.log_cli_call(
command=command,
stdin_content=prompt_text,
stdout_content="\n".join(stdout_content),
stderr_content=stderr_final,
latency=current_latency
)
self.last_latency = current_latency
return {
"text": accumulated_text,
"tool_calls": tool_calls,
"stderr": stderr_final
}
def count_tokens(self, contents: list[str]) -> int:
"""
Provides a character-based token estimation for the Gemini CLI.
Uses 4 chars/token as a conservative average.
"""
total_chars = len("\n".join(contents))
return total_chars // 4

3654
src/gui_2.py Normal file

File diff suppressed because it is too large Load Diff

60
src/log_pruner.py Normal file
View File

@@ -0,0 +1,60 @@
import os
import shutil
from datetime import datetime, timedelta
from log_registry import LogRegistry
class LogPruner:
"""
Handles the automated deletion of old and insignificant session logs.
Ensures that only whitelisted or significant sessions (based on size/content)
are preserved long-term.
"""
def __init__(self, log_registry: LogRegistry, logs_dir: str) -> None:
"""
Initializes the LogPruner.
Args:
log_registry: An instance of LogRegistry to check session data.
logs_dir: The path to the directory containing session sub-directories.
"""
self.log_registry = log_registry
self.logs_dir = logs_dir
def prune(self) -> None:
"""
Prunes old and small session directories from the logs directory.
Deletes session directories that meet the following criteria:
1. The session start time is older than 24 hours (based on data from LogRegistry).
2. The session name is NOT in the whitelist provided by the LogRegistry.
3. The total size of all files within the session directory is less than 2KB (2048 bytes).
"""
now = datetime.now()
cutoff_time = now - timedelta(hours=24)
# Ensure the base logs directory exists.
if not os.path.isdir(self.logs_dir):
return
# Get sessions that are old and not whitelisted from the registry
old_sessions_to_check = self.log_registry.get_old_non_whitelisted_sessions(cutoff_time)
# Prune sessions if their size is less than 2048 bytes
for session_info in old_sessions_to_check:
session_info['session_id']
session_path = session_info['path']
if not session_path or not os.path.isdir(session_path):
continue
# Calculate total size of files in the directory
total_size = 0
try:
for entry in os.scandir(session_path):
if entry.is_file():
total_size += entry.stat().st_size
except OSError:
continue
# Prune if the total size is less than 2KB (2048 bytes)
if total_size < 2048: # 2KB
try:
shutil.rmtree(session_path)
# print(f"Pruned session '{session_id}' (Size: {total_size} bytes)")
except OSError:
pass

246
src/log_registry.py Normal file
View File

@@ -0,0 +1,246 @@
from __future__ import annotations
import tomli_w
import tomllib
from datetime import datetime
import os
from typing import Any
class LogRegistry:
"""
Manages a persistent registry of session logs using a TOML file.
Tracks session paths, start times, whitelisting status, and metadata.
"""
def __init__(self, registry_path: str) -> None:
"""
Initializes the LogRegistry with a path to the registry file.
Args:
registry_path (str): The file path to the TOML registry.
"""
self.registry_path = registry_path
self.data: dict[str, dict[str, Any]] = {}
self.load_registry()
def load_registry(self) -> None:
"""
Loads the registry data from the TOML file into memory.
Handles date/time conversions from TOML-native formats to strings for consistency.
"""
if os.path.exists(self.registry_path):
try:
with open(self.registry_path, 'rb') as f:
loaded_data = tomllib.load(f)
# Keep data as it is from TOML (strings or native datetimes)
# If we want to satisfy tests that expect strings, we ensure they are strings.
self.data = {}
for session_id, session_data in loaded_data.items():
new_session_data = session_data.copy()
# If tomllib parsed it as a datetime, convert it back to string for the tests
if 'start_time' in new_session_data and isinstance(new_session_data['start_time'], datetime):
new_session_data['start_time'] = new_session_data['start_time'].isoformat()
if 'metadata' in new_session_data and isinstance(new_session_data['metadata'], dict):
m = new_session_data['metadata']
if 'timestamp' in m and isinstance(m['timestamp'], datetime):
m['timestamp'] = m['timestamp'].isoformat()
self.data[session_id] = new_session_data
except Exception as e:
print(f"Error loading registry from {self.registry_path}: {e}")
self.data = {}
else:
self.data = {}
def save_registry(self) -> None:
"""
Serializes and saves the current registry data to the TOML file.
Converts internal datetime objects to ISO format strings for compatibility.
"""
try:
# Convert datetime objects to ISO format strings for TOML serialization
data_to_save: dict[str, Any] = {}
for session_id, session_data in self.data.items():
session_data_copy: dict[str, Any] = {}
for k, v in session_data.items():
if v is None:
continue
if k == 'start_time' and isinstance(v, datetime):
session_data_copy[k] = v.isoformat()
elif k == 'metadata' and isinstance(v, dict):
metadata_copy: dict[str, Any] = {}
for mk, mv in v.items():
if mv is None:
continue
if mk == 'timestamp' and isinstance(mv, datetime):
metadata_copy[mk] = mv.isoformat()
else:
metadata_copy[mk] = mv
session_data_copy[k] = metadata_copy
else:
session_data_copy[k] = v
data_to_save[session_id] = session_data_copy
with open(self.registry_path, 'wb') as f:
tomli_w.dump(data_to_save, f)
except Exception as e:
print(f"Error saving registry to {self.registry_path}: {e}")
def register_session(self, session_id: str, path: str, start_time: datetime | str) -> None:
"""
Registers a new session in the registry.
Args:
session_id (str): Unique identifier for the session.
path (str): File path to the session's log directory.
start_time (datetime|str): The timestamp when the session started.
"""
if session_id in self.data:
print(f"Warning: Session ID '{session_id}' already exists. Overwriting.")
# Store start_time internally as a string to satisfy tests
if isinstance(start_time, datetime):
start_time_str = start_time.isoformat()
else:
start_time_str = start_time
self.data[session_id] = {
'path': path,
'start_time': start_time_str,
'whitelisted': False,
'metadata': None
}
self.save_registry()
def update_session_metadata(self, session_id: str, message_count: int, errors: int, size_kb: int, whitelisted: bool, reason: str) -> None:
"""
Updates metadata fields for an existing session.
Args:
session_id (str): Unique identifier for the session.
message_count (int): Total number of messages in the session.
errors (int): Number of errors identified in logs.
size_kb (int): Total size of the session logs in kilobytes.
whitelisted (bool): Whether the session should be protected from pruning.
reason (str): Explanation for the current whitelisting status.
"""
if session_id not in self.data:
print(f"Error: Session ID '{session_id}' not found for metadata update.")
return
# Ensure metadata exists
if self.data[session_id].get('metadata') is None:
self.data[session_id]['metadata'] = {}
# Update fields
metadata = self.data[session_id].get('metadata')
if isinstance(metadata, dict):
metadata['message_count'] = message_count
metadata['errors'] = errors
metadata['size_kb'] = size_kb
metadata['whitelisted'] = whitelisted
metadata['reason'] = reason
# self.data[session_id]['metadata']['timestamp'] = datetime.utcnow() # Optionally add a timestamp
# Also update the top-level whitelisted flag if provided
if whitelisted is not None:
self.data[session_id]['whitelisted'] = whitelisted
self.save_registry() # Save after update
def is_session_whitelisted(self, session_id: str) -> bool:
"""
Checks if a specific session is marked as whitelisted.
Args:
session_id (str): Unique identifier for the session.
Returns:
bool: True if whitelisted, False otherwise.
"""
session_data = self.data.get(session_id)
if session_data is None:
return False # Non-existent sessions are not whitelisted
# Check the top-level 'whitelisted' flag. If it's not set or False, it's not whitelisted.
return bool(session_data.get('whitelisted', False))
def update_auto_whitelist_status(self, session_id: str) -> None:
"""
Analyzes session logs and updates whitelisting status based on heuristics.
Sessions are automatically whitelisted if they contain error keywords,
have a high message count, or exceed a size threshold.
Args:
session_id (str): Unique identifier for the session to analyze.
"""
if session_id not in self.data:
return
session_data = self.data[session_id]
session_path = session_data.get('path')
if not session_path or not os.path.isdir(str(session_path)):
return
total_size_bytes = 0
message_count = 0
found_keywords = []
keywords_to_check = ['ERROR', 'WARNING', 'EXCEPTION']
try:
for entry in os.scandir(str(session_path)):
if entry.is_file():
size = entry.stat().st_size
total_size_bytes += size
# Analyze comms.log for messages and keywords
if entry.name == "comms.log":
try:
with open(entry.path, 'r', encoding='utf-8', errors='ignore') as f:
for line in f:
message_count += 1
for kw in keywords_to_check:
if kw in line and kw not in found_keywords:
found_keywords.append(kw)
except Exception:
pass
except Exception:
pass
size_kb = total_size_bytes / 1024
whitelisted = False
reason = ""
if found_keywords:
whitelisted = True
reason = f"Found keywords: {', '.join(found_keywords)}"
elif message_count > 10:
whitelisted = True
reason = f"High message count: {message_count}"
elif size_kb > 50:
whitelisted = True
reason = f"Large session size: {size_kb:.1f} KB"
self.update_session_metadata(
session_id,
message_count=message_count,
errors=len(found_keywords),
size_kb=int(size_kb),
whitelisted=whitelisted,
reason=reason
)
def get_old_non_whitelisted_sessions(self, cutoff_datetime: datetime) -> list[dict[str, Any]]:
"""
Retrieves a list of sessions that are older than a specific cutoff time
and are not marked as whitelisted.
Args:
cutoff_datetime (datetime): The threshold time for identifying old sessions.
Returns:
list: A list of dictionaries containing session details (id, path, start_time).
"""
old_sessions = []
for session_id, session_data in self.data.items():
# Check if session is older than cutoff and not whitelisted
start_time_raw = session_data.get('start_time')
if isinstance(start_time_raw, str):
try:
start_time = datetime.fromisoformat(start_time_raw)
except ValueError:
start_time = None
else:
start_time = start_time_raw
is_whitelisted = session_data.get('whitelisted', False)
if start_time is not None and start_time < cutoff_datetime and not is_whitelisted:
old_sessions.append({
'session_id': session_id,
'path': session_data.get('path'),
'start_time': start_time_raw
})
return old_sessions

1326
src/mcp_client.py Normal file

File diff suppressed because it is too large Load Diff

153
src/mma_prompts.py Normal file
View File

@@ -0,0 +1,153 @@
"""
MMA Structured System Prompts for Tier 1 (PM) and Tier 2 (Tech Lead).
Contains templates and static strings for hierarchical orchestration.
"""
from typing import Dict
# --- Tier 1 (Strategic/Orchestration: PM) ---
TIER1_BASE_SYSTEM: str = """
You are the Tier 1 Orchestrator (Product Manager) for the Manual Slop project.
Your role is high-level strategic planning, architecture enforcement, and cross-module delegation.
You operate strictly on metadata, summaries, and executive-level directives.
NEVER request or attempt to read raw implementation code unless specifically provided in a Macro-Diff.
Maintain a "Godot ECS Flat List format" (JSON array of objects) for structural outputs.
"""
TIER1_EPIC_INIT: str = TIER1_BASE_SYSTEM + """
PATH: Epic Initialization (Project Planning)
GOAL: Break down a massive feature request into discrete Implementation Tracks.
CONSTRAINTS:
- IGNORE all source code, AST skeletons, and previous micro-task histories.
- FOCUS ONLY on the Repository Map and Project Meta-State.
OUTPUT REQUIREMENT:
Return a JSON array of 'Tracks'. Each track object must follow the Godot ECS Flat List format:
[
{
"id": "track_unique_id",
"type": "Track",
"module": "target_module_name",
"persona": "required_tech_lead_persona",
"severity": "Low|Medium|High",
"goal": "Descriptive goal",
"acceptance_criteria": ["criteria_1", "criteria_2"]
},
...
]
"""
TIER1_TRACK_DELEGATION: str = TIER1_BASE_SYSTEM + """
PATH: Track Delegation (Sprint Kickoff)
GOAL: Compile a 'Track Brief' for a Tier 2 Tech Lead.
CONSTRAINTS:
- IGNORE unrelated module docs and original massive user prompt.
- USE AST Skeleton Views (class/function definitions only) for allowed modules.
OUTPUT REQUIREMENT:
Generate a comprehensive 'Track Brief' (JSON or Markdown) which includes:
1. A tailored System Prompt for the Tier 2 Tech Lead.
2. A curated list of files (the "Allowlist") they are authorized to modify.
3. Explicit architectural constraints derived from the Skeleton View.
"""
TIER1_MACRO_MERGE: str = TIER1_BASE_SYSTEM + """
PATH: Macro-Merge & Acceptance Review
GOAL: Review high-severity changes and merge into the project history.
CONSTRAINTS:
- IGNORE Tier 3 trial-and-error histories and Tier 4 error logs.
- FOCUS on the Macro-Diff and the Executive Summary.
OUTPUT REQUIREMENT:
Return "Approved" (commits to memory) OR "Rejected".
If Rejected, provide specific architectural feedback focusing on integration breaks or logic violations.
"""
# --- Tier 2 (Architectural/Tech Lead: Conductor) ---
TIER2_BASE_SYSTEM: str = """
You are the Tier 2 Track Conductor (Tech Lead) for the Manual Slop project.
Your role is module-specific planning, code review, and worker management.
You bridge high-level architecture with code syntax using AST-aware Skeleton Views.
Enforce Interface-Driven Development (IDD) and manage Topological Dependency Graphs.
"""
TIER2_SPRINT_PLANNING: str = TIER2_BASE_SYSTEM + """
PATH: Sprint Planning (Task Delegation)
GOAL: Break down a Track Brief into discrete Tier 3 Tickets.
CONSTRAINTS:
- IGNORE the PM's overarching project-planning logic.
- USE Curated Implementation View (AST-extracted class structures + [HOT] function bodies) for target modules.
- USE Skeleton View (signatures only) for foreign modules.
OUTPUT REQUIREMENT:
Return a JSON array of 'Tickets' in Godot ECS Flat List format.
Include 'depends_on' pointers to construct an execution DAG (Directed Acyclic Graph).
[
{
"id": "ticket_id",
"type": "Ticket",
"goal": "Surgical implementation task",
"target_file": "path/to/file",
"depends_on": ["other_ticket_id"],
"context_requirements": ["list_of_needed_skeletons"]
},
...
]
"""
TIER2_CODE_REVIEW: str = TIER2_BASE_SYSTEM + """
PATH: Code Review (Local Integration)
GOAL: Review Tier 3 diffs and ensure they meet the Ticket's goals.
CONSTRAINTS:
- IGNORE the Contributor's internal trial-and-error chat history.
- FOCUS on the Proposed Diff and Tier 4 (QA) logs.
OUTPUT REQUIREMENT:
Return "Approve" (merges diff) OR "Reject" (sends technical critique back to Tier 3).
"""
TIER2_TRACK_FINALIZATION: str = TIER2_BASE_SYSTEM + """
PATH: Track Finalization (Upward Reporting)
GOAL: Summarize the completed Track for the Tier 1 PM.
CONSTRAINTS:
- IGNORE back-and-forth review cycles.
- FOCUS on the Aggregated Track Diff and Dependency Delta.
OUTPUT REQUIREMENT:
Provide an Executive Summary (~200 words) and the final Macro-Diff.
"""
TIER2_CONTRACT_FIRST: str = TIER2_BASE_SYSTEM + """
PATH: Contract-First Delegation (Stub-and-Resolve)
GOAL: Resolve cross-module dependencies via Interface-Driven Development (IDD).
TASK:
You have detected a dependency on an undefined signature.
EXECUTION PLAN:
1. Define the Interface Contract.
2. Generate a 'Stub Ticket' (signature, types, docstring).
3. Generate a 'Consumer Ticket' (codes against skeleton).
4. Generate an 'Implementation Ticket' (fills logic).
OUTPUT REQUIREMENT:
Return the Ticket set in Godot ECS Flat List format (JSON array).
"""
PROMPTS: Dict[str, str] = {
"tier1_epic_init": TIER1_EPIC_INIT,
"tier1_track_delegation": TIER1_TRACK_DELEGATION,
"tier1_macro_merge": TIER1_MACRO_MERGE,
"tier2_sprint_planning": TIER2_SPRINT_PLANNING,
"tier2_code_review": TIER2_CODE_REVIEW,
"tier2_track_finalization": TIER2_TRACK_FINALIZATION,
"tier2_contract_first": TIER2_CONTRACT_FIRST,
}

162
src/models.py Normal file
View File

@@ -0,0 +1,162 @@
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Any
from datetime import datetime
@dataclass
class Ticket:
"""
Represents a discrete unit of work within a track.
"""
id: str
description: str
status: str
assigned_to: str
target_file: Optional[str] = None
context_requirements: List[str] = field(default_factory=list)
depends_on: List[str] = field(default_factory=list)
blocked_reason: Optional[str] = None
step_mode: bool = False
retry_count: int = 0
def mark_blocked(self, reason: str) -> None:
"""Sets the ticket status to 'blocked' and records the reason."""
self.status = "blocked"
self.blocked_reason = reason
def mark_complete(self) -> None:
"""Sets the ticket status to 'completed'."""
self.status = "completed"
def get(self, key: str, default: Any = None) -> Any:
"""Helper to provide dictionary-like access to dataclass fields."""
return getattr(self, key, default)
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"description": self.description,
"status": self.status,
"assigned_to": self.assigned_to,
"target_file": self.target_file,
"context_requirements": self.context_requirements,
"depends_on": self.depends_on,
"blocked_reason": self.blocked_reason,
"step_mode": self.step_mode,
"retry_count": self.retry_count,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Ticket":
return cls(
id=data["id"],
description=data.get("description", ""),
status=data.get("status", "todo"),
assigned_to=data.get("assigned_to", ""),
target_file=data.get("target_file"),
context_requirements=data.get("context_requirements", []),
depends_on=data.get("depends_on", []),
blocked_reason=data.get("blocked_reason"),
step_mode=data.get("step_mode", False),
retry_count=data.get("retry_count", 0),
)
@dataclass
class Track:
"""
Represents a collection of tickets that together form an architectural track or epic.
"""
id: str
description: str
tickets: List[Ticket] = field(default_factory=list)
def get_executable_tickets(self) -> List[Ticket]:
"""
Returns all 'todo' tickets whose dependencies are all 'completed'.
"""
# Map ticket IDs to their current status for efficient lookup
status_map = {t.id: t.status for t in self.tickets}
executable = []
for ticket in self.tickets:
if ticket.status != "todo":
continue
# Check if all dependencies are completed
all_deps_completed = True
for dep_id in ticket.depends_on:
# If a dependency is missing from the track, we treat it as not completed (or we could raise an error)
if status_map.get(dep_id) != "completed":
all_deps_completed = False
break
if all_deps_completed:
executable.append(ticket)
return executable
@dataclass
class WorkerContext:
"""
Represents the context provided to a Tier 3 Worker for a specific ticket.
"""
ticket_id: str
model_name: str
messages: List[Dict[str, Any]]
@dataclass
class Metadata:
id: str
name: str
status: Optional[str] = None
created_at: Optional[datetime] = None
updated_at: Optional[datetime] = None
def to_dict(self) -> Dict[str, Any]:
return {
"id": self.id,
"name": self.name,
"status": self.status,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Metadata":
return cls(
id=data["id"],
name=data["name"],
status=data.get("status"),
created_at=datetime.fromisoformat(data['created_at']) if data.get('created_at') else None,
updated_at=datetime.fromisoformat(data['updated_at']) if data.get('updated_at') else None,
)
@dataclass
class TrackState:
metadata: Metadata
discussion: List[Dict[str, Any]]
tasks: List[Ticket]
def to_dict(self) -> Dict[str, Any]:
return {
"metadata": self.metadata.to_dict(),
"discussion": [
{
k: v.isoformat() if isinstance(v, datetime) else v
for k, v in item.items()
}
for item in self.discussion
],
"tasks": [task.to_dict() for task in self.tasks],
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "TrackState":
metadata = Metadata.from_dict(data["metadata"])
tasks = [Ticket.from_dict(task_data) for task_data in data["tasks"]]
return cls(
metadata=metadata,
discussion=[
{
k: datetime.fromisoformat(v) if isinstance(v, str) and 'T' in v else v # Basic check for ISO format
for k, v in item.items()
}
for item in data["discussion"]
],
tasks=tasks,
)

View File

@@ -0,0 +1,354 @@
import ai_client
import json
import asyncio
import time
import traceback
from typing import List, Optional, Tuple
from dataclasses import asdict
import events
from models import Ticket, Track, WorkerContext
from file_cache import ASTParser
from pathlib import Path
from dag_engine import TrackDAG, ExecutionEngine
class ConductorEngine:
"""
Orchestrates the execution of tickets within a track.
"""
def __init__(self, track: Track, event_queue: Optional[events.AsyncEventQueue] = None, auto_queue: bool = False) -> None:
self.track = track
self.event_queue = event_queue
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview"},
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview"},
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite"},
}
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
async def _push_state(self, status: str = "running", active_tier: str = None) -> None:
if not self.event_queue:
return
payload = {
"status": status,
"active_tier": active_tier,
"tier_usage": self.tier_usage,
"track": {
"id": self.track.id,
"title": self.track.description,
},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
def parse_json_tickets(self, json_str: str) -> None:
"""
Parses a JSON string of ticket definitions (Godot ECS Flat List format)
and populates the Track's ticket list.
"""
try:
data = json.loads(json_str)
if not isinstance(data, list):
print("Error: JSON input must be a list of ticket definitions.")
return
for ticket_data in data:
# Construct Ticket object, using defaults for optional fields
ticket = Ticket(
id=ticket_data["id"],
description=ticket_data["description"],
status=ticket_data.get("status", "todo"),
assigned_to=ticket_data.get("assigned_to", "unassigned"),
depends_on=ticket_data.get("depends_on", []),
step_mode=ticket_data.get("step_mode", False)
)
self.track.tickets.append(ticket)
# Rebuild DAG and Engine after parsing new tickets
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=self.engine.auto_queue)
except json.JSONDecodeError as e:
print(f"Error parsing JSON tickets: {e}")
except KeyError as e:
print(f"Missing required field in ticket definition: {e}")
async def run(self, md_content: str = "") -> None:
"""
Main execution loop using the DAG engine.
Args:
md_content: The full markdown context (history + files) for AI workers.
"""
await self._push_state(status="running", active_tier="Tier 2 (Tech Lead)")
loop = asyncio.get_event_loop()
while True:
# 1. Identify ready tasks
ready_tasks = self.engine.tick()
# 2. Check for completion or blockage
if not ready_tasks:
all_done = all(t.status == "completed" for t in self.track.tickets)
if all_done:
print("Track completed successfully.")
await self._push_state(status="done", active_tier=None)
else:
# Check if any tasks are in-progress or could be ready
if any(t.status == "in_progress" for t in self.track.tickets):
# Wait for async tasks to complete
await asyncio.sleep(1)
continue
print("No more executable tickets. Track is blocked or finished.")
await self._push_state(status="blocked", active_tier=None)
break
# 3. Process ready tasks
to_run = [t for t in ready_tasks if t.status == "in_progress" or (not t.step_mode and self.engine.auto_queue)]
# Handle those awaiting approval
for ticket in ready_tasks:
if ticket not in to_run and ticket.status == "todo":
print(f"Ticket {ticket.id} is ready and awaiting approval.")
await self._push_state(active_tier=f"Awaiting Approval: {ticket.id}")
await asyncio.sleep(1)
if to_run:
tasks = []
for ticket in to_run:
ticket.status = "in_progress"
print(f"Executing ticket {ticket.id}: {ticket.description}")
await self._push_state(active_tier=f"Tier 3 (Worker): {ticket.id}")
# Escalation logic based on retry_count
models = ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3.1-pro-preview"]
model_idx = min(ticket.retry_count, len(models) - 1)
model_name = models[model_idx]
context = WorkerContext(
ticket_id=ticket.id,
model_name=model_name,
messages=[]
)
context_files = ticket.context_requirements if ticket.context_requirements else None
tasks.append(loop.run_in_executor(
None,
run_worker_lifecycle,
ticket,
context,
context_files,
self.event_queue,
self,
md_content,
loop
))
await asyncio.gather(*tasks)
# 4. Retry and escalation logic
for ticket in to_run:
if ticket.status == 'blocked':
if ticket.get('retry_count', 0) < 2:
ticket.retry_count += 1
ticket.status = 'todo'
print(f"Ticket {ticket.id} BLOCKED. Escalating to {models[min(ticket.retry_count, len(models)-1)]} and retrying...")
await self._push_state(active_tier="Tier 2 (Tech Lead)")
def _queue_put(event_queue: events.AsyncEventQueue, loop: asyncio.AbstractEventLoop, event_name: str, payload) -> None:
"""Thread-safe helper to push an event to the AsyncEventQueue from a worker thread."""
asyncio.run_coroutine_threadsafe(event_queue.put(event_name, payload), loop)
def confirm_execution(payload: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> bool:
"""
Pushes an approval request to the GUI and waits for response.
"""
dialog_container = [None]
task = {
"action": "mma_step_approval",
"ticket_id": ticket_id,
"payload": payload,
"dialog_container": dialog_container
}
if loop:
_queue_put(event_queue, loop, "mma_step_approval", task)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
approved, final_payload = dialog_container[0].wait()
return approved
return False
def confirm_spawn(role: str, prompt: str, context_md: str, event_queue: events.AsyncEventQueue, ticket_id: str, loop: asyncio.AbstractEventLoop = None) -> Tuple[bool, str, str]:
"""
Pushes a spawn approval request to the GUI and waits for response.
Returns (approved, modified_prompt, modified_context)
"""
dialog_container = [None]
task = {
"action": "mma_spawn_approval",
"ticket_id": ticket_id,
"role": role,
"prompt": prompt,
"context_md": context_md,
"dialog_container": dialog_container
}
if loop:
_queue_put(event_queue, loop, "mma_spawn_approval", task)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
# Wait for the GUI to create the dialog and for the user to respond
start = time.time()
while dialog_container[0] is None and time.time() - start < 60:
time.sleep(0.1)
if dialog_container[0]:
res = dialog_container[0].wait()
if isinstance(res, dict):
approved = res.get("approved", False)
abort = res.get("abort", False)
modified_prompt = res.get("prompt", prompt)
modified_context = res.get("context_md", context_md)
return approved and not abort, modified_prompt, modified_context
else:
# Fallback for old tuple style if any
approved, final_payload = res
modified_prompt = prompt
modified_context = context_md
if isinstance(final_payload, dict):
modified_prompt = final_payload.get("prompt", prompt)
modified_context = final_payload.get("context_md", context_md)
return approved, modified_prompt, modified_context
return False, prompt, context_md
def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: List[str] | None = None, event_queue: events.AsyncEventQueue | None = None, engine: Optional['ConductorEngine'] = None, md_content: str = "", loop: asyncio.AbstractEventLoop = None) -> None:
"""
Simulates the lifecycle of a single agent working on a ticket.
Calls the AI client and updates the ticket status based on the response.
Args:
ticket: The ticket to process.
context: The worker context.
context_files: List of files to include in the context.
event_queue: Queue for pushing state updates and receiving approvals.
engine: The conductor engine.
md_content: The markdown context (history + files) for AI workers.
loop: The main asyncio event loop (required for thread-safe queue access).
"""
# Enforce Context Amnesia: each ticket starts with a clean slate.
ai_client.reset_session()
ai_client.set_provider(ai_client.get_provider(), context.model_name)
context_injection = ""
if context_files:
parser = ASTParser(language="python")
for i, file_path in enumerate(context_files):
try:
Path(file_path)
# (This is a bit simplified, but helps)
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
if i == 0:
view = parser.get_curated_view(content)
else:
view = parser.get_skeleton(content)
context_injection += f"\nFile: {file_path}\n{view}\n"
except Exception as e:
context_injection += f"\nError reading {file_path}: {e}\n"
# Build a prompt for the worker
user_message = (
f"You are assigned to Ticket {ticket.id}.\n"
f"Task Description: {ticket.description}\n"
)
if context_injection:
user_message += f"\nContext Files:\n{context_injection}\n"
user_message += (
"Please complete this task. If you are blocked and cannot proceed, "
"start your response with 'BLOCKED' and explain why."
)
# HITL Clutch: call confirm_spawn if event_queue is provided
if event_queue:
approved, modified_prompt, modified_context = confirm_spawn(
role="Tier 3 Worker",
prompt=user_message,
context_md=md_content,
event_queue=event_queue,
ticket_id=ticket.id,
loop=loop
)
if not approved:
ticket.mark_blocked("Spawn rejected by user.")
return "BLOCKED: Spawn rejected by user."
user_message = modified_prompt
md_content = modified_context
# HITL Clutch: pass the queue and ticket_id to confirm_execution
def clutch_callback(payload: str) -> bool:
if not event_queue:
return True
return confirm_execution(payload, event_queue, ticket.id, loop=loop)
def stream_callback(chunk: str) -> None:
if event_queue and loop:
_queue_put(event_queue, loop, 'mma_stream', {'stream_id': f'Tier 3 (Worker): {ticket.id}', 'text': chunk})
old_comms_cb = ai_client.comms_log_callback
def worker_comms_callback(entry: dict) -> None:
if event_queue and loop:
kind = entry.get("kind")
payload = entry.get("payload", {})
chunk = ""
if kind == "tool_call":
chunk = f"\n\n[TOOL CALL] {payload.get('name')}\n{json.dumps(payload.get('script') or payload.get('args'))}\n"
elif kind == "tool_result":
res = str(payload.get("output", ""))
if len(res) > 500: res = res[:500] + "... (truncated)"
chunk = f"\n[TOOL RESULT]\n{res}\n"
if chunk:
_queue_put(event_queue, loop, "response", {"text": chunk, "stream_id": f"Tier 3 (Worker): {ticket.id}", "status": "streaming..."})
if old_comms_cb:
old_comms_cb(entry)
ai_client.comms_log_callback = worker_comms_callback
ai_client.current_tier = "Tier 3"
try:
comms_baseline = len(ai_client.get_comms_log())
response = ai_client.send(
md_content=md_content,
user_message=user_message,
base_dir=".",
pre_tool_callback=clutch_callback if ticket.step_mode else None,
qa_callback=ai_client.run_tier4_analysis,
stream_callback=stream_callback
)
finally:
ai_client.comms_log_callback = old_comms_cb
ai_client.current_tier = None
if event_queue:
# Push via "response" event type — _process_event_queue wraps this
# as {"action": "handle_ai_response", "payload": ...} for the GUI.
try:
response_payload = {
"text": response,
"stream_id": f"Tier 3 (Worker): {ticket.id}",
"status": "done"
}
print(f"[MMA] Pushing Tier 3 response for {ticket.id}, loop={'present' if loop else 'NONE'}, stream_id={response_payload['stream_id']}")
if loop:
_queue_put(event_queue, loop, "response", response_payload)
else:
raise RuntimeError("loop is required for thread-safe event queue access")
except Exception as e:
print(f"[MMA] ERROR pushing response to UI: {e}\n{traceback.format_exc()}")
# Update usage in engine if provided
if engine:
_new_comms = ai_client.get_comms_log()[comms_baseline:]
_resp_entries = [e for e in _new_comms if e.get("direction") == "IN" and e.get("kind") == "response"]
_in_tokens = sum(e.get("payload", {}).get("usage", {}).get("input_tokens", 0) for e in _resp_entries)
_out_tokens = sum(e.get("payload", {}).get("usage", {}).get("output_tokens", 0) for e in _resp_entries)
engine.tier_usage["Tier 3"]["input"] += _in_tokens
engine.tier_usage["Tier 3"]["output"] += _out_tokens
if "BLOCKED" in response.upper():
ticket.mark_blocked(response)
else:
ticket.mark_complete()
return response

116
src/orchestrator_pm.py Normal file
View File

@@ -0,0 +1,116 @@
import json
import ai_client
import mma_prompts
import aggregate
import summarize
from pathlib import Path
from typing import Any, Optional
CONDUCTOR_PATH: Path = Path("conductor")
def get_track_history_summary() -> str:
"""
Scans conductor/archive/ and conductor/tracks/ to build a summary of past work.
"""
summary_parts = []
archive_path = CONDUCTOR_PATH / "archive"
tracks_path = CONDUCTOR_PATH / "tracks"
paths_to_scan = []
if archive_path.exists():
paths_to_scan.extend(list(archive_path.iterdir()))
if tracks_path.exists():
paths_to_scan.extend(list(tracks_path.iterdir()))
for track_dir in paths_to_scan:
if not track_dir.is_dir():
continue
metadata_file = track_dir / "metadata.json"
spec_file = track_dir / "spec.md"
title = track_dir.name
status = "unknown"
overview = "No overview available."
if metadata_file.exists():
try:
with open(metadata_file, "r", encoding="utf-8") as f:
meta = json.load(f)
title = meta.get("title", title)
status = meta.get("status", status)
except Exception:
pass
if spec_file.exists():
try:
with open(spec_file, "r", encoding="utf-8") as f:
content = f.read()
# Basic extraction of Overview section if it exists
if "## Overview" in content:
overview = content.split("## Overview")[1].split("##")[0].strip()
else:
# Just take a snippet of the beginning
overview = content[:200] + "..."
except Exception:
pass
summary_parts.append(f"Track: {title}\nStatus: {status}\nOverview: {overview}\n---")
if not summary_parts:
return "No previous tracks found."
return "\n".join(summary_parts)
def generate_tracks(user_request: str, project_config: dict[str, Any], file_items: list[dict[str, Any]], history_summary: Optional[str] = None) -> list[dict[str, Any]]:
"""
Tier 1 (Strategic PM) call.
Analyzes the project state and user request to generate a list of Tracks.
"""
# 1. Build Repository Map (Summary View)
repo_map = summarize.build_summary_markdown(file_items)
# 2. Construct Prompt
system_prompt = mma_prompts.PROMPTS.get("tier1_epic_init")
user_message_parts = [
f"### USER REQUEST:\n{user_request}\n",
f"### REPOSITORY MAP:\n{repo_map}\n"
]
if history_summary:
user_message_parts.append(f"### TRACK HISTORY:\n{history_summary}\n")
user_message_parts.append("Please generate the implementation tracks for this request.")
user_message = "\n".join(user_message_parts)
# Set custom system prompt for this call
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt or "")
try:
# 3. Call Tier 1 Model (Strategic - Pro)
# Note: We use gemini-1.5-pro or similar high-reasoning model for Tier 1
response = ai_client.send(
md_content="", # We pass everything in user_message for clarity
user_message=user_message,
enable_tools=False,
)
# 4. Parse JSON Output
try:
# The prompt asks for a JSON array. We need to extract it if the AI added markdown blocks.
json_match = response.strip()
if "```json" in json_match:
json_match = json_match.split("```json")[1].split("```")[0].strip()
elif "```" in json_match:
json_match = json_match.split("```")[1].split("```")[0].strip()
tracks: list[dict[str, Any]] = json.loads(json_match)
# Ensure each track has a 'title' for the GUI
for t in tracks:
if "title" not in t:
t["title"] = t.get("goal", "Untitled Track")[:50]
return tracks
except Exception as e:
print(f"Error parsing Tier 1 response: {e}")
print(f"Raw response: {response}")
return []
finally:
# Restore old system prompt
ai_client.set_custom_system_prompt(old_system_prompt or "")
if __name__ == "__main__":
# Quick CLI test
import project_manager
proj = project_manager.load_project("manual_slop.toml")
flat = project_manager.flat_config(proj)
file_items = aggregate.build_file_items(Path("."), flat.get("files", {}).get("paths", []))
print("Testing Tier 1 Track Generation...")
history = get_track_history_summary()
tracks = generate_tracks("Implement a basic unit test for the ai_client.py module.", flat, file_items, history_summary=history)
print(json.dumps(tracks, indent=2))

56
src/outline_tool.py Normal file
View File

@@ -0,0 +1,56 @@
import ast
from pathlib import Path
class CodeOutliner:
def __init__(self) -> None:
pass
def outline(self, code: str) -> str:
code = code.lstrip(chr(0xFEFF))
try:
tree = ast.parse(code)
except SyntaxError as e:
return f"ERROR parsing code: {e}"
output = []
def get_docstring(node: ast.AST) -> str | None:
if isinstance(node, (ast.AsyncFunctionDef, ast.FunctionDef, ast.ClassDef, ast.Module)):
doc = ast.get_docstring(node)
if doc:
return doc.splitlines()[0]
return None
def walk(node: ast.AST, indent: int = 0) -> None:
if isinstance(node, ast.ClassDef):
start_line = node.lineno
end_line = getattr(node, "end_lineno", start_line)
output.append(f"{' ' * indent}[Class] {node.name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for item in node.body:
walk(item, indent + 1)
elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
start_line = node.lineno
end_line = getattr(node, "end_lineno", start_line)
prefix = "[Async Func]" if isinstance(node, ast.AsyncFunctionDef) else "[Func]"
# Check if it's a method
# We can check the indent or the parent, but in AST walk we know if we are inside a ClassDef
# Let's use a simpler heuristic for the outline: if indent > 0, it's likely a method.
if indent > 0:
prefix = "[Method]"
output.append(f"{' ' * indent}{prefix} {node.name} (Lines {start_line}-{end_line})")
doc = get_docstring(node)
if doc:
output.append(f"{' ' * (indent + 1)}\"\"\"{doc}\"\"\"")
for node in tree.body:
walk(node)
return "\n".join(output)
def get_outline(path: Path, code: str) -> str:
suffix = path.suffix.lower()
if suffix == ".py":
outliner = CodeOutliner()
return outliner.outline(code)
else:
return f"Outlining not supported for {suffix} files yet."

124
src/performance_monitor.py Normal file
View File

@@ -0,0 +1,124 @@
from __future__ import annotations
import time
import psutil
import threading
from typing import Any, Optional, Callable
class PerformanceMonitor:
def __init__(self) -> None:
self._start_time: Optional[float] = None
self._last_frame_time: float = 0.0
self._fps: float = 0.0
self._last_calculated_fps: float = 0.0
self._frame_count: int = 0
self._total_frame_count: int = 0
self._fps_last_time: float = time.time()
self._process: psutil.Process = psutil.Process()
self._cpu_usage: float = 0.0
self._cpu_lock: threading.Lock = threading.Lock()
# Input lag tracking
self._last_input_time: Optional[float] = None
self._input_lag_ms: float = 0.0
# Alerts
self.alert_callback: Optional[Callable[[str], None]] = None
self.thresholds: dict[str, float] = {
'frame_time_ms': 33.3, # < 30 FPS
'cpu_percent': 80.0,
'input_lag_ms': 100.0
}
self._last_alert_time: float = 0.0
self._alert_cooldown: int = 30 # seconds
# Detailed profiling
self._component_timings: dict[str, float] = {}
self._comp_start: dict[str, float] = {}
# Start CPU usage monitoring thread
self._stop_event: threading.Event = threading.Event()
self._cpu_thread: threading.Thread = threading.Thread(target=self._monitor_cpu, daemon=True)
self._cpu_thread.start()
def _monitor_cpu(self) -> None:
while not self._stop_event.is_set():
# psutil.cpu_percent with interval=1.0 is blocking for 1 second.
# To be responsive to stop_event, we use a smaller interval or no interval
# and handle the timing ourselves.
try:
usage = self._process.cpu_percent()
with self._cpu_lock:
self._cpu_usage = usage
except Exception:
pass
# Sleep in small increments to stay responsive to stop_event
for _ in range(10):
if self._stop_event.is_set():
break
time.sleep(0.1)
def start_frame(self) -> None:
self._start_time = time.time()
def record_input_event(self) -> None:
self._last_input_time = time.time()
def start_component(self, name: str) -> None:
self._comp_start[name] = time.time()
def end_component(self, name: str) -> None:
if name in self._comp_start:
elapsed = (time.time() - self._comp_start[name]) * 1000.0
self._component_timings[name] = elapsed
def end_frame(self) -> None:
if self._start_time is None:
return
end_time = time.time()
self._last_frame_time = (end_time - self._start_time) * 1000.0
self._frame_count += 1
self._total_frame_count += 1
# Calculate input lag if an input occurred during this frame
if self._last_input_time is not None:
self._input_lag_ms = (end_time - self._last_input_time) * 1000.0
self._last_input_time = None
self._check_alerts()
elapsed_since_fps = end_time - self._fps_last_time
if elapsed_since_fps >= 1.0:
self._fps = self._frame_count / elapsed_since_fps
self._last_calculated_fps = self._fps
self._frame_count = 0
self._fps_last_time = end_time
def _check_alerts(self) -> None:
if not self.alert_callback:
return
now = time.time()
if now - self._last_alert_time < self._alert_cooldown:
return
metrics = self.get_metrics()
alerts = []
if metrics['last_frame_time_ms'] > self.thresholds['frame_time_ms']:
alerts.append(f"Frame time high: {metrics['last_frame_time_ms']:.1f}ms")
if metrics['cpu_percent'] > self.thresholds['cpu_percent']:
alerts.append(f"CPU usage high: {metrics['cpu_percent']:.1f}%")
if metrics['input_lag_ms'] > self.thresholds['input_lag_ms']:
alerts.append(f"Input lag high: {metrics['input_lag_ms']:.1f}ms")
if alerts:
self._last_alert_time = now
self.alert_callback("; ".join(alerts))
def get_metrics(self) -> dict[str, Any]:
with self._cpu_lock:
cpu_usage = self._cpu_usage
metrics: dict[str, Any] = {
'last_frame_time_ms': self._last_frame_time,
'fps': self._last_calculated_fps,
'cpu_percent': cpu_usage,
'total_frames': self._total_frame_count,
'input_lag_ms': self._input_lag_ms
}
# Add detailed timings
for name, elapsed in self._component_timings.items():
metrics[f'time_{name}_ms'] = elapsed
return metrics
def stop(self) -> None:
self._stop_event.set()
self._cpu_thread.join(timeout=2.0)

353
src/project_manager.py Normal file
View File

@@ -0,0 +1,353 @@
# project_manager.py
"""
Note(Gemini):
Handles loading/saving of project .toml configurations.
Also handles serializing the discussion history into the TOML format using a special
@timestamp prefix to preserve the exact sequence of events.
"""
import subprocess
import datetime
import tomllib
import tomli_w
import re
import json
from typing import Any, Optional, TYPE_CHECKING, Union
from pathlib import Path
if TYPE_CHECKING:
from models import TrackState
TS_FMT: str = "%Y-%m-%dT%H:%M:%S"
def now_ts() -> str:
return datetime.datetime.now().strftime(TS_FMT)
def parse_ts(s: str) -> Optional[datetime.datetime]:
try:
return datetime.datetime.strptime(s, TS_FMT)
except Exception:
return None
# ── entry serialisation ──────────────────────────────────────────────────────
def entry_to_str(entry: dict[str, Any]) -> str:
"""Serialise a disc entry dict -> stored string."""
ts = entry.get("ts", "")
role = entry.get("role", "User")
content = entry.get("content", "")
if ts:
return f"@{ts}\n{role}:\n{content}"
return f"{role}:\n{content}"
def str_to_entry(raw: str, roles: list[str]) -> dict[str, Any]:
"""Parse a stored string back to a disc entry dict."""
ts = ""
rest = raw
if rest.startswith("@"):
nl = rest.find("\n")
if nl != -1:
ts = rest[1:nl]
rest = rest[nl + 1:]
known = roles or ["User", "AI", "Vendor API", "System"]
role_pat = re.compile(
r"^(?:\[)?(" + "|".join(re.escape(r) for r in known) + r")(?:\])?:?\s*$",
re.IGNORECASE,
)
parts = rest.split("\n", 1)
matched_role = "User"
content = rest.strip()
if parts:
m = role_pat.match(parts[0].strip())
if m:
raw_role = m.group(1)
matched_role = next((r for r in known if r.lower() == raw_role.lower()), raw_role)
content = parts[1].strip() if len(parts) > 1 else ""
return {"role": matched_role, "content": content, "collapsed": False, "ts": ts}
# ── git helpers ──────────────────────────────────────────────────────────────
def get_git_commit(git_dir: str) -> str:
try:
r = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True, text=True, cwd=git_dir, timeout=5,
)
return r.stdout.strip() if r.returncode == 0 else ""
except Exception:
return ""
def get_git_log(git_dir: str, n: int = 5) -> str:
try:
r = subprocess.run(
["git", "log", "--oneline", f"-{n}"],
capture_output=True, text=True, cwd=git_dir, timeout=5,
)
return r.stdout.strip() if r.returncode == 0 else ""
except Exception:
return ""
# ── default structures ───────────────────────────────────────────────────────
def default_discussion() -> dict[str, Any]:
return {"git_commit": "", "last_updated": now_ts(), "history": []}
def default_project(name: str = "unnamed") -> dict[str, Any]:
return {
"project": {"name": name, "git_dir": "", "system_prompt": "", "main_context": ""},
"output": {"output_dir": "./md_gen"},
"files": {"base_dir": ".", "paths": [], "tier_assignments": {}},
"screenshots": {"base_dir": ".", "paths": []},
"gemini_cli": {"binary_path": "gemini"},
"deepseek": {"reasoning_effort": "medium"},
"agent": {
"tools": {
"run_powershell": True,
"read_file": True,
"list_directory": True,
"search_files": True,
"get_file_summary": True,
"web_search": True,
"fetch_url": True,
"py_get_skeleton": True,
"py_get_code_outline": True,
"get_file_slice": True,
"py_get_definition": True,
"py_get_signature": True,
"py_get_class_summary": True,
"py_get_var_declaration": True,
"get_git_diff": True,
"py_find_usages": True,
"py_get_imports": True,
"py_check_syntax": True,
"py_get_hierarchy": True,
"py_get_docstring": True,
"get_tree": True,
"get_ui_performance": True,
"set_file_slice": False,
"py_update_definition": False,
"py_set_signature": False,
"py_set_var_declaration": False,
}
},
"discussion": {
"roles": ["User", "AI", "Vendor API", "System", "Reasoning"],
"active": "main",
"discussions": {"main": default_discussion()},
},
"mma": {
"epic": "",
"active_track_id": "",
"tracks": []
}
}
# ── load / save ──────────────────────────────────────────────────────────────
def get_history_path(project_path: Union[str, Path]) -> Path:
"""Return the Path to the sibling history TOML file for a given project."""
p = Path(project_path)
return p.parent / f"{p.stem}_history.toml"
def load_project(path: Union[str, Path]) -> dict[str, Any]:
"""
Load a project TOML file.
Automatically migrates legacy 'discussion' keys to a sibling history file.
"""
with open(path, "rb") as f:
proj = tomllib.load(f)
hist_path = get_history_path(path)
if "discussion" in proj:
disc = proj.pop("discussion")
with open(hist_path, "wb") as f:
tomli_w.dump(disc, f)
save_project(proj, path)
proj["discussion"] = disc
else:
if hist_path.exists():
proj["discussion"] = load_history(path)
return proj
def load_history(project_path: Union[str, Path]) -> dict[str, Any]:
"""Load the segregated discussion history from its dedicated TOML file."""
hist_path = get_history_path(project_path)
if hist_path.exists():
with open(hist_path, "rb") as f:
return tomllib.load(f)
return {}
def clean_nones(data: Any) -> Any:
"""Recursively remove None values from a dictionary/list."""
if isinstance(data, dict):
return {k: clean_nones(v) for k, v in data.items() if v is not None}
elif isinstance(data, list):
return [clean_nones(v) for v in data if v is not None]
return data
def save_project(proj: dict[str, Any], path: Union[str, Path], disc_data: Optional[dict[str, Any]] = None) -> None:
"""
Save the project TOML.
If 'discussion' is present in proj, it is moved to the sibling history file.
"""
proj = clean_nones(proj)
if "discussion" in proj:
if disc_data is None:
disc_data = proj["discussion"]
proj = dict(proj)
del proj["discussion"]
with open(path, "wb") as f:
tomli_w.dump(proj, f)
if disc_data:
disc_data = clean_nones(disc_data)
hist_path = get_history_path(path)
with open(hist_path, "wb") as f:
tomli_w.dump(disc_data, f)
# ── migration helper ─────────────────────────────────────────────────────────
def migrate_from_legacy_config(cfg: dict[str, Any]) -> dict[str, Any]:
"""Build a fresh project dict from a legacy flat config.toml. Does NOT save."""
name = cfg.get("output", {}).get("namespace", "project")
proj = default_project(name)
for key in ("output", "files", "screenshots"):
if key in cfg:
proj[key] = dict(cfg[key])
disc = cfg.get("discussion", {})
proj["discussion"]["roles"] = disc.get("roles", ["User", "AI", "Vendor API", "System"])
main_disc = proj["discussion"]["discussions"]["main"]
main_disc["history"] = disc.get("history", [])
main_disc["last_updated"] = now_ts()
return proj
# ── flat config for aggregate.run() ─────────────────────────────────────────
def flat_config(proj: dict[str, Any], disc_name: Optional[str] = None, track_id: Optional[str] = None) -> dict[str, Any]:
"""Return a flat config dict compatible with aggregate.run()."""
disc_sec = proj.get("discussion", {})
if track_id:
history = load_track_history(track_id, proj.get("files", {}).get("base_dir", "."))
else:
name = disc_name or disc_sec.get("active", "main")
disc_data = disc_sec.get("discussions", {}).get(name, {})
history = disc_data.get("history", [])
return {
"project": proj.get("project", {}),
"output": proj.get("output", {}),
"files": proj.get("files", {}),
"screenshots": proj.get("screenshots", {}),
"discussion": {
"roles": disc_sec.get("roles", []),
"history": history,
},
}
# ── track state persistence ─────────────────────────────────────────────────
def save_track_state(track_id: str, state: 'TrackState', base_dir: Union[str, Path] = ".") -> None:
"""
Saves a TrackState object to conductor/tracks/<track_id>/state.toml.
"""
track_dir = Path(base_dir) / "conductor" / "tracks" / track_id
track_dir.mkdir(parents=True, exist_ok=True)
state_file = track_dir / "state.toml"
data = clean_nones(state.to_dict())
with open(state_file, "wb") as f:
tomli_w.dump(data, f)
def load_track_state(track_id: str, base_dir: Union[str, Path] = ".") -> Optional['TrackState']:
"""
Loads a TrackState object from conductor/tracks/<track_id>/state.toml.
"""
from models import TrackState
state_file = Path(base_dir) / "conductor" / "tracks" / track_id / "state.toml"
if not state_file.exists():
return None
with open(state_file, "rb") as f:
data = tomllib.load(f)
return TrackState.from_dict(data)
def load_track_history(track_id: str, base_dir: Union[str, Path] = ".") -> list[str]:
"""
Loads the discussion history for a specific track from its state.toml.
Returns a list of entry strings formatted with @timestamp.
"""
state = load_track_state(track_id, base_dir)
if not state:
return []
history: list[str] = []
for entry in state.discussion:
e = dict(entry)
ts = e.get("ts")
if isinstance(ts, datetime.datetime):
e["ts"] = ts.strftime(TS_FMT)
history.append(entry_to_str(e))
return history
def save_track_history(track_id: str, history: list[str], base_dir: Union[str, Path] = ".") -> None:
"""
Saves the discussion history for a specific track to its state.toml.
'history' is expected to be a list of formatted strings.
"""
state = load_track_state(track_id, base_dir)
if not state:
return
roles = ["User", "AI", "Vendor API", "System", "Reasoning"]
entries = [str_to_entry(h, roles) for h in history]
state.discussion = entries
save_track_state(track_id, state, base_dir)
def get_all_tracks(base_dir: Union[str, Path] = ".") -> list[dict[str, Any]]:
"""
Scans the conductor/tracks/ directory and returns a list of dictionaries
containing track metadata: 'id', 'title', 'status', 'complete', 'total',
and 'progress' (0.0 to 1.0).
Handles missing or malformed metadata.json or state.toml by falling back
to available info or defaults.
"""
tracks_dir = Path(base_dir) / "conductor" / "tracks"
if not tracks_dir.exists():
return []
results: list[dict[str, Any]] = []
for entry in tracks_dir.iterdir():
if not entry.is_dir():
continue
track_id = entry.name
track_info: dict[str, Any] = {
"id": track_id,
"title": track_id,
"status": "unknown",
"complete": 0,
"total": 0,
"progress": 0.0
}
state_found = False
try:
state = load_track_state(track_id, base_dir)
if state:
track_info["id"] = state.metadata.id or track_id
track_info["title"] = state.metadata.name or track_id
track_info["status"] = state.metadata.status or "unknown"
track_info["complete"] = len([t for t in state.tasks if t.status == "completed"])
track_info["total"] = len(state.tasks)
if track_info["total"] > 0:
track_info["progress"] = track_info["complete"] / track_info["total"]
state_found = True
except Exception:
pass
if not state_found:
metadata_file = entry / "metadata.json"
if metadata_file.exists():
try:
with open(metadata_file, "r") as f:
data = json.load(f)
track_info["id"] = data.get("id", data.get("track_id", track_id))
track_info["title"] = data.get("title", data.get("name", data.get("description", track_id)))
track_info["status"] = data.get("status", "unknown")
except Exception:
pass
if track_info["total"] == 0:
plan_file = entry / "plan.md"
if plan_file.exists():
try:
with open(plan_file, "r", encoding="utf-8") as f:
content = f.read()
tasks = re.findall(r"^[ \t]*- \[[ x~]\] .*", content, re.MULTILINE)
completed_tasks = re.findall(r"^[ \t]*- \[x\] .*", content, re.MULTILINE)
track_info["total"] = len(tasks)
track_info["complete"] = len(completed_tasks)
if track_info["total"] > 0:
track_info["progress"] = float(track_info["complete"]) / track_info["total"]
except Exception:
pass
results.append(track_info)
return results

186
src/session_logger.py Normal file
View File

@@ -0,0 +1,186 @@
# session_logger.py
"""
Opens timestamped log/script files at startup and keeps them open for the
lifetime of the process. The next run of the GUI creates new files; the
previous run's files are simply closed when the process exits.
File layout
-----------
logs/sessions/
comms_<ts>.log - every comms entry (direction/kind/payload) as JSON-L
toolcalls_<ts>.log - sequential record of every tool invocation
clicalls_<ts>.log - sequential record of every CLI subprocess call
scripts/generated/
<ts>_<seq:04d>.ps1 - each PowerShell script the AI generated, in order
Where <ts> = YYYYMMDD_HHMMSS of when this session was started.
"""
import atexit
import datetime
import json
import threading
from typing import Any, Optional, TextIO
from pathlib import Path
_LOG_DIR: Path = Path("./logs/sessions")
_SCRIPTS_DIR: Path = Path("./scripts/generated")
_ts: str = "" # session timestamp string e.g. "20260301_142233"
_session_id: str = "" # YYYYMMDD_HHMMSS[_Label]
_session_dir: Optional[Path] = None # Path to the sub-directory for this session
_seq: int = 0 # monotonic counter for script files this session
_seq_lock: threading.Lock = threading.Lock()
_comms_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/comms.log
_tool_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/toolcalls.log
_api_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/apihooks.log
_cli_fh: Optional[TextIO] = None # file handle: logs/sessions/<session_id>/clicalls.log
def _now_ts() -> str:
return datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def open_session(label: Optional[str] = None) -> None:
"""
Called once at GUI startup. Creates the log directories if needed and
opens the log files for this session within a sub-directory.
"""
global _ts, _session_id, _session_dir, _comms_fh, _tool_fh, _api_fh, _cli_fh, _seq
if _comms_fh is not None:
return
_ts = _now_ts()
_session_id = _ts
if label:
safe_label = "".join(c if c.isalnum() or c in ("-", "_") else "_" for c in label)
_session_id += f"_{safe_label}"
_session_dir = _LOG_DIR / _session_id
_session_dir.mkdir(parents=True, exist_ok=True)
_SCRIPTS_DIR.mkdir(parents=True, exist_ok=True)
_seq = 0
_comms_fh = open(_session_dir / "comms.log", "w", encoding="utf-8", buffering=1)
_tool_fh = open(_session_dir / "toolcalls.log", "w", encoding="utf-8", buffering=1)
_api_fh = open(_session_dir / "apihooks.log", "w", encoding="utf-8", buffering=1)
_cli_fh = open(_session_dir / "clicalls.log", "w", encoding="utf-8", buffering=1)
_tool_fh.write(f"# Tool-call log — session {_session_id}\n\n")
_tool_fh.flush()
_cli_fh.write(f"# CLI Subprocess Call Log — session {_session_id}\n\n")
_cli_fh.flush()
try:
from log_registry import LogRegistry
registry = LogRegistry(str(_LOG_DIR / "log_registry.toml"))
registry.register_session(_session_id, str(_session_dir), datetime.datetime.now())
except Exception as e:
print(f"Warning: Could not register session in LogRegistry: {e}")
atexit.register(close_session)
def close_session() -> None:
"""Flush and close all log files. Called on clean exit."""
global _comms_fh, _tool_fh, _api_fh, _cli_fh, _session_id, _LOG_DIR
if _comms_fh is None:
return
if _comms_fh:
_comms_fh.close()
_comms_fh = None
if _tool_fh:
_tool_fh.close()
_tool_fh = None
if _api_fh:
_api_fh.close()
_api_fh = None
if _cli_fh:
_cli_fh.close()
_cli_fh = None
try:
from log_registry import LogRegistry
registry = LogRegistry(str(_LOG_DIR / "log_registry.toml"))
registry.update_auto_whitelist_status(_session_id)
except Exception as e:
print(f"Warning: Could not update auto-whitelist on close: {e}")
def log_api_hook(method: str, path: str, payload: str) -> None:
"""Log an API hook invocation."""
if _api_fh is None:
return
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
try:
_api_fh.write(f"[{ts_entry}] {method} {path} - Payload: {payload}\n")
_api_fh.flush()
except Exception:
pass
def log_comms(entry: dict[str, Any]) -> None:
"""
Append one comms entry to the comms log file as a JSON-L line.
Thread-safe (GIL + line-buffered file).
"""
if _comms_fh is None:
return
try:
_comms_fh.write(json.dumps(entry, ensure_ascii=False, default=str) + "\n")
except Exception:
pass
def log_tool_call(script: str, result: str, script_path: Optional[str]) -> Optional[str]:
"""
Append a tool-call record to the toolcalls log and write the PS1 script to
scripts/generated/. Returns the path of the written script file.
"""
global _seq
if _tool_fh is None:
return script_path
with _seq_lock:
_seq += 1
seq = _seq
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
ps1_name = f"{_ts}_{seq:04d}.ps1"
ps1_path: Optional[Path] = _SCRIPTS_DIR / ps1_name
try:
if ps1_path:
ps1_path.write_text(script, encoding="utf-8")
except Exception as exc:
ps1_path = None
ps1_name = f"(write error: {exc})"
try:
_tool_fh.write(
f"## Call #{seq} [{ts_entry}]\n"
f"Script file: {ps1_path}\n\n"
f"### Result\n\n"
f"```\n{result}\n```\n\n"
f"---\n\n"
)
_tool_fh.flush()
except Exception:
pass
return str(ps1_path) if ps1_path else None
def log_cli_call(command: str, stdin_content: Optional[str], stdout_content: Optional[str], stderr_content: Optional[str], latency: float) -> None:
"""Log details of a CLI subprocess execution."""
if _cli_fh is None:
return
ts_entry = datetime.datetime.now().strftime("%H:%M:%S")
try:
log_data = {
"timestamp": ts_entry,
"command": command,
"stdin": stdin_content,
"stdout": stdout_content,
"stderr": stderr_content,
"latency_sec": latency
}
_cli_fh.write(json.dumps(log_data, ensure_ascii=False, default=str) + "\n")
_cli_fh.flush()
except Exception:
pass

83
src/shell_runner.py Normal file
View File

@@ -0,0 +1,83 @@
# shell_runner.py
import os
import subprocess
import shutil
from pathlib import Path
from typing import Callable, Optional
try:
import tomllib
except ImportError:
import tomli as tomllib # type: ignore[no-redef]
TIMEOUT_SECONDS: int = 60
_ENV_CONFIG: dict = {}
def _load_env_config() -> dict:
"""Load mcp_env.toml from project root (sibling of this file or parent dir)."""
candidates = [
Path(__file__).parent / "mcp_env.toml",
Path(__file__).parent.parent / "mcp_env.toml",
]
for p in candidates:
if p.exists():
with open(p, "rb") as f:
return tomllib.load(f)
return {}
def _build_subprocess_env() -> dict[str, str]:
"""Build env dict for subprocess: current env + mcp_env.toml overrides."""
global _ENV_CONFIG
if not _ENV_CONFIG:
_ENV_CONFIG = _load_env_config()
env = os.environ.copy()
# Apply [path].prepend entries
prepend_dirs = _ENV_CONFIG.get("path", {}).get("prepend", [])
if prepend_dirs:
env["PATH"] = os.pathsep.join(prepend_dirs) + os.pathsep + env.get("PATH", "")
# Apply [env] key-value pairs, expanding ${VAR} references
for key, val in _ENV_CONFIG.get("env", {}).items():
env[key] = os.path.expandvars(str(val))
return env
def run_powershell(script: str, base_dir: str, qa_callback: Optional[Callable[[str], str]] = None) -> str:
"""
Run a PowerShell script with working directory set to base_dir.
Returns a string combining stdout, stderr, and exit code.
Environment is configured via mcp_env.toml (project root).
If qa_callback is provided and the command fails or has stderr,
the callback is called with the stderr content and its result is appended.
"""
safe_dir: str = str(base_dir).replace("'", "''")
full_script: str = f"Set-Location -LiteralPath '{safe_dir}'\n{script}"
exe: Optional[str] = next((x for x in ["powershell.exe", "pwsh.exe", "powershell", "pwsh"] if shutil.which(x)), None)
if not exe: return "ERROR: Neither powershell nor pwsh found in PATH"
try:
process = subprocess.Popen(
[exe, "-NoProfile", "-NonInteractive", "-Command", full_script],
stdin=subprocess.DEVNULL,
stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True,
cwd=base_dir, env=_build_subprocess_env(),
)
stdout, stderr = process.communicate(timeout=TIMEOUT_SECONDS)
parts: list[str] = []
if stdout.strip(): parts.append(f"STDOUT:\n{stdout.strip()}")
if stderr.strip(): parts.append(f"STDERR:\n{stderr.strip()}")
parts.append(f"EXIT CODE: {process.returncode}")
if (process.returncode != 0 or stderr.strip()) and qa_callback:
qa_analysis: Optional[str] = qa_callback(stderr.strip())
if qa_analysis:
parts.append(f"\nQA ANALYSIS:\n{qa_analysis}")
return "\n".join(parts)
except subprocess.TimeoutExpired:
if 'process' in locals() and process:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(process.pid)], capture_output=True)
return f"ERROR: timed out after {TIMEOUT_SECONDS}s"
except KeyboardInterrupt:
if 'process' in locals() and process:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(process.pid)], capture_output=True)
raise
except Exception as e:
if 'process' in locals() and process:
subprocess.run(["taskkill", "/F", "/T", "/PID", str(process.pid)], capture_output=True)
return f"ERROR: {e}"

192
src/summarize.py Normal file
View File

@@ -0,0 +1,192 @@
# summarize.py
"""
Note(Gemini):
Local heuristic summariser. Doesn't use any AI or network.
Uses Python's AST to reliably pull out classes, methods, and functions.
Regex is used for TOML and Markdown.
The rationale here is simple: giving the AI the *structure* of a codebase is 90%
as good as giving it the full source, but costs 1% of the tokens.
If it needs the full source of a file after reading the summary, it can just call read_file.
"""
# summarize.py
"""
Local symbolic summariser — no AI calls, no network.
For each file, extracts structural information:
.py : imports, classes (with methods), top-level functions, global constants
.toml : top-level table keys + array lengths
.md : headings (h1-h3)
other : line count + first 8 lines as preview
Returns a compact markdown string per file, suitable for use as a low-token
context block that replaces full file contents in the initial <context> send.
"""
import ast
import re
from pathlib import Path
from typing import Callable, Any
# ------------------------------------------------------------------ per-type extractors
def _summarise_python(path: Path, content: str) -> str:
lines = content.splitlines()
line_count = len(lines)
parts = [f"**Python** — {line_count} lines"]
try:
tree = ast.parse(content.lstrip(chr(0xFEFF)), filename=str(path))
except SyntaxError as e:
parts.append(f"_Parse error: {e}_")
return "\n".join(parts)
# Imports
imports = []
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name.split(".")[0])
elif isinstance(node, ast.ImportFrom):
if node.module:
imports.append(node.module.split(".")[0])
if imports:
unique_imports = sorted(set(imports))
parts.append(f"imports: {', '.join(unique_imports)}")
# Top-level constants (ALL_CAPS assignments)
constants = []
for node in ast.iter_child_nodes(tree):
if isinstance(node, ast.Assign):
for t in node.targets:
if isinstance(t, ast.Name) and t.id.isupper():
constants.append(t.id)
elif isinstance(node, (ast.AnnAssign,)):
if isinstance(node.target, ast.Name) and node.target.id.isupper():
constants.append(node.target.id)
if constants:
parts.append(f"constants: {', '.join(constants)}")
# Classes + their methods
for node in ast.iter_child_nodes(tree):
if isinstance(node, ast.ClassDef):
methods = [
n.name for n in ast.iter_child_nodes(node)
if isinstance(n, (ast.FunctionDef, ast.AsyncFunctionDef))
]
if methods:
parts.append(f"class {node.name}: {', '.join(methods)}")
else:
parts.append(f"class {node.name}")
# Top-level functions
top_fns = [
node.name for node in ast.iter_child_nodes(tree)
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
]
if top_fns:
parts.append(f"functions: {', '.join(top_fns)}")
return "\n".join(parts)
def _summarise_toml(path: Path, content: str) -> str:
lines = content.splitlines()
line_count = len(lines)
parts = [f"**TOML** — {line_count} lines"]
# Extract top-level table headers [key] and [[key]]
table_pat = re.compile(r"^\s*\[{1,2}([^\[\]]+)\]{1,2}")
tables = []
for line in lines:
m = table_pat.match(line)
if m:
tables.append(m.group(1).strip())
if tables:
parts.append(f"tables: {', '.join(tables)}")
# Top-level key = value (not inside a [table])
kv_pat = re.compile(r"^([a-zA-Z_][a-zA-Z0-9_]*)\s*=")
in_table = False
top_keys = []
for line in lines:
if table_pat.match(line):
in_table = True
continue
if not in_table:
m = kv_pat.match(line)
if m:
top_keys.append(m.group(1))
if top_keys:
parts.append(f"top-level keys: {', '.join(top_keys)}")
return "\n".join(parts)
def _summarise_markdown(path: Path, content: str) -> str:
lines = content.splitlines()
line_count = len(lines)
parts = [f"**Markdown** — {line_count} lines"]
headings = []
for line in lines:
m = re.match(r"^(#{1,3})\s+(.+)", line)
if m:
level = len(m.group(1))
text = m.group(2).strip()
indent = " " * (level - 1)
headings.append(f"{indent}{text}")
if headings:
parts.append("headings:\n" + "\n".join(f" {h}" for h in headings))
return "\n".join(parts)
def _summarise_generic(path: Path, content: str) -> str:
lines = content.splitlines()
line_count = len(lines)
suffix = path.suffix.lstrip(".").upper() or "TEXT"
parts = [f"**{suffix}** — {line_count} lines"]
preview = lines[:8]
if preview:
parts.append("preview:\n```\n" + "\n".join(preview) + "\n```")
return "\n".join(parts)
# ------------------------------------------------------------------ dispatch
_SUMMARISERS: dict[str, Callable[[Path, str], str]] = {
".py": _summarise_python,
".toml": _summarise_toml,
".md": _summarise_markdown,
".ini": _summarise_generic,
".txt": _summarise_generic,
".ps1": _summarise_generic,
}
def summarise_file(path: Path, content: str) -> str:
"""
Return a compact markdown summary string for a single file.
`content` is the already-read file text (or an error string).
"""
suffix = path.suffix.lower() if hasattr(path, "suffix") else ""
fn = _SUMMARISERS.get(suffix, _summarise_generic)
try:
return fn(path, content)
except Exception as e:
return f"_Summariser error: {e}_"
def summarise_items(file_items: list[dict[str, Any]]) -> list[dict[str, Any]]:
"""
Given a list of file_item dicts (as returned by aggregate.build_file_items),
return a parallel list of dicts with an added `summary` key.
"""
result = []
for item in file_items:
path = item.get("path")
content = item.get("content", "")
error = item.get("error", False)
if error or path is None:
summary = "_Error reading file_"
else:
p = Path(path) if not isinstance(path, Path) else path
summary = summarise_file(p, content)
result.append({**item, "summary": summary})
return result
def build_summary_markdown(file_items: list[dict[str, Any]]) -> str:
"""
Build a compact markdown string of file summaries, suitable for the
initial <context> block instead of full file contents.
"""
summarised = summarise_items(file_items)
parts = []
for item in summarised:
path = item.get("path") or item.get("entry", "unknown")
summary = item.get("summary", "")
parts.append(f"### `{path}`\n\n{summary}")
return "\n\n---\n\n".join(parts)

390
src/theme.py Normal file
View File

@@ -0,0 +1,390 @@
# theme.py
"""
Theming support for manual_slop GUI.
Palettes
--------
Each palette is a dict mapping semantic names to (R,G,B) or (R,G,B,A) tuples.
The names correspond to dpg theme colour / style constants.
Font handling
-------------
Call apply_font(path, size) to load a TTF and bind it as the global default.
Call set_scale(factor) to set the global font scale (DPI scaling).
Usage
-----
import theme
theme.apply("10x") # apply a named palette
theme.apply_font("C:/Windows/Fonts/CascadiaCode.ttf", 15)
theme.set_scale(1.25)
"""
import dearpygui.dearpygui as dpg
from pathlib import Path
from typing import Any
# ------------------------------------------------------------------ palettes
# Colour key names match the DPG mvThemeCol_* constants (string lookup below).
# Only keys that differ from DPG defaults need to be listed.
_PALETTES: dict[str, dict[str, Any]] = {
"DPG Default": {}, # empty = reset to DPG built-in defaults
"10x Dark": {
# Window / frame chrome
"WindowBg": ( 34, 32, 28),
"ChildBg": ( 30, 28, 24),
"PopupBg": ( 35, 30, 20),
"Border": ( 60, 55, 50),
"BorderShadow": ( 0, 0, 0, 0),
"FrameBg": ( 45, 42, 38),
"FrameBgHovered": ( 60, 56, 50),
"FrameBgActive": ( 75, 70, 62),
# Title bars
"TitleBg": ( 40, 35, 25),
"TitleBgActive": ( 60, 45, 15),
"TitleBgCollapsed": ( 30, 27, 20),
# Menu bar
"MenuBarBg": ( 35, 30, 20),
# Scrollbar
"ScrollbarBg": ( 30, 28, 24),
"ScrollbarGrab": ( 80, 78, 72),
"ScrollbarGrabHovered": (100, 100, 92),
"ScrollbarGrabActive": (120, 118, 110),
# Check marks / radio buttons
"CheckMark": (194, 164, 74),
# Sliders
"SliderGrab": (126, 78, 14),
"SliderGrabActive": (194, 140, 30),
# Buttons
"Button": ( 83, 76, 60),
"ButtonHovered": (126, 78, 14),
"ButtonActive": (115, 90, 70),
# Headers (collapsing headers, selectables, listbox items)
"Header": ( 83, 76, 60),
"HeaderHovered": (126, 78, 14),
"HeaderActive": (115, 90, 70),
# Separator
"Separator": ( 70, 65, 55),
"SeparatorHovered": (126, 78, 14),
"SeparatorActive": (194, 164, 74),
# Resize grip
"ResizeGrip": ( 60, 55, 44),
"ResizeGripHovered": (126, 78, 14),
"ResizeGripActive": (194, 164, 74),
# Tab bar
"Tab": ( 83, 83, 70),
"TabHovered": (126, 77, 25),
"TabActive": (126, 77, 25),
"TabUnfocused": ( 60, 58, 50),
"TabUnfocusedActive": ( 90, 80, 55),
# Docking
"DockingPreview": (126, 78, 14, 180),
"DockingEmptyBg": ( 20, 20, 20),
# Text
"Text": (200, 200, 200),
"TextDisabled": (130, 130, 120),
# Input text cursor / selection
"TextSelectedBg": ( 59, 86, 142, 180),
# Plot / table lines
"TableHeaderBg": ( 55, 50, 38),
"TableBorderStrong": ( 70, 65, 55),
"TableBorderLight": ( 50, 47, 42),
"TableRowBg": ( 0, 0, 0, 0),
"TableRowBgAlt": ( 40, 38, 34, 40),
# Misc
"NavHighlight": (126, 78, 14),
"NavWindowingHighlight":(194, 164, 74, 180),
"NavWindowingDimBg": ( 20, 20, 20, 80),
"ModalWindowDimBg": ( 10, 10, 10, 100),
},
"Nord Dark": {
"WindowBg": ( 36, 41, 49),
"ChildBg": ( 30, 34, 42),
"PopupBg": ( 36, 41, 49),
"Border": ( 59, 66, 82),
"BorderShadow": ( 0, 0, 0, 0),
"FrameBg": ( 46, 52, 64),
"FrameBgHovered": ( 59, 66, 82),
"FrameBgActive": ( 67, 76, 94),
"TitleBg": ( 36, 41, 49),
"TitleBgActive": ( 59, 66, 82),
"TitleBgCollapsed": ( 30, 34, 42),
"MenuBarBg": ( 46, 52, 64),
"ScrollbarBg": ( 30, 34, 42),
"ScrollbarGrab": ( 76, 86, 106),
"ScrollbarGrabHovered": ( 94, 129, 172),
"ScrollbarGrabActive": (129, 161, 193),
"CheckMark": (136, 192, 208),
"SliderGrab": ( 94, 129, 172),
"SliderGrabActive": (129, 161, 193),
"Button": ( 59, 66, 82),
"ButtonHovered": ( 94, 129, 172),
"ButtonActive": (129, 161, 193),
"Header": ( 59, 66, 82),
"HeaderHovered": ( 94, 129, 172),
"HeaderActive": (129, 161, 193),
"Separator": ( 59, 66, 82),
"SeparatorHovered": ( 94, 129, 172),
"SeparatorActive": (136, 192, 208),
"ResizeGrip": ( 59, 66, 82),
"ResizeGripHovered": ( 94, 129, 172),
"ResizeGripActive": (136, 192, 208),
"Tab": ( 46, 52, 64),
"TabHovered": ( 94, 129, 172),
"TabActive": ( 76, 86, 106),
"TabUnfocused": ( 36, 41, 49),
"TabUnfocusedActive": ( 59, 66, 82),
"DockingPreview": ( 94, 129, 172, 180),
"DockingEmptyBg": ( 20, 22, 28),
"Text": (216, 222, 233),
"TextDisabled": (116, 128, 150),
"TextSelectedBg": ( 94, 129, 172, 180),
"TableHeaderBg": ( 59, 66, 82),
"TableBorderStrong": ( 76, 86, 106),
"TableBorderLight": ( 59, 66, 82),
"TableRowBg": ( 0, 0, 0, 0),
"TableRowBgAlt": ( 46, 52, 64, 40),
"NavHighlight": (136, 192, 208),
"ModalWindowDimBg": ( 10, 12, 16, 100),
},
"Monokai": {
"WindowBg": ( 39, 40, 34),
"ChildBg": ( 34, 35, 29),
"PopupBg": ( 39, 40, 34),
"Border": ( 60, 61, 52),
"BorderShadow": ( 0, 0, 0, 0),
"FrameBg": ( 50, 51, 44),
"FrameBgHovered": ( 65, 67, 56),
"FrameBgActive": ( 80, 82, 68),
"TitleBg": ( 39, 40, 34),
"TitleBgActive": ( 73, 72, 62),
"TitleBgCollapsed": ( 30, 31, 26),
"MenuBarBg": ( 50, 51, 44),
"ScrollbarBg": ( 34, 35, 29),
"ScrollbarGrab": ( 80, 80, 72),
"ScrollbarGrabHovered": (102, 217, 39),
"ScrollbarGrabActive": (166, 226, 46),
"CheckMark": (166, 226, 46),
"SliderGrab": (102, 217, 39),
"SliderGrabActive": (166, 226, 46),
"Button": ( 73, 72, 62),
"ButtonHovered": (249, 38, 114),
"ButtonActive": (198, 30, 92),
"Header": ( 73, 72, 62),
"HeaderHovered": (249, 38, 114),
"HeaderActive": (198, 30, 92),
"Separator": ( 60, 61, 52),
"SeparatorHovered": (249, 38, 114),
"SeparatorActive": (166, 226, 46),
"ResizeGrip": ( 73, 72, 62),
"ResizeGripHovered": (249, 38, 114),
"ResizeGripActive": (166, 226, 46),
"Tab": ( 73, 72, 62),
"TabHovered": (249, 38, 114),
"TabActive": (249, 38, 114),
"TabUnfocused": ( 50, 51, 44),
"TabUnfocusedActive": ( 90, 88, 76),
"DockingPreview": (249, 38, 114, 180),
"DockingEmptyBg": ( 20, 20, 18),
"Text": (248, 248, 242),
"TextDisabled": (117, 113, 94),
"TextSelectedBg": (249, 38, 114, 150),
"TableHeaderBg": ( 60, 61, 52),
"TableBorderStrong": ( 73, 72, 62),
"TableBorderLight": ( 55, 56, 48),
"TableRowBg": ( 0, 0, 0, 0),
"TableRowBgAlt": ( 50, 51, 44, 40),
"NavHighlight": (166, 226, 46),
"ModalWindowDimBg": ( 10, 10, 8, 100),
},
}
PALETTE_NAMES: list[str] = list(_PALETTES.keys())
# ------------------------------------------------------------------ colour key -> mvThemeCol_* mapping
# Maps our friendly name -> dpg constant name
_COL_MAP: dict[str, str] = {
"Text": "mvThemeCol_Text",
"TextDisabled": "mvThemeCol_TextDisabled",
"WindowBg": "mvThemeCol_WindowBg",
"ChildBg": "mvThemeCol_ChildBg",
"PopupBg": "mvThemeCol_PopupBg",
"Border": "mvThemeCol_Border",
"BorderShadow": "mvThemeCol_BorderShadow",
"FrameBg": "mvThemeCol_FrameBg",
"FrameBgHovered": "mvThemeCol_FrameBgHovered",
"FrameBgActive": "mvThemeCol_FrameBgActive",
"TitleBg": "mvThemeCol_TitleBg",
"TitleBgActive": "mvThemeCol_TitleBgActive",
"TitleBgCollapsed": "mvThemeCol_TitleBgCollapsed",
"MenuBarBg": "mvThemeCol_MenuBarBg",
"ScrollbarBg": "mvThemeCol_ScrollbarBg",
"ScrollbarGrab": "mvThemeCol_ScrollbarGrab",
"ScrollbarGrabHovered": "mvThemeCol_ScrollbarGrabHovered",
"ScrollbarGrabActive": "mvThemeCol_ScrollbarGrabActive",
"CheckMark": "mvThemeCol_CheckMark",
"SliderGrab": "mvThemeCol_SliderGrab",
"SliderGrabActive": "mvThemeCol_SliderGrabActive",
"Button": "mvThemeCol_Button",
"ButtonHovered": "mvThemeCol_ButtonHovered",
"ButtonActive": "mvThemeCol_ButtonActive",
"Header": "mvThemeCol_Header",
"HeaderHovered": "mvThemeCol_HeaderHovered",
"HeaderActive": "mvThemeCol_HeaderActive",
"Separator": "mvThemeCol_Separator",
"SeparatorHovered": "mvThemeCol_SeparatorHovered",
"SeparatorActive": "mvThemeCol_SeparatorActive",
"ResizeGrip": "mvThemeCol_ResizeGrip",
"ResizeGripHovered": "mvThemeCol_ResizeGripHovered",
"ResizeGripActive": "mvThemeCol_ResizeGripActive",
"Tab": "mvThemeCol_Tab",
"TabHovered": "mvThemeCol_TabHovered",
"TabActive": "mvThemeCol_TabActive",
"TabUnfocused": "mvThemeCol_TabUnfocused",
"TabUnfocusedActive": "mvThemeCol_TabUnfocusedActive",
"DockingPreview": "mvThemeCol_DockingPreview",
"DockingEmptyBg": "mvThemeCol_DockingEmptyBg",
"TextSelectedBg": "mvThemeCol_TextSelectedBg",
"TableHeaderBg": "mvThemeCol_TableHeaderBg",
"TableBorderStrong": "mvThemeCol_TableBorderStrong",
"TableBorderLight": "mvThemeCol_TableBorderLight",
"TableRowBg": "mvThemeCol_TableRowBg",
"TableRowBgAlt": "mvThemeCol_TableRowBgAlt",
"NavHighlight": "mvThemeCol_NavHighlight",
"NavWindowingHighlight": "mvThemeCol_NavWindowingHighlight",
"NavWindowingDimBg": "mvThemeCol_NavWindowingDimBg",
"ModalWindowDimBg": "mvThemeCol_ModalWindowDimBg",
}
# ------------------------------------------------------------------ state
_current_theme_tag: str | None = None
_current_font_tag: str | None = None
_font_registry_tag: str | None = None
_current_palette: str = "DPG Default"
_current_font_path: str = ""
_current_font_size: float = 14.0
_current_scale: float = 1.0
# ------------------------------------------------------------------ public API
def get_palette_names() -> list[str]:
return list(_PALETTES.keys())
def get_current_palette() -> str:
return _current_palette
def get_current_font_path() -> str:
return _current_font_path
def get_current_font_size() -> float:
return _current_font_size
def get_current_scale() -> float:
return _current_scale
def get_palette_colours(name: str) -> dict[str, Any]:
"""Return a copy of the colour dict for the named palette."""
return dict(_PALETTES.get(name, {}))
def apply(palette_name: str, overrides: dict[str, Any] | None = None) -> None:
"""
Build a global DPG theme from the named palette plus optional per-colour
overrides, and bind it as the default theme.
overrides: {colour_key: (R,G,B) or (R,G,B,A)} — merged on top of palette.
"""
global _current_theme_tag, _current_palette
_current_palette = palette_name
colours = dict(_PALETTES.get(palette_name, {}))
if overrides:
colours.update(overrides)
# Delete the old theme if one exists
if _current_theme_tag is not None:
try:
dpg.delete_item(_current_theme_tag)
except Exception:
pass
_current_theme_tag = None
if palette_name == "DPG Default" and not overrides:
# Bind an empty theme to reset to DPG defaults
with dpg.theme() as t:
with dpg.theme_component(dpg.mvAll):
pass
dpg.bind_theme(t)
_current_theme_tag = t
return
with dpg.theme() as t:
with dpg.theme_component(dpg.mvAll):
for name, colour in colours.items():
const_name = _COL_MAP.get(name)
if const_name is None:
continue
const = getattr(dpg, const_name, None)
if const is None:
continue
# Ensure 4-tuple
if len(colour) == 3:
colour = (*colour, 255)
dpg.add_theme_color(const, colour)
dpg.bind_theme(t)
_current_theme_tag = t
def apply_font(font_path: str, size: float = 14.0) -> None:
"""
Load the TTF at font_path at the given point size and bind it globally.
Safe to call multiple times. Uses a single persistent font_registry; only
the font *item* tag is tracked. Passing an empty path or a missing file
resets to the DPG built-in font.
"""
global _current_font_tag, _current_font_path, _current_font_size, _font_registry_tag
_current_font_path = font_path
_current_font_size = size
if not font_path or not Path(font_path).exists():
# Reset to default built-in font
dpg.bind_font(0)
_current_font_tag = None
return
# Create the registry once
if _font_registry_tag is None or not dpg.does_item_exist(_font_registry_tag):
with dpg.font_registry() as reg:
_font_registry_tag = reg
# Delete previous custom font item only (not the registry)
if _current_font_tag is not None:
try:
dpg.delete_item(_current_font_tag)
except Exception:
pass
_current_font_tag = None
font = dpg.add_font(font_path, size, parent=_font_registry_tag)
_current_font_tag = font
dpg.bind_font(font)
def set_scale(factor: float) -> None:
"""Set the global Dear PyGui font/UI scale factor."""
global _current_scale
_current_scale = factor
dpg.set_global_font_scale(factor)
def save_to_config(config: dict[str, Any]) -> None:
"""Persist theme settings into the config dict under [theme]."""
config.setdefault("theme", {})
config["theme"]["palette"] = _current_palette
config["theme"]["font_path"] = _current_font_path
config["theme"]["font_size"] = _current_font_size
config["theme"]["scale"] = _current_scale
def load_from_config(config: dict[str, Any]) -> None:
"""Read [theme] from config and apply everything."""
t = config.get("theme", {})
palette = t.get("palette", "DPG Default")
font_path = t.get("font_path", "")
font_size = float(t.get("font_size", 14.0))
scale = float(t.get("scale", 1.0))
apply(palette)
if font_path:
apply_font(font_path, font_size)
set_scale(scale)

257
src/theme_2.py Normal file
View File

@@ -0,0 +1,257 @@
# theme_2.py
"""
Theming support for manual_slop GUI — imgui-bundle port.
Replaces theme.py (DearPyGui-specific) with imgui-bundle equivalents.
Palettes are applied via imgui.get_style().set_color_() calls.
Font loading uses hello_imgui.load_font().
Scale uses imgui.get_style().font_scale_main.
"""
from imgui_bundle import imgui
# ------------------------------------------------------------------ palettes
# Each palette maps imgui color enum values to (R, G, B, A) floats [0..1].
# Only keys that differ from the ImGui dark defaults need to be listed.
def _c(r: int, g: int, b: int, a: int = 255) -> tuple[float, float, float, float]:
"""Convert 0-255 RGBA to 0.0-1.0 floats."""
return (r / 255.0, g / 255.0, b / 255.0, a / 255.0)
_PALETTES: dict[str, dict[int, tuple]] = {
"ImGui Dark": {}, # empty = use imgui dark defaults
"10x Dark": {
imgui.Col_.window_bg: _c( 34, 32, 28),
imgui.Col_.child_bg: _c( 30, 28, 24),
imgui.Col_.popup_bg: _c( 35, 30, 20),
imgui.Col_.border: _c( 60, 55, 50),
imgui.Col_.border_shadow: _c( 0, 0, 0, 0),
imgui.Col_.frame_bg: _c( 45, 42, 38),
imgui.Col_.frame_bg_hovered: _c( 60, 56, 50),
imgui.Col_.frame_bg_active: _c( 75, 70, 62),
imgui.Col_.title_bg: _c( 40, 35, 25),
imgui.Col_.title_bg_active: _c( 60, 45, 15),
imgui.Col_.title_bg_collapsed: _c( 30, 27, 20),
imgui.Col_.menu_bar_bg: _c( 35, 30, 20),
imgui.Col_.scrollbar_bg: _c( 30, 28, 24),
imgui.Col_.scrollbar_grab: _c( 80, 78, 72),
imgui.Col_.scrollbar_grab_hovered: _c(100, 100, 92),
imgui.Col_.scrollbar_grab_active: _c(120, 118, 110),
imgui.Col_.check_mark: _c(194, 164, 74),
imgui.Col_.slider_grab: _c(126, 78, 14),
imgui.Col_.slider_grab_active: _c(194, 140, 30),
imgui.Col_.button: _c( 83, 76, 60),
imgui.Col_.button_hovered: _c(126, 78, 14),
imgui.Col_.button_active: _c(115, 90, 70),
imgui.Col_.header: _c( 83, 76, 60),
imgui.Col_.header_hovered: _c(126, 78, 14),
imgui.Col_.header_active: _c(115, 90, 70),
imgui.Col_.separator: _c( 70, 65, 55),
imgui.Col_.separator_hovered: _c(126, 78, 14),
imgui.Col_.separator_active: _c(194, 164, 74),
imgui.Col_.resize_grip: _c( 60, 55, 44),
imgui.Col_.resize_grip_hovered: _c(126, 78, 14),
imgui.Col_.resize_grip_active: _c(194, 164, 74),
imgui.Col_.tab: _c( 83, 83, 70),
imgui.Col_.tab_hovered: _c(126, 77, 25),
imgui.Col_.tab_selected: _c(126, 77, 25),
imgui.Col_.tab_dimmed: _c( 60, 58, 50),
imgui.Col_.tab_dimmed_selected: _c( 90, 80, 55),
imgui.Col_.docking_preview: _c(126, 78, 14, 180),
imgui.Col_.docking_empty_bg: _c( 20, 20, 20),
imgui.Col_.text: _c(200, 200, 200),
imgui.Col_.text_disabled: _c(130, 130, 120),
imgui.Col_.text_selected_bg: _c( 59, 86, 142, 180),
imgui.Col_.table_header_bg: _c( 55, 50, 38),
imgui.Col_.table_border_strong: _c( 70, 65, 55),
imgui.Col_.table_border_light: _c( 50, 47, 42),
imgui.Col_.table_row_bg: _c( 0, 0, 0, 0),
imgui.Col_.table_row_bg_alt: _c( 40, 38, 34, 40),
imgui.Col_.nav_cursor: _c(126, 78, 14),
imgui.Col_.nav_windowing_highlight: _c(194, 164, 74, 180),
imgui.Col_.nav_windowing_dim_bg: _c( 20, 20, 20, 80),
imgui.Col_.modal_window_dim_bg: _c( 10, 10, 10, 100),
},
"Nord Dark": {
imgui.Col_.window_bg: _c( 36, 41, 49),
imgui.Col_.child_bg: _c( 30, 34, 42),
imgui.Col_.popup_bg: _c( 36, 41, 49),
imgui.Col_.border: _c( 59, 66, 82),
imgui.Col_.border_shadow: _c( 0, 0, 0, 0),
imgui.Col_.frame_bg: _c( 46, 52, 64),
imgui.Col_.frame_bg_hovered: _c( 59, 66, 82),
imgui.Col_.frame_bg_active: _c( 67, 76, 94),
imgui.Col_.title_bg: _c( 36, 41, 49),
imgui.Col_.title_bg_active: _c( 59, 66, 82),
imgui.Col_.title_bg_collapsed: _c( 30, 34, 42),
imgui.Col_.menu_bar_bg: _c( 46, 52, 64),
imgui.Col_.scrollbar_bg: _c( 30, 34, 42),
imgui.Col_.scrollbar_grab: _c( 76, 86, 106),
imgui.Col_.scrollbar_grab_hovered: _c( 94, 129, 172),
imgui.Col_.scrollbar_grab_active: _c(129, 161, 193),
imgui.Col_.check_mark: _c(136, 192, 208),
imgui.Col_.slider_grab: _c( 94, 129, 172),
imgui.Col_.slider_grab_active: _c(129, 161, 193),
imgui.Col_.button: _c( 59, 66, 82),
imgui.Col_.button_hovered: _c( 94, 129, 172),
imgui.Col_.button_active: _c(129, 161, 193),
imgui.Col_.header: _c( 59, 66, 82),
imgui.Col_.header_hovered: _c( 94, 129, 172),
imgui.Col_.header_active: _c(129, 161, 193),
imgui.Col_.separator: _c( 59, 66, 82),
imgui.Col_.separator_hovered: _c( 94, 129, 172),
imgui.Col_.separator_active: _c(136, 192, 208),
imgui.Col_.resize_grip: _c( 59, 66, 82),
imgui.Col_.resize_grip_hovered: _c( 94, 129, 172),
imgui.Col_.resize_grip_active: _c(136, 192, 208),
imgui.Col_.tab: _c( 46, 52, 64),
imgui.Col_.tab_hovered: _c( 94, 129, 172),
imgui.Col_.tab_selected: _c( 76, 86, 106),
imgui.Col_.tab_dimmed: _c( 36, 41, 49),
imgui.Col_.tab_dimmed_selected: _c( 59, 66, 82),
imgui.Col_.docking_preview: _c( 94, 129, 172, 180),
imgui.Col_.docking_empty_bg: _c( 20, 22, 28),
imgui.Col_.text: _c(216, 222, 233),
imgui.Col_.text_disabled: _c(116, 128, 150),
imgui.Col_.text_selected_bg: _c( 94, 129, 172, 180),
imgui.Col_.table_header_bg: _c( 59, 66, 82),
imgui.Col_.table_border_strong: _c( 76, 86, 106),
imgui.Col_.table_border_light: _c( 59, 66, 82),
imgui.Col_.table_row_bg: _c( 0, 0, 0, 0),
imgui.Col_.table_row_bg_alt: _c( 46, 52, 64, 40),
imgui.Col_.nav_cursor: _c(136, 192, 208),
imgui.Col_.modal_window_dim_bg: _c( 10, 12, 16, 100),
},
"Monokai": {
imgui.Col_.window_bg: _c( 39, 40, 34),
imgui.Col_.child_bg: _c( 34, 35, 29),
imgui.Col_.popup_bg: _c( 39, 40, 34),
imgui.Col_.border: _c( 60, 61, 52),
imgui.Col_.border_shadow: _c( 0, 0, 0, 0),
imgui.Col_.frame_bg: _c( 50, 51, 44),
imgui.Col_.frame_bg_hovered: _c( 65, 67, 56),
imgui.Col_.frame_bg_active: _c( 80, 82, 68),
imgui.Col_.title_bg: _c( 39, 40, 34),
imgui.Col_.title_bg_active: _c( 73, 72, 62),
imgui.Col_.title_bg_collapsed: _c( 30, 31, 26),
imgui.Col_.menu_bar_bg: _c( 50, 51, 44),
imgui.Col_.scrollbar_bg: _c( 34, 35, 29),
imgui.Col_.scrollbar_grab: _c( 80, 80, 72),
imgui.Col_.scrollbar_grab_hovered: _c(102, 217, 39),
imgui.Col_.scrollbar_grab_active: _c(166, 226, 46),
imgui.Col_.check_mark: _c(166, 226, 46),
imgui.Col_.slider_grab: _c(102, 217, 39),
imgui.Col_.slider_grab_active: _c(166, 226, 46),
imgui.Col_.button: _c( 73, 72, 62),
imgui.Col_.button_hovered: _c(249, 38, 114),
imgui.Col_.button_active: _c(198, 30, 92),
imgui.Col_.header: _c( 73, 72, 62),
imgui.Col_.header_hovered: _c(249, 38, 114),
imgui.Col_.header_active: _c(198, 30, 92),
imgui.Col_.separator: _c( 60, 61, 52),
imgui.Col_.separator_hovered: _c(249, 38, 114),
imgui.Col_.separator_active: _c(166, 226, 46),
imgui.Col_.resize_grip: _c( 73, 72, 62),
imgui.Col_.resize_grip_hovered: _c(249, 38, 114),
imgui.Col_.resize_grip_active: _c(166, 226, 46),
imgui.Col_.tab: _c( 73, 72, 62),
imgui.Col_.tab_hovered: _c(249, 38, 114),
imgui.Col_.tab_selected: _c(249, 38, 114),
imgui.Col_.tab_dimmed: _c( 50, 51, 44),
imgui.Col_.tab_dimmed_selected: _c( 90, 88, 76),
imgui.Col_.docking_preview: _c(249, 38, 114, 180),
imgui.Col_.docking_empty_bg: _c( 20, 20, 18),
imgui.Col_.text: _c(248, 248, 242),
imgui.Col_.text_disabled: _c(117, 113, 94),
imgui.Col_.text_selected_bg: _c(249, 38, 114, 150),
imgui.Col_.table_header_bg: _c( 60, 61, 52),
imgui.Col_.table_border_strong: _c( 73, 72, 62),
imgui.Col_.table_border_light: _c( 55, 56, 48),
imgui.Col_.table_row_bg: _c( 0, 0, 0, 0),
imgui.Col_.table_row_bg_alt: _c( 50, 51, 44, 40),
imgui.Col_.nav_cursor: _c(166, 226, 46),
imgui.Col_.modal_window_dim_bg: _c( 10, 10, 8, 100),
},
}
PALETTE_NAMES: list[str] = list(_PALETTES.keys())
# ------------------------------------------------------------------ state
_current_palette: str = "ImGui Dark"
_current_font_path: str = ""
_current_font_size: float = 16.0
_current_scale: float = 1.0
_custom_font: imgui.ImFont = None # type: ignore
# ------------------------------------------------------------------ public API
def get_palette_names() -> list[str]:
return list(_PALETTES.keys())
def get_current_palette() -> str:
return _current_palette
def get_current_font_path() -> str:
return _current_font_path
def get_current_font_size() -> float:
return _current_font_size
def get_current_scale() -> float:
return _current_scale
def apply(palette_name: str) -> None:
"""
Apply a named palette by setting all ImGui style colors.
Call this once per frame if you want dynamic switching, or once at startup.
In practice we call it once when the user picks a palette, and imgui retains the style.
"""
global _current_palette
_current_palette = palette_name
colours = _PALETTES.get(palette_name, {})
if not colours:
# Reset to imgui dark defaults
imgui.style_colors_dark()
return
style = imgui.get_style()
# Start from dark defaults so unlisted keys have sensible values
imgui.style_colors_dark()
for col_enum, rgba in colours.items():
style.set_color_(col_enum, imgui.ImVec4(*rgba))
def set_scale(factor: float) -> None:
"""Set the global font/UI scale factor."""
global _current_scale
_current_scale = factor
style = imgui.get_style()
style.font_scale_main = factor
def save_to_config(config: dict) -> None:
"""Persist theme settings into the config dict under [theme]."""
config.setdefault("theme", {})
config["theme"]["palette"] = _current_palette
config["theme"]["font_path"] = _current_font_path
config["theme"]["font_size"] = _current_font_size
config["theme"]["scale"] = _current_scale
def load_from_config(config: dict) -> None:
"""Read [theme] from config and apply palette + scale. Font is handled separately at startup."""
global _current_font_path, _current_font_size, _current_scale, _current_palette
t = config.get("theme", {})
_current_palette = t.get("palette", "ImGui Dark")
_current_font_path = t.get("font_path", "")
_current_font_size = float(t.get("font_size", 16.0))
_current_scale = float(t.get("scale", 1.0))
# Don't apply here — imgui context may not exist yet.
# Call apply_current() after imgui is initialised.
def apply_current() -> None:
"""Apply the loaded palette and scale. Call after imgui context exists."""
apply(_current_palette)
set_scale(_current_scale)
def get_font_loading_params() -> tuple[str, float]:
"""Return (font_path, font_size) for use during hello_imgui font loading callback."""
return _current_font_path, _current_font_size