Private
Public Access
0
0

fixing formatting

This commit is contained in:
2026-05-16 02:33:14 -04:00
parent 29244acc74
commit 11c9aab685
4 changed files with 104 additions and 149 deletions
+2 -2
View File
@@ -7,8 +7,8 @@ Instead of sending every file to the AI raw (which blows up tokens), this uses a
1. Resolve paths (handles globs and absolute paths).
2. Build file items (raw content).
3. If 'summary_only' is true (which is the default behavior now), it pipes the files through
summarize.py to generate a compacted view.
summarize.py to generate a compacted view.
This is essential for keeping prompt tokens low while giving the AI enough structural info
to use the MCP tools to fetch only what it needs.
"""
+23 -24
View File
@@ -35,30 +35,29 @@ import re
# Pricing per 1M tokens in USD
MODEL_PRICING = [
(r"gemini-2\.5-flash-lite", {"input_per_mtok": 0.075, "output_per_mtok": 0.30}),
(r"gemini-2\.5-flash", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3-flash-preview", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3\.1-pro-preview", {"input_per_mtok": 3.50, "output_per_mtok": 10.50}),
(r"claude-.*-sonnet", {"input_per_mtok": 3.0, "output_per_mtok": 15.0}),
(r"claude-.*-opus", {"input_per_mtok": 15.0, "output_per_mtok": 75.0}),
(r"deepseek-v3", {"input_per_mtok": 0.27, "output_per_mtok": 1.10}),
(r"gemini-2\.5-flash-lite", {"input_per_mtok": 0.075, "output_per_mtok": 0.30}),
(r"gemini-2\.5-flash", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3-flash-preview", {"input_per_mtok": 0.15, "output_per_mtok": 0.60}),
(r"gemini-3\.1-pro-preview", {"input_per_mtok": 3.50, "output_per_mtok": 10.50}),
(r"claude-.*-sonnet", {"input_per_mtok": 3.0, "output_per_mtok": 15.0}),
(r"claude-.*-opus", {"input_per_mtok": 15.0, "output_per_mtok": 75.0}),
(r"deepseek-v3", {"input_per_mtok": 0.27, "output_per_mtok": 1.10}),
]
def estimate_cost(model: str, input_tokens: int, output_tokens: int) -> float:
"""
Estimate the cost of a model call based on input and output tokens.
Returns the total cost in USD.
[C: src/gui_2.py:App._render_mma_track_summary, src/gui_2.py:App._render_mma_usage_section, src/gui_2.py:App._render_token_budget_panel, tests/test_cost_tracker.py:test_estimate_cost]
"""
if not model:
return 0.0
for pattern, rates in MODEL_PRICING:
if re.search(pattern, model, re.IGNORECASE):
input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"]
output_cost = (output_tokens / 1_000_000) * rates["output_per_mtok"]
return input_cost + output_cost
return 0.0
"""
Estimate the cost of a model call based on input and output tokens.
Returns the total cost in USD.
[C: src/gui_2.py:App._render_mma_track_summary, src/gui_2.py:App._render_mma_usage_section, src/gui_2.py:App._render_token_budget_panel, tests/test_cost_tracker.py:test_estimate_cost]
"""
if not model:
return 0.0
for pattern, rates in MODEL_PRICING:
if re.search(pattern, model, re.IGNORECASE):
input_cost = (input_tokens / 1_000_000) * rates["input_per_mtok"]
output_cost = (output_tokens / 1_000_000) * rates["output_per_mtok"]
return input_cost + output_cost
return 0.0
+45 -67
View File
@@ -32,31 +32,25 @@ from src.performance_monitor import get_monitor
class TrackDAG:
"""
Manages a Directed Acyclic Graph of implementation tickets.
Provides methods for dependency resolution, cycle detection, and topological sorting.
Manages a Directed Acyclic Graph of implementation tickets.
Provides methods for dependency resolution, cycle detection, and topological sorting.
"""
def __init__(self, tickets: List[Ticket]) -> None:
"""
Initializes the TrackDAG with a list of Ticket objects.
Args:
tickets: A list of Ticket instances defining the graph nodes and edges.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
Initializes the TrackDAG with a list of Ticket objects.
Args:
tickets: A list of Ticket instances defining the graph nodes and edges.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets}
def cascade_blocks(self) -> None:
"""
Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.
Propagates 'blocked' status from initially blocked nodes to their dependents.
[C: tests/test_perf_dag.py:test_dag_performance]
Transitively marks `todo` tickets as `blocked` if any dependency is `blocked`.
Propagates 'blocked' status from initially blocked nodes to their dependents.
[C: tests/test_perf_dag.py:test_dag_performance]
"""
with get_monitor().scope("dag_cascade_blocks"):
# Build adjacency list of dependents using object references to avoid lookups
@@ -90,12 +84,10 @@ class TrackDAG:
def get_ready_tasks(self) -> List[Ticket]:
"""
Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.
Returns:
A list of Ticket objects ready for execution.
[C: src/models.py:Track.get_executable_tickets, tests/test_dag_engine.py:test_get_ready_tasks_branching, tests/test_dag_engine.py:test_get_ready_tasks_linear, tests/test_dag_engine.py:test_get_ready_tasks_multiple_deps, tests/test_orchestration_logic.py:test_track_executable_tickets]
Returns a list of tickets that are in 'todo' status and whose dependencies are all 'completed'.
Returns:
A list of Ticket objects ready for execution.
[C: src/models.py:Track.get_executable_tickets, tests/test_dag_engine.py:test_get_ready_tasks_branching, tests/test_dag_engine.py:test_get_ready_tasks_linear, tests/test_dag_engine.py:test_get_ready_tasks_multiple_deps, tests/test_orchestration_logic.py:test_track_executable_tickets]
"""
ready = []
for ticket in self.tickets:
@@ -105,12 +97,10 @@ class TrackDAG:
def has_cycle(self) -> bool:
"""
Performs an iterative Depth-First Search to detect cycles in the dependency graph.
Returns:
True if a cycle is detected, False otherwise.
[C: src/gui_2.py:App._render_task_dag_panel, tests/test_dag_engine.py:test_has_cycle_complex_no_cycle, tests/test_dag_engine.py:test_has_cycle_direct_cycle, tests/test_dag_engine.py:test_has_cycle_indirect_cycle, tests/test_dag_engine.py:test_has_cycle_no_cycle, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
Performs an iterative Depth-First Search to detect cycles in the dependency graph.
Returns:
True if a cycle is detected, False otherwise.
[C: src/gui_2.py:App._render_task_dag_panel, tests/test_dag_engine.py:test_has_cycle_complex_no_cycle, tests/test_dag_engine.py:test_has_cycle_direct_cycle, tests/test_dag_engine.py:test_has_cycle_indirect_cycle, tests/test_dag_engine.py:test_has_cycle_no_cycle, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
"""
with get_monitor().scope("dag_has_cycle"):
visited = set()
@@ -139,15 +129,13 @@ class TrackDAG:
def topological_sort(self) -> List[str]:
"""
Returns a list of ticket IDs in topological order (dependencies before dependents).
Uses Kahn's algorithm for efficient O(V+E) sorting and cycle detection.
Returns:
A list of ticket ID strings.
Raises:
ValueError: If a dependency cycle is detected.
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
Returns a list of ticket IDs in topological order (dependencies before dependents).
Uses Kahn's algorithm for efficient O(V+E) sorting and cycle detection.
Returns:
A list of ticket ID strings.
Raises:
ValueError: If a dependency cycle is detected.
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
"""
with get_monitor().scope("dag_topological_sort"):
in_degree = {t.id: len(t.depends_on) for t in self.tickets}
@@ -176,34 +164,28 @@ class TrackDAG:
class ExecutionEngine:
"""
A state machine that governs the progression of tasks within a TrackDAG.
Handles automatic queueing and manual task approval.
A state machine that governs the progression of tasks within a TrackDAG.
Handles automatic queueing and manual task approval.
"""
def __init__(self, dag: TrackDAG, auto_queue: bool = False) -> None:
"""
Initializes the ExecutionEngine.
Args:
dag: The TrackDAG instance to manage.
auto_queue: If True, ready tasks will automatically move to 'in_progress'.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
Initializes the ExecutionEngine.
Args:
dag: The TrackDAG instance to manage.
auto_queue: If True, ready tasks will automatically move to 'in_progress'.
[C: src/mcp_client.py:_DDGParser.__init__, src/mcp_client.py:_TextExtractor.__init__]
"""
self.dag = dag
self.auto_queue = auto_queue
def tick(self) -> List[Ticket]:
"""
Evaluates the DAG and returns a list of tasks that are currently 'ready' for execution.
If auto_queue is enabled, tasks without 'step_mode' will be marked as 'in_progress'.
Returns:
A list of ready Ticket objects.
[C: src/multi_agent_conductor.py:ConductorEngine.run, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_multi_hop, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_simple, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_execution_engine_tick_cascades_blocks, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_in_progress_not_blocked, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_step_mode]
Evaluates the DAG and returns a list of tasks that are currently 'ready' for execution.
If auto_queue is enabled, tasks without 'step_mode' will be marked as 'in_progress'.
Returns:
A list of ready Ticket objects.
[C: src/multi_agent_conductor.py:ConductorEngine.run, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_multi_hop, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_cascade_blocks_simple, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_execution_engine_tick_cascades_blocks, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_in_progress_not_blocked, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_step_mode]
"""
with get_monitor().scope("dag_tick"):
self.dag.cascade_blocks()
@@ -212,12 +194,10 @@ class ExecutionEngine:
def approve_task(self, task_id: str) -> None:
"""
Manually transitions a task from 'todo' to 'in_progress' if its dependencies are met.
Args:
task_id: The ID of the task to approve.
[C: src/multi_agent_conductor.py:ConductorEngine.approve_task, tests/test_execution_engine.py:test_execution_engine_approve_task, tests/test_execution_engine.py:test_execution_engine_step_mode]
Manually transitions a task from 'todo' to 'in_progress' if its dependencies are met.
Args:
task_id: The ID of the task to approve.
[C: src/multi_agent_conductor.py:ConductorEngine.approve_task, tests/test_execution_engine.py:test_execution_engine_approve_task, tests/test_execution_engine.py:test_execution_engine_step_mode]
"""
ticket = self.dag.ticket_map.get(task_id)
if ticket and ticket.status == "todo" and self.dag.is_ticket_ready(ticket):
@@ -225,13 +205,11 @@ class ExecutionEngine:
def update_task_status(self, task_id: str, status: str) -> None:
"""
Force-updates the status of a specific task.
Args:
task_id: The ID of the task.
status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked').
[C: src/multi_agent_conductor.py:ConductorEngine.update_task_status, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_status_persistence, tests/test_execution_engine.py:test_execution_engine_update_nonexistent_task]
Force-updates the status of a specific task.
Args:
task_id: The ID of the task.
status: The new status string (e.g., 'todo', 'in_progress', 'completed', 'blocked').
[C: src/multi_agent_conductor.py:ConductorEngine.update_task_status, tests/test_arch_boundary_phase3.py:TestArchBoundaryPhase3.test_manual_unblock_restores_todo, tests/test_execution_engine.py:test_execution_engine_auto_queue, tests/test_execution_engine.py:test_execution_engine_basic_flow, tests/test_execution_engine.py:test_execution_engine_status_persistence, tests/test_execution_engine.py:test_execution_engine_update_nonexistent_task]
"""
ticket = self.dag.ticket_map.get(task_id)
if ticket:
+34 -56
View File
File diff suppressed because one or more lines are too long