feat(perf): Expand instrumentation with context manager and extended metrics

This commit is contained in:
2026-05-06 14:30:22 -04:00
parent 022c39888c
commit 23c1e21661
5 changed files with 240 additions and 136 deletions
+98 -93
View File
@@ -21,6 +21,7 @@ from src import summarize
from src import project_manager
from src import beads_client
from src.file_cache import ASTParser
from src.performance_monitor import get_monitor
def find_next_increment(output_dir: Path, namespace: str) -> int:
pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$")
@@ -132,52 +133,54 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[
auto_aggregate : bool
force_full : bool
"""
items: list[dict[str, Any]] = []
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = cast(str, entry_raw.get("path", ""))
tier = entry_raw.get("tier")
auto_aggregate = entry_raw.get("auto_aggregate", True)
force_full = entry_raw.get("force_full", False)
elif hasattr(entry_raw, "path"):
entry = entry_raw.path
tier = getattr(entry_raw, "tier", None)
auto_aggregate = getattr(entry_raw, "auto_aggregate", True)
force_full = getattr(entry_raw, "force_full", False)
else:
entry = entry_raw
tier = None
auto_aggregate = True
force_full = False
if not entry or not isinstance(entry, str):
continue
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full})
continue
for path in paths:
try:
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
error = False
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
mtime = 0.0
error = True
except Exception as e:
content = f"ERROR: {e}"
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full})
return items
with get_monitor().scope("build_file_items"):
items: list[dict[str, Any]] = []
for entry_raw in files:
if isinstance(entry_raw, dict):
entry = cast(str, entry_raw.get("path", ""))
tier = entry_raw.get("tier")
auto_aggregate = entry_raw.get("auto_aggregate", True)
force_full = entry_raw.get("force_full", False)
elif hasattr(entry_raw, "path"):
entry = entry_raw.path
tier = getattr(entry_raw, "tier", None)
auto_aggregate = getattr(entry_raw, "auto_aggregate", True)
force_full = getattr(entry_raw, "force_full", False)
else:
entry = entry_raw
tier = None
auto_aggregate = True
force_full = False
if not entry or not isinstance(entry, str):
continue
paths = resolve_paths(base_dir, entry)
if not paths:
items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full})
continue
for path in paths:
try:
content = path.read_text(encoding="utf-8")
mtime = path.stat().st_mtime
error = False
except FileNotFoundError:
content = f"ERROR: file not found: {path}"
mtime = 0.0
error = True
except Exception as e:
content = f"ERROR: {e}"
mtime = 0.0
error = True
items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full})
return items
def build_summary_section(base_dir: Path, files: list[str | dict[str, Any]]) -> str:
"""
Build a compact summary section using summarize.py — one short block per file.
Used as the initial <context> block instead of full file contents.
"""
items = build_file_items(base_dir, files)
return summarize.build_summary_markdown(items)
with get_monitor().scope("build_summary_section"):
items = build_file_items(base_dir, files)
return summarize.build_summary_markdown(items)
def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str:
"""Build the files markdown section from pre-read file items (avoids double I/O)."""
@@ -297,62 +300,64 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P
Tier 3 Context: Execution/Worker.
Full content for focus_files and files with tier=3, summaries/skeletons for others.
"""
parts = []
if file_items:
sections = []
for item in file_items:
if not item.get("auto_aggregate", True):
continue
path = cast(Path, item.get("path"))
entry = cast(str, item.get("entry", ""))
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or (path_str and focus in path_str):
is_focus = True
break
if is_focus or item.get("tier") == 3 or item.get("force_full"):
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
content = cast(str, item.get("content", ""))
if path and path.suffix == ".py" and not item.get("error"):
try:
parser = ASTParser("python")
skeleton = parser.get_skeleton(content)
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
except Exception:
# Fallback to summary if AST parsing fails
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
with get_monitor().scope("build_tier3_context"):
parts = []
if file_items:
sections = []
for item in file_items:
if not item.get("auto_aggregate", True):
continue
path = cast(Path, item.get("path"))
entry = cast(str, item.get("entry", ""))
path_str = str(path) if path else ""
# Check if this file is in focus_files (by name or path)
is_focus = False
for focus in focus_files:
if focus == entry or (path and focus == path.name) or (path_str and focus in path_str):
is_focus = True
break
if is_focus or item.get("tier") == 3 or item.get("force_full"):
sections.append("### `" + (entry or path_str) + "`\n\n" +
f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```")
else:
if path:
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
content = cast(str, item.get("content", ""))
if path and path.suffix == ".py" and not item.get("error"):
try:
parser = ASTParser("python")
skeleton = parser.get_skeleton(content)
sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```")
except Exception:
# Fallback to summary if AST parsing fails
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
else:
if path:
sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content))
parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, execution_mode: str = "standard") -> str:
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files:
if summary_only:
parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files))
else:
parts.append("## Files\n\n" + build_files_section(base_dir, files))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if execution_mode == "beads":
beads_md = build_beads_section(base_dir)
if beads_md:
parts.append(beads_md)
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
with get_monitor().scope("build_markdown"):
parts = []
# STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits
if files:
if summary_only:
parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files))
else:
parts.append("## Files\n\n" + build_files_section(base_dir, files))
if screenshots:
parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots))
if execution_mode == "beads":
beads_md = build_beads_section(base_dir)
if beads_md:
parts.append(beads_md)
# DYNAMIC SUFFIX: History changes every turn, must go last
if history:
parts.append("## Discussion History\n\n" + build_discussion_section(history))
return "\n\n---\n\n".join(parts)
def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str, Path, list[dict[str, Any]]]:
namespace = config.get("project", {}).get("name")
+45 -42
View File
@@ -28,6 +28,7 @@ See Also:
"""
from typing import List
from src.models import Ticket
from src.performance_monitor import get_monitor
class TrackDAG:
"""
@@ -87,29 +88,30 @@ class TrackDAG:
Returns:
True if a cycle is detected, False otherwise.
"""
visited = set()
rec_stack = set()
with get_monitor().scope("dag_has_cycle"):
visited = set()
rec_stack = set()
def is_cyclic(ticket_id: str) -> bool:
"""Internal recursive helper for cycle detection."""
if ticket_id in rec_stack:
return True
if ticket_id in visited:
return False
visited.add(ticket_id)
rec_stack.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for neighbor in ticket.depends_on:
if is_cyclic(neighbor):
return True
rec_stack.remove(ticket_id)
return False
for ticket in self.tickets:
if ticket.id not in visited:
if is_cyclic(ticket.id):
def is_cyclic(ticket_id: str) -> bool:
"""Internal recursive helper for cycle detection."""
if ticket_id in rec_stack:
return True
return False
if ticket_id in visited:
return False
visited.add(ticket_id)
rec_stack.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for neighbor in ticket.depends_on:
if is_cyclic(neighbor):
return True
rec_stack.remove(ticket_id)
return False
for ticket in self.tickets:
if ticket.id not in visited:
if is_cyclic(ticket.id):
return True
return False
def topological_sort(self) -> List[str]:
"""
@@ -119,24 +121,25 @@ class TrackDAG:
Raises:
ValueError: If a dependency cycle is detected.
"""
if self.has_cycle():
raise ValueError("Dependency cycle detected")
visited = set()
stack = []
with get_monitor().scope("dag_topological_sort"):
if self.has_cycle():
raise ValueError("Dependency cycle detected")
visited = set()
stack = []
def visit(ticket_id: str) -> None:
"""Internal recursive helper for topological sorting."""
if ticket_id in visited:
return
visited.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for dep_id in ticket.depends_on:
visit(dep_id)
stack.append(ticket_id)
for ticket in self.tickets:
visit(ticket.id)
return stack
def visit(ticket_id: str) -> None:
"""Internal recursive helper for topological sorting."""
if ticket_id in visited:
return
visited.add(ticket_id)
ticket = self.ticket_map.get(ticket_id)
if ticket:
for dep_id in ticket.depends_on:
visit(dep_id)
stack.append(ticket_id)
for ticket in self.tickets:
visit(ticket.id)
return stack
class ExecutionEngine:
"""
@@ -161,9 +164,10 @@ class ExecutionEngine:
Returns:
A list of ready Ticket objects.
"""
self.dag.cascade_blocks()
ready = self.dag.get_ready_tasks()
return ready
with get_monitor().scope("dag_tick"):
self.dag.cascade_blocks()
ready = self.dag.get_ready_tasks()
return ready
def approve_task(self, task_id: str) -> None:
"""
@@ -185,4 +189,3 @@ class ExecutionEngine:
ticket = self.dag.ticket_map.get(task_id)
if ticket:
ticket.status = status
+13 -1
View File
@@ -2348,15 +2348,21 @@ class App:
if self.perf_profiling_enabled:
imgui.separator()
imgui.text("Detailed Component Timings (Moving Average)")
if imgui.begin_table("comp_timings", 3, imgui.TableFlags_.borders):
if imgui.begin_table("comp_timings", 6, imgui.TableFlags_.borders):
imgui.table_setup_column("Component")
imgui.table_setup_column("Avg (ms)")
imgui.table_setup_column("Count")
imgui.table_setup_column("Max (ms)")
imgui.table_setup_column("Min (ms)")
imgui.table_setup_column("Graph")
imgui.table_headers_row()
for key, val in metrics.items():
if key.startswith("time_") and key.endswith("_ms") and not key.endswith("_avg"):
comp_name = key[5:-3]
avg_val = metrics.get(f"{key}_avg", val)
count = int(metrics.get(f"count_{comp_name}", 0))
max_val = metrics.get(f"max_{comp_name}_ms", 0.0)
min_val = metrics.get(f"min_{comp_name}_ms", 0.0)
imgui.table_next_row()
imgui.table_next_column()
imgui.text(comp_name)
@@ -2366,6 +2372,12 @@ class App:
else:
imgui.text(f"{avg_val:.2f}")
imgui.table_next_column()
imgui.text(f"{count}")
imgui.table_next_column()
imgui.text(f"{max_val:.2f}")
imgui.table_next_column()
imgui.text(f"{min_val:.2f}")
imgui.table_next_column()
self.perf_show_graphs.setdefault(comp_name, False)
_, self.perf_show_graphs[comp_name] = imgui.checkbox(f"##g_{comp_name}", self.perf_show_graphs[comp_name])
imgui.end_table()
+30
View File
@@ -62,6 +62,18 @@ from collections import deque
_instance: Optional[PerformanceMonitor] = None
class PerformanceScope:
"""Helper class for PerformanceMonitor.scope() context manager."""
def __init__(self, monitor: PerformanceMonitor, name: str) -> None:
self.monitor = monitor
self.name = name
def __enter__(self) -> PerformanceScope:
self.monitor.start_component(self.name)
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
self.monitor.end_component(self.name)
def get_monitor() -> PerformanceMonitor:
global _instance
if _instance is None:
@@ -90,6 +102,9 @@ class PerformanceMonitor:
self._component_starts: dict[str, float] = {}
self._component_timings: dict[str, float] = {}
self._component_counts: dict[str, int] = {}
self._component_max: dict[str, float] = {}
self._component_min: dict[str, float] = {}
# Rolling history and running sums for O(1) average calculation
# deques are thread-safe for appends and pops.
@@ -192,6 +207,11 @@ class PerformanceMonitor:
elapsed = (now - start) * 1000
with self._lock:
self._component_timings[name] = elapsed
self._component_counts[name] = self._component_counts.get(name, 0) + 1
if name not in self._component_max or elapsed > self._component_max[name]:
self._component_max[name] = elapsed
if name not in self._component_min or elapsed < self._component_min[name]:
self._component_min[name] = elapsed
self._add_to_history(f'comp_{name}', elapsed)
def get_metrics(self) -> dict[str, float]:
@@ -203,6 +223,9 @@ class PerformanceMonitor:
ilag = self._input_lag_ms
last_calc_fps = self._last_calculated_fps
timings_snapshot = dict(self._component_timings)
counts_snapshot = dict(self._component_counts)
max_snapshot = dict(self._component_max)
min_snapshot = dict(self._component_min)
metrics = {
'fps': fps,
@@ -217,6 +240,9 @@ class PerformanceMonitor:
for name, elapsed in timings_snapshot.items():
metrics[f'time_{name}_ms'] = elapsed
metrics[f'time_{name}_ms_avg'] = self._get_avg(f'comp_{name}')
metrics[f'count_{name}'] = float(counts_snapshot.get(name, 0))
metrics[f'max_{name}_ms'] = max_snapshot.get(name, 0.0)
metrics[f'min_{name}_ms'] = min_snapshot.get(name, 0.0)
return metrics
def get_history(self, key: str) -> List[float]:
@@ -228,6 +254,10 @@ class PerformanceMonitor:
return list(self._history[f'comp_{key}'])
return []
def scope(self, name: str) -> PerformanceScope:
"""Returns a context manager for timing a component."""
return PerformanceScope(self, name)
def stop(self) -> None:
self._stop_event.set()
if self._cpu_thread.is_alive():