From 23c1e2166110cef55255cedd65e4cd965a65d8fe Mon Sep 17 00:00:00 2001 From: Ed_ Date: Wed, 6 May 2026 14:30:22 -0400 Subject: [PATCH] feat(perf): Expand instrumentation with context manager and extended metrics --- src/aggregate.py | 191 +++++++++++++++--------------- src/dag_engine.py | 87 +++++++------- src/gui_2.py | 14 ++- src/performance_monitor.py | 30 +++++ tests/test_performance_monitor.py | 54 +++++++++ 5 files changed, 240 insertions(+), 136 deletions(-) diff --git a/src/aggregate.py b/src/aggregate.py index ac78d0b..46f3f82 100644 --- a/src/aggregate.py +++ b/src/aggregate.py @@ -21,6 +21,7 @@ from src import summarize from src import project_manager from src import beads_client from src.file_cache import ASTParser +from src.performance_monitor import get_monitor def find_next_increment(output_dir: Path, namespace: str) -> int: pattern = re.compile(rf"^{re.escape(namespace)}_(\d+)\.md$") @@ -132,52 +133,54 @@ def build_file_items(base_dir: Path, files: list[str | dict[str, Any]]) -> list[ auto_aggregate : bool force_full : bool """ - items: list[dict[str, Any]] = [] - for entry_raw in files: - if isinstance(entry_raw, dict): - entry = cast(str, entry_raw.get("path", "")) - tier = entry_raw.get("tier") - auto_aggregate = entry_raw.get("auto_aggregate", True) - force_full = entry_raw.get("force_full", False) - elif hasattr(entry_raw, "path"): - entry = entry_raw.path - tier = getattr(entry_raw, "tier", None) - auto_aggregate = getattr(entry_raw, "auto_aggregate", True) - force_full = getattr(entry_raw, "force_full", False) - else: - entry = entry_raw - tier = None - auto_aggregate = True - force_full = False - if not entry or not isinstance(entry, str): - continue - paths = resolve_paths(base_dir, entry) - if not paths: - items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full}) - continue - for path in paths: - try: - content = path.read_text(encoding="utf-8") - mtime = path.stat().st_mtime - error = False - except FileNotFoundError: - content = f"ERROR: file not found: {path}" - mtime = 0.0 - error = True - except Exception as e: - content = f"ERROR: {e}" - mtime = 0.0 - error = True - items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full}) - return items + with get_monitor().scope("build_file_items"): + items: list[dict[str, Any]] = [] + for entry_raw in files: + if isinstance(entry_raw, dict): + entry = cast(str, entry_raw.get("path", "")) + tier = entry_raw.get("tier") + auto_aggregate = entry_raw.get("auto_aggregate", True) + force_full = entry_raw.get("force_full", False) + elif hasattr(entry_raw, "path"): + entry = entry_raw.path + tier = getattr(entry_raw, "tier", None) + auto_aggregate = getattr(entry_raw, "auto_aggregate", True) + force_full = getattr(entry_raw, "force_full", False) + else: + entry = entry_raw + tier = None + auto_aggregate = True + force_full = False + if not entry or not isinstance(entry, str): + continue + paths = resolve_paths(base_dir, entry) + if not paths: + items.append({"path": None, "entry": entry, "content": f"ERROR: no files matched: {entry}", "error": True, "mtime": 0.0, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full}) + continue + for path in paths: + try: + content = path.read_text(encoding="utf-8") + mtime = path.stat().st_mtime + error = False + except FileNotFoundError: + content = f"ERROR: file not found: {path}" + mtime = 0.0 + error = True + except Exception as e: + content = f"ERROR: {e}" + mtime = 0.0 + error = True + items.append({"path": path, "entry": entry, "content": content, "error": error, "mtime": mtime, "tier": tier, "auto_aggregate": auto_aggregate, "force_full": force_full}) + return items def build_summary_section(base_dir: Path, files: list[str | dict[str, Any]]) -> str: """ Build a compact summary section using summarize.py — one short block per file. Used as the initial block instead of full file contents. """ - items = build_file_items(base_dir, files) - return summarize.build_summary_markdown(items) + with get_monitor().scope("build_summary_section"): + items = build_file_items(base_dir, files) + return summarize.build_summary_markdown(items) def _build_files_section_from_items(file_items: list[dict[str, Any]]) -> str: """Build the files markdown section from pre-read file items (avoids double I/O).""" @@ -297,62 +300,64 @@ def build_tier3_context(file_items: list[dict[str, Any]], screenshot_base_dir: P Tier 3 Context: Execution/Worker. Full content for focus_files and files with tier=3, summaries/skeletons for others. """ - parts = [] - if file_items: - sections = [] - for item in file_items: - if not item.get("auto_aggregate", True): - continue - path = cast(Path, item.get("path")) - entry = cast(str, item.get("entry", "")) - path_str = str(path) if path else "" - # Check if this file is in focus_files (by name or path) - is_focus = False - for focus in focus_files: - if focus == entry or (path and focus == path.name) or (path_str and focus in path_str): - is_focus = True - break - if is_focus or item.get("tier") == 3 or item.get("force_full"): - sections.append("### `" + (entry or path_str) + "`\n\n" + - f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```") - else: - content = cast(str, item.get("content", "")) - if path and path.suffix == ".py" and not item.get("error"): - try: - parser = ASTParser("python") - skeleton = parser.get_skeleton(content) - sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```") - except Exception: - # Fallback to summary if AST parsing fails - sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content)) + with get_monitor().scope("build_tier3_context"): + parts = [] + if file_items: + sections = [] + for item in file_items: + if not item.get("auto_aggregate", True): + continue + path = cast(Path, item.get("path")) + entry = cast(str, item.get("entry", "")) + path_str = str(path) if path else "" + # Check if this file is in focus_files (by name or path) + is_focus = False + for focus in focus_files: + if focus == entry or (path and focus == path.name) or (path_str and focus in path_str): + is_focus = True + break + if is_focus or item.get("tier") == 3 or item.get("force_full"): + sections.append("### `" + (entry or path_str) + "`\n\n" + + f"```{path.suffix.lstrip('.') if path and path.suffix else 'text'}\n{item.get('content', '')}\n```") else: - if path: - sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content)) - parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections)) - if screenshots: - parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) - if history: - parts.append("## Discussion History\n\n" + build_discussion_section(history)) - return "\n\n---\n\n".join(parts) + content = cast(str, item.get("content", "")) + if path and path.suffix == ".py" and not item.get("error"): + try: + parser = ASTParser("python") + skeleton = parser.get_skeleton(content) + sections.append(f"### `{entry or path_str}` (AST Skeleton)\n\n```python\n{skeleton}\n```") + except Exception: + # Fallback to summary if AST parsing fails + sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content)) + else: + if path: + sections.append(f"### `{entry or path_str}`\n\n" + summarize.summarise_file(path, content)) + parts.append("## Files (Tier 3 - Focused)\n\n" + "\n\n---\n\n".join(sections)) + if screenshots: + parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) + if history: + parts.append("## Discussion History\n\n" + build_discussion_section(history)) + return "\n\n---\n\n".join(parts) def build_markdown(base_dir: Path, files: list[str | dict[str, Any]], screenshot_base_dir: Path, screenshots: list[str], history: list[str], summary_only: bool = False, execution_mode: str = "standard") -> str: - parts = [] - # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits - if files: - if summary_only: - parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files)) - else: - parts.append("## Files\n\n" + build_files_section(base_dir, files)) - if screenshots: - parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) - if execution_mode == "beads": - beads_md = build_beads_section(base_dir) - if beads_md: - parts.append(beads_md) - # DYNAMIC SUFFIX: History changes every turn, must go last - if history: - parts.append("## Discussion History\n\n" + build_discussion_section(history)) - return "\n\n---\n\n".join(parts) + with get_monitor().scope("build_markdown"): + parts = [] + # STATIC PREFIX: Files and Screenshots must go first to maximize Cache Hits + if files: + if summary_only: + parts.append("## Files (Summary)\n\n" + build_summary_section(base_dir, files)) + else: + parts.append("## Files\n\n" + build_files_section(base_dir, files)) + if screenshots: + parts.append("## Screenshots\n\n" + build_screenshots_section(screenshot_base_dir, screenshots)) + if execution_mode == "beads": + beads_md = build_beads_section(base_dir) + if beads_md: + parts.append(beads_md) + # DYNAMIC SUFFIX: History changes every turn, must go last + if history: + parts.append("## Discussion History\n\n" + build_discussion_section(history)) + return "\n\n---\n\n".join(parts) def run(config: dict[str, Any], aggregation_strategy: str = "auto") -> tuple[str, Path, list[dict[str, Any]]]: namespace = config.get("project", {}).get("name") diff --git a/src/dag_engine.py b/src/dag_engine.py index 04e52a9..0116761 100644 --- a/src/dag_engine.py +++ b/src/dag_engine.py @@ -28,6 +28,7 @@ See Also: """ from typing import List from src.models import Ticket +from src.performance_monitor import get_monitor class TrackDAG: """ @@ -87,29 +88,30 @@ class TrackDAG: Returns: True if a cycle is detected, False otherwise. """ - visited = set() - rec_stack = set() + with get_monitor().scope("dag_has_cycle"): + visited = set() + rec_stack = set() - def is_cyclic(ticket_id: str) -> bool: - """Internal recursive helper for cycle detection.""" - if ticket_id in rec_stack: - return True - if ticket_id in visited: - return False - visited.add(ticket_id) - rec_stack.add(ticket_id) - ticket = self.ticket_map.get(ticket_id) - if ticket: - for neighbor in ticket.depends_on: - if is_cyclic(neighbor): - return True - rec_stack.remove(ticket_id) - return False - for ticket in self.tickets: - if ticket.id not in visited: - if is_cyclic(ticket.id): + def is_cyclic(ticket_id: str) -> bool: + """Internal recursive helper for cycle detection.""" + if ticket_id in rec_stack: return True - return False + if ticket_id in visited: + return False + visited.add(ticket_id) + rec_stack.add(ticket_id) + ticket = self.ticket_map.get(ticket_id) + if ticket: + for neighbor in ticket.depends_on: + if is_cyclic(neighbor): + return True + rec_stack.remove(ticket_id) + return False + for ticket in self.tickets: + if ticket.id not in visited: + if is_cyclic(ticket.id): + return True + return False def topological_sort(self) -> List[str]: """ @@ -119,24 +121,25 @@ class TrackDAG: Raises: ValueError: If a dependency cycle is detected. """ - if self.has_cycle(): - raise ValueError("Dependency cycle detected") - visited = set() - stack = [] + with get_monitor().scope("dag_topological_sort"): + if self.has_cycle(): + raise ValueError("Dependency cycle detected") + visited = set() + stack = [] - def visit(ticket_id: str) -> None: - """Internal recursive helper for topological sorting.""" - if ticket_id in visited: - return - visited.add(ticket_id) - ticket = self.ticket_map.get(ticket_id) - if ticket: - for dep_id in ticket.depends_on: - visit(dep_id) - stack.append(ticket_id) - for ticket in self.tickets: - visit(ticket.id) - return stack + def visit(ticket_id: str) -> None: + """Internal recursive helper for topological sorting.""" + if ticket_id in visited: + return + visited.add(ticket_id) + ticket = self.ticket_map.get(ticket_id) + if ticket: + for dep_id in ticket.depends_on: + visit(dep_id) + stack.append(ticket_id) + for ticket in self.tickets: + visit(ticket.id) + return stack class ExecutionEngine: """ @@ -161,9 +164,10 @@ class ExecutionEngine: Returns: A list of ready Ticket objects. """ - self.dag.cascade_blocks() - ready = self.dag.get_ready_tasks() - return ready + with get_monitor().scope("dag_tick"): + self.dag.cascade_blocks() + ready = self.dag.get_ready_tasks() + return ready def approve_task(self, task_id: str) -> None: """ @@ -185,4 +189,3 @@ class ExecutionEngine: ticket = self.dag.ticket_map.get(task_id) if ticket: ticket.status = status - diff --git a/src/gui_2.py b/src/gui_2.py index 571aed7..3ef35ff 100644 --- a/src/gui_2.py +++ b/src/gui_2.py @@ -2348,15 +2348,21 @@ class App: if self.perf_profiling_enabled: imgui.separator() imgui.text("Detailed Component Timings (Moving Average)") - if imgui.begin_table("comp_timings", 3, imgui.TableFlags_.borders): + if imgui.begin_table("comp_timings", 6, imgui.TableFlags_.borders): imgui.table_setup_column("Component") imgui.table_setup_column("Avg (ms)") + imgui.table_setup_column("Count") + imgui.table_setup_column("Max (ms)") + imgui.table_setup_column("Min (ms)") imgui.table_setup_column("Graph") imgui.table_headers_row() for key, val in metrics.items(): if key.startswith("time_") and key.endswith("_ms") and not key.endswith("_avg"): comp_name = key[5:-3] avg_val = metrics.get(f"{key}_avg", val) + count = int(metrics.get(f"count_{comp_name}", 0)) + max_val = metrics.get(f"max_{comp_name}_ms", 0.0) + min_val = metrics.get(f"min_{comp_name}_ms", 0.0) imgui.table_next_row() imgui.table_next_column() imgui.text(comp_name) @@ -2366,6 +2372,12 @@ class App: else: imgui.text(f"{avg_val:.2f}") imgui.table_next_column() + imgui.text(f"{count}") + imgui.table_next_column() + imgui.text(f"{max_val:.2f}") + imgui.table_next_column() + imgui.text(f"{min_val:.2f}") + imgui.table_next_column() self.perf_show_graphs.setdefault(comp_name, False) _, self.perf_show_graphs[comp_name] = imgui.checkbox(f"##g_{comp_name}", self.perf_show_graphs[comp_name]) imgui.end_table() diff --git a/src/performance_monitor.py b/src/performance_monitor.py index c705434..587eca8 100644 --- a/src/performance_monitor.py +++ b/src/performance_monitor.py @@ -62,6 +62,18 @@ from collections import deque _instance: Optional[PerformanceMonitor] = None +class PerformanceScope: + """Helper class for PerformanceMonitor.scope() context manager.""" + def __init__(self, monitor: PerformanceMonitor, name: str) -> None: + self.monitor = monitor + self.name = name + def __enter__(self) -> PerformanceScope: + self.monitor.start_component(self.name) + return self + def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: + self.monitor.end_component(self.name) + + def get_monitor() -> PerformanceMonitor: global _instance if _instance is None: @@ -90,6 +102,9 @@ class PerformanceMonitor: self._component_starts: dict[str, float] = {} self._component_timings: dict[str, float] = {} + self._component_counts: dict[str, int] = {} + self._component_max: dict[str, float] = {} + self._component_min: dict[str, float] = {} # Rolling history and running sums for O(1) average calculation # deques are thread-safe for appends and pops. @@ -192,6 +207,11 @@ class PerformanceMonitor: elapsed = (now - start) * 1000 with self._lock: self._component_timings[name] = elapsed + self._component_counts[name] = self._component_counts.get(name, 0) + 1 + if name not in self._component_max or elapsed > self._component_max[name]: + self._component_max[name] = elapsed + if name not in self._component_min or elapsed < self._component_min[name]: + self._component_min[name] = elapsed self._add_to_history(f'comp_{name}', elapsed) def get_metrics(self) -> dict[str, float]: @@ -203,6 +223,9 @@ class PerformanceMonitor: ilag = self._input_lag_ms last_calc_fps = self._last_calculated_fps timings_snapshot = dict(self._component_timings) + counts_snapshot = dict(self._component_counts) + max_snapshot = dict(self._component_max) + min_snapshot = dict(self._component_min) metrics = { 'fps': fps, @@ -217,6 +240,9 @@ class PerformanceMonitor: for name, elapsed in timings_snapshot.items(): metrics[f'time_{name}_ms'] = elapsed metrics[f'time_{name}_ms_avg'] = self._get_avg(f'comp_{name}') + metrics[f'count_{name}'] = float(counts_snapshot.get(name, 0)) + metrics[f'max_{name}_ms'] = max_snapshot.get(name, 0.0) + metrics[f'min_{name}_ms'] = min_snapshot.get(name, 0.0) return metrics def get_history(self, key: str) -> List[float]: @@ -228,6 +254,10 @@ class PerformanceMonitor: return list(self._history[f'comp_{key}']) return [] + def scope(self, name: str) -> PerformanceScope: + """Returns a context manager for timing a component.""" + return PerformanceScope(self, name) + def stop(self) -> None: self._stop_event.set() if self._cpu_thread.is_alive(): diff --git a/tests/test_performance_monitor.py b/tests/test_performance_monitor.py index 6fdf115..26297fc 100644 --- a/tests/test_performance_monitor.py +++ b/tests/test_performance_monitor.py @@ -26,3 +26,57 @@ def test_perf_monitor_component_timing() -> None: metrics = pm.get_metrics() assert metrics['time_test_comp_ms'] >= 10.0 pm.stop() + +def test_perf_monitor_scope_context_manager() -> None: + pm = PerformanceMonitor() + pm.enabled = True + + # Test normal usage + with pm.scope("test_scope"): + time.sleep(0.01) + + metrics = pm.get_metrics() + assert metrics['time_test_scope_ms'] >= 10.0 + + # Test exception handling + try: + with pm.scope("test_error"): + time.sleep(0.01) + raise ValueError("test error") + except ValueError: + pass + + metrics = pm.get_metrics() + # Component should still be finished, so timing should be recorded + assert metrics['time_test_error_ms'] >= 10.0 + + pm.stop() + +def test_perf_monitor_extended_metrics() -> None: + pm = PerformanceMonitor() + pm.enabled = True + + # 1st call: 10ms + pm.start_component("test_comp") + time.sleep(0.01) + pm.end_component("test_comp") + + # 2nd call: 30ms + pm.start_component("test_comp") + time.sleep(0.03) + pm.end_component("test_comp") + + # 3rd call: 20ms + pm.start_component("test_comp") + time.sleep(0.02) + pm.end_component("test_comp") + + metrics = pm.get_metrics() + assert metrics['count_test_comp'] == 3.0 + assert metrics['max_test_comp_ms'] >= 30.0 + assert metrics['min_test_comp_ms'] >= 10.0 + assert metrics['min_test_comp_ms'] < 20.0 + + pm.stop() + +