Private
Public Access
0
0

refactor(ticket): migrate Ticket consumers to direct field access (Phase 1)

TIER-2 READ AGENTS.md, conductor/workflow.md, conductor/edit_workflow.md,
conductor/tier2/githooks/forbidden-files.txt,
conductor/tracks/tier2_leak_prevention_20260620/spec.md,
conductor/code_styleguides/data_oriented_design.md,
conductor/code_styleguides/error_handling.md,
conductor/code_styleguides/type_aliases.md before Phase 1.

Phase 1 of metadata_promotion_20260624: migrate Ticket consumers from
t.get('key', default) / t['key'] to direct field access (t.id, t.status, etc.).

Changes:
- self.active_tickets: list[Metadata] -> list[models.Ticket]
- _deserialize_active_track_result populates self.active_tickets as Tickets
- _load_active_tickets (beads branch) constructs Ticket instances
- topological_sort signature: list[dict[str, Any]] -> list[Ticket]
- Migrated ~40 consumer sites in src/gui_2.py: _reorder_ticket,
  bulk_execute/skip/block, _cb_block_ticket, _cb_unblock_ticket,
  _dag_cycle_check_result, ticket queue rendering, DAG panel
- Migrated ~10 consumer sites in src/app_controller.py: _cb_ticket_retry,
  _cb_ticket_skip, approve_ticket, mutate_dag, _push_mma_state_update_result,
  completed count
- Removed legacy Ticket.get() compat method (Task 1.5)
- Added tests/test_metadata_promotion_phase1.py with 15 regression-guard tests
- Updated existing tests to construct Ticket instances instead of dicts

Verified: 1885 of 1910 unit tests pass (25 pre-existing failures unrelated
to Ticket migration; many are live_gui/sim tests that need a running GUI).
This commit is contained in:
2026-06-25 18:20:45 -04:00
parent 9fdb7e0cc9
commit 0506c5da63
12 changed files with 358 additions and 171 deletions
+26 -24
View File
@@ -1107,7 +1107,7 @@ class AppController:
# --- Defaults set here so tests that construct AppController without # --- Defaults set here so tests that construct AppController without
# calling init_state() still see the attributes --- # calling init_state() still see the attributes ---
self.ui_global_preset_name: Optional[str] = None self.ui_global_preset_name: Optional[str] = None
self.active_tickets: list[Metadata] = [] self.active_tickets: list[models.Ticket] = []
self.ui_selected_tickets: Set[str] = set() self.ui_selected_tickets: Set[str] = set()
#region: --- Configuration Maps --- #region: --- Configuration Maps ---
@@ -2145,6 +2145,7 @@ class AppController:
description=at_data.get("description"), description=at_data.get("description"),
tickets=tickets tickets=tickets
) )
self.active_tickets = tickets
return Result(data=track) return Result(data=track)
except (TypeError, ValueError, KeyError, AttributeError) as e: except (TypeError, ValueError, KeyError, AttributeError) as e:
return Result(data=None, errors=[ErrorInfo( return Result(data=None, errors=[ErrorInfo(
@@ -3052,7 +3053,7 @@ class AppController:
elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0 elapsed_min = (time.time() - self._session_start_time) / 60.0 if self._token_history else 0
burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0 burn_rate = total_tokens / elapsed_min if elapsed_min > 0 else 0
session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output) session_cost = cost_tracker.estimate_cost("gemini-2.5-flash", total_input, total_output)
completed = sum(1 for t in self.active_tickets if t.get("status") == "complete") completed = sum(1 for t in self.active_tickets if t.status == "complete")
efficiency = total_tokens / completed if completed > 0 else 0 efficiency = total_tokens / completed if completed > 0 else 0
return { return {
"total_tokens": total_tokens, "total_tokens": total_tokens,
@@ -3273,7 +3274,8 @@ class AppController:
result = self._deserialize_active_track_result(at_data) result = self._deserialize_active_track_result(at_data)
if result.ok: if result.ok:
self.active_track = result.data self.active_track = result.data
self.active_tickets = at_data.get("tickets", []) # Keep dicts for UI table raw_tickets = at_data.get("tickets", [])
self.active_tickets = [models.Ticket.from_dict(t) if isinstance(t, dict) else t for t in raw_tickets]
else: else:
err = result.errors[0] err = result.errors[0]
self._last_request_errors.append(("active_track_deserialize", err)) self._last_request_errors.append(("active_track_deserialize", err))
@@ -4704,7 +4706,8 @@ class AppController:
"""Phase 6 Group 6.7: topological sort with Result propagation. """Phase 6 Group 6.7: topological sort with Result propagation.
On ValueError: fall back to raw_tickets (preserves existing behavior).""" On ValueError: fall back to raw_tickets (preserves existing behavior)."""
try: try:
sorted_tickets_data = conductor_tech_lead.topological_sort(raw_tickets) normalized = [models.Ticket.from_dict(t) if isinstance(t, dict) else t for t in raw_tickets]
sorted_tickets_data = conductor_tech_lead.topological_sort(normalized)
return Result(data=sorted_tickets_data) return Result(data=sorted_tickets_data)
except ValueError as e: except ValueError as e:
err = ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=str(e), err = ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=str(e),
@@ -4806,8 +4809,8 @@ class AppController:
[C: tests/test_mma_ticket_actions.py:test_cb_ticket_retry] [C: tests/test_mma_ticket_actions.py:test_cb_ticket_retry]
""" """
for t in self.active_tickets: for t in self.active_tickets:
if t.get('id') == ticket_id: if t.id == ticket_id:
t['status'] = 'todo' t.status = 'todo'
break break
self.event_queue.put("mma_retry", {"ticket_id": ticket_id}) self.event_queue.put("mma_retry", {"ticket_id": ticket_id})
@@ -4816,8 +4819,8 @@ class AppController:
[C: tests/test_mma_ticket_actions.py:test_cb_ticket_skip] [C: tests/test_mma_ticket_actions.py:test_cb_ticket_skip]
""" """
for t in self.active_tickets: for t in self.active_tickets:
if t.get('id') == ticket_id: if t.id == ticket_id:
t['status'] = 'skipped' t.status = 'skipped'
break break
self.event_queue.put("mma_skip", {"ticket_id": ticket_id}) self.event_queue.put("mma_skip", {"ticket_id": ticket_id})
@@ -4864,8 +4867,8 @@ class AppController:
else: else:
# Fallback if engine not running # Fallback if engine not running
for t in self.active_tickets: for t in self.active_tickets:
if t.get('id') == ticket_id: if t.id == ticket_id:
t['status'] = 'in_progress' t.status = 'in_progress'
break break
self._push_mma_state_update() self._push_mma_state_update()
@@ -4875,8 +4878,8 @@ class AppController:
depends_on = data.get("depends_on") depends_on = data.get("depends_on")
if ticket_id and depends_on is not None: if ticket_id and depends_on is not None:
for t in self.active_tickets: for t in self.active_tickets:
if t.get("id") == ticket_id: if t.id == ticket_id:
t["depends_on"] = depends_on t.depends_on = depends_on
break break
if self.active_track: if self.active_track:
for t in self.active_track.tickets: for t in self.active_track.tickets:
@@ -5068,11 +5071,11 @@ class AppController:
if track is None: return OK if track is None: return OK
new_tickets = [ new_tickets = [
models.Ticket( models.Ticket(
id=t.get("id", ""), id=t.id,
description=t.get("description", ""), description=t.description,
status=t.get("status", "todo"), status=t.status,
assigned_to=t.get("assigned_to", ""), assigned_to=t.assigned_to,
depends_on=t.get("depends_on", []), depends_on=list(t.depends_on),
) )
for t in self.active_tickets for t in self.active_tickets
] ]
@@ -5104,13 +5107,12 @@ class AppController:
beads_result = self._load_beads_from_path_result(Path(base)) beads_result = self._load_beads_from_path_result(Path(base))
if beads_result.ok: if beads_result.ok:
for bead in beads_result.data: for bead in beads_result.data:
self.active_tickets.append({ self.active_tickets.append(models.Ticket(
"id": bead.id, id=bead.id,
"title": bead.title, description=bead.description or "",
"description": bead.description, status=bead.status,
"status": bead.status, depends_on=[],
"depends_on": [], ))
})
elif not beads_result.ok: elif not beads_result.ok:
self._report_worker_error("load_beads", beads_result) self._report_worker_error("load_beads", beads_result)
+4 -10
View File
@@ -104,25 +104,19 @@ from src.dag_engine import TrackDAG
from src.models import Ticket from src.models import Ticket
from src.result_types import ErrorInfo, ErrorKind, Result from src.result_types import ErrorInfo, ErrorKind, Result
def topological_sort(tickets: list[dict[str, Any]]) -> list[dict[str, Any]]: def topological_sort(tickets: list[Ticket]) -> list[Ticket]:
""" """
Sorts a list of tickets based on their 'depends_on' field. Sorts a list of Ticket objects based on their depends_on field.
Raises ValueError if a circular dependency or missing internal dependency is detected. Raises ValueError if a circular dependency or missing internal dependency is detected.
[C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance] [C: tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_complex, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_cycle, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_empty, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_linear, tests/test_conductor_tech_lead.py:TestTopologicalSort.test_topological_sort_missing_dependency, tests/test_conductor_tech_lead.py:test_topological_sort_vlog, tests/test_dag_engine.py:test_topological_sort, tests/test_dag_engine.py:test_topological_sort_cycle, tests/test_orchestration_logic.py:test_topological_sort, tests/test_orchestration_logic.py:test_topological_sort_circular, tests/test_perf_dag.py:test_dag_edge_cases, tests/test_perf_dag.py:test_dag_performance]
""" """
# 1. Convert to Ticket objects for TrackDAG dag = TrackDAG(tickets)
ticket_objs = []
for t_data in tickets:
ticket_objs.append(Ticket.from_dict(t_data))
# 2. Use TrackDAG for validation and sorting
dag = TrackDAG(ticket_objs)
try: try:
sorted_ids = dag.topological_sort() sorted_ids = dag.topological_sort()
except ValueError as e: except ValueError as e:
_dag_err = Result(data=None, errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"DAG Validation Error: {e}", source="conductor_tech_lead.topological_sort", original=e)]) _dag_err = Result(data=None, errors=[ErrorInfo(kind=ErrorKind.INVALID_INPUT, message=f"DAG Validation Error: {e}", source="conductor_tech_lead.topological_sort", original=e)])
raise ValueError(f"DAG Validation Error: {e}") raise ValueError(f"DAG Validation Error: {e}")
# 3. Return sorted dictionaries ticket_map = {t.id: t for t in tickets}
ticket_map = {t['id']: t for t in tickets}
return [ticket_map[tid] for tid in sorted_ids] return [ticket_map[tid] for tid in sorted_ids]
if __name__ == "__main__": if __name__ == "__main__":
+73 -73
View File
@@ -1363,10 +1363,10 @@ class App:
ticket = new_tickets.pop(src_idx) ticket = new_tickets.pop(src_idx)
new_tickets.insert(dst_idx, ticket) new_tickets.insert(dst_idx, ticket)
# Validate dependencies: a ticket cannot be placed before any of its dependencies # Validate dependencies: a ticket cannot be placed before any of its dependencies
id_to_idx = {str(t.get('id', '')): i for i, t in enumerate(new_tickets)} id_to_idx = {str(t.id): i for i, t in enumerate(new_tickets)}
valid = True valid = True
for i, t in enumerate(new_tickets): for i, t in enumerate(new_tickets):
deps = t.get('depends_on', []) deps = t.depends_on
for d_id in deps: for d_id in deps:
if d_id in id_to_idx and id_to_idx[d_id] >= i: if d_id in id_to_idx and id_to_idx[d_id] >= i:
valid = False valid = False
@@ -1384,20 +1384,20 @@ class App:
def bulk_execute(self) -> None: def bulk_execute(self) -> None:
for tid in self.ui_selected_tickets: for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None) t = next((t for t in self.active_tickets if str(t.id) == tid), None)
if t: t['status'] = 'in_progress' if t: t.status = 'in_progress'
self._push_mma_state_update() self._push_mma_state_update()
def bulk_skip(self) -> None: def bulk_skip(self) -> None:
for tid in self.ui_selected_tickets: for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None) t = next((t for t in self.active_tickets if str(t.id) == tid), None)
if t: t['status'] = 'completed' if t: t.status = 'completed'
self._push_mma_state_update() self._push_mma_state_update()
def bulk_block(self) -> None: def bulk_block(self) -> None:
for tid in self.ui_selected_tickets: for tid in self.ui_selected_tickets:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == tid), None) t = next((t for t in self.active_tickets if str(t.id) == tid), None)
if t: t['status'] = 'blocked' if t: t.status = 'blocked'
self._push_mma_state_update() self._push_mma_state_update()
def _cb_kill_ticket(self, ticket_id: str) -> None: def _cb_kill_ticket(self, ticket_id: str) -> None:
@@ -1405,44 +1405,44 @@ class App:
self.controller.engine.kill_worker(ticket_id) self.controller.engine.kill_worker(ticket_id)
def _cb_block_ticket(self, ticket_id: str) -> None: def _cb_block_ticket(self, ticket_id: str) -> None:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == ticket_id), None) t = next((t for t in self.active_tickets if str(t.id) == ticket_id), None)
if t: if t:
t['status'] = 'blocked' t.status = 'blocked'
t['manual_block'] = True t.manual_block = True
t['blocked_reason'] = '[MANUAL] User blocked' t.blocked_reason = '[MANUAL] User blocked'
changed = True changed = True
while changed: while changed:
changed = False changed = False
for t in self.active_tickets: for t in self.active_tickets:
if t.get('status') == 'todo': if t.status == 'todo':
for dep_id in t.get('depends_on', []): for dep_id in t.depends_on:
dep = next((x for x in self.active_tickets if str(x.get('id', '')) == dep_id), None) dep = next((x for x in self.active_tickets if str(x.id) == dep_id), None)
if dep and dep.get('status') == 'blocked': if dep and dep.status == 'blocked':
t['status'] = 'blocked' t.status = 'blocked'
changed = True changed = True
break break
self._push_mma_state_update() self._push_mma_state_update()
def _cb_unblock_ticket(self, ticket_id: str) -> None: def _cb_unblock_ticket(self, ticket_id: str) -> None:
t = next((t for t in self.active_tickets if str(t.get('id', '')) == ticket_id), None) t = next((t for t in self.active_tickets if str(t.id) == ticket_id), None)
if t and t.get('manual_block', False): if t and t.manual_block:
t['status'] = 'todo' t.status = 'todo'
t['manual_block'] = False t.manual_block = False
t['blocked_reason'] = None t.blocked_reason = None
changed = True changed = True
while changed: while changed:
changed = False changed = False
for t in self.active_tickets: for t in self.active_tickets:
if t.get('status') == 'blocked' and not t.get('manual_block', False): if t.status == 'blocked' and not t.manual_block:
can_run = True can_run = True
for dep_id in t.get('depends_on', []): for dep_id in t.depends_on:
dep = next((x for x in self.active_tickets if str(x.get('id', '')) == dep_id), None) dep = next((x for x in self.active_tickets if str(x.id) == dep_id), None)
if dep and dep.get('status') != 'completed': if dep and dep.status != 'completed':
can_run = False can_run = False
break break
if can_run: if can_run:
t['status'] = 'todo' t.status = 'todo'
changed = True changed = True
self._push_mma_state_update() self._push_mma_state_update()
def _post_init_callback_result(app: "App") -> Result[None]: def _post_init_callback_result(app: "App") -> Result[None]:
@@ -1679,7 +1679,7 @@ def _dag_cycle_check_result(app: "App") -> Result[bool]:
""" """
from src.dag_engine import TrackDAG from src.dag_engine import TrackDAG
try: try:
ticket_dicts = [{'id': str(t.get('id', '')), 'depends_on': t.get('depends_on', [])} for t in app.active_tickets] ticket_dicts = [{'id': str(t.id), 'depends_on': list(t.depends_on)} for t in app.active_tickets]
temp_dag = TrackDAG(ticket_dicts) temp_dag = TrackDAG(ticket_dicts)
has_cycle = temp_dag.has_cycle() has_cycle = temp_dag.has_cycle()
return Result(data=has_cycle) return Result(data=has_cycle)
@@ -6849,25 +6849,25 @@ def render_mma_ticket_editor(app: App) -> None:
+---------------------------------------------------------+ +---------------------------------------------------------+
""" """
imgui.separator(); imgui.text_colored(C_VAL(), f"Editing: {app.ui_selected_ticket_id}") imgui.separator(); imgui.text_colored(C_VAL(), f"Editing: {app.ui_selected_ticket_id}")
ticket = next((t for t in app.active_tickets if str(t.get('id', '')) == app.ui_selected_ticket_id), None) ticket = next((t for t in app.active_tickets if str(t.id) == app.ui_selected_ticket_id), None)
if ticket: if ticket:
imgui.text(f"Status: {ticket.get('status', 'todo')}"); prio = ticket.get('priority', 'medium') imgui.text(f"Status: {ticket.status}"); prio = ticket.priority
imgui.text("Priority:"); imgui.same_line() imgui.text("Priority:"); imgui.same_line()
if imgui.begin_combo(f"##edit_prio_{ticket.get('id')}", prio): if imgui.begin_combo(f"##edit_prio_{ticket.id}", prio):
for p_opt in ['high', 'medium', 'low']: for p_opt in ['high', 'medium', 'low']:
if imgui.selectable(p_opt, p_opt == prio)[0]: ticket['priority'] = p_opt; app._push_mma_state_update() if imgui.selectable(p_opt, p_opt == prio)[0]: ticket.priority = p_opt; app._push_mma_state_update()
imgui.end_combo() imgui.end_combo()
imgui.text(f"Target: {ticket.get('target_file', '')}"); imgui.text(f"Depends on: {', '.join(ticket.get('depends_on', []))}") imgui.text(f"Target: {ticket.target_file or ''}"); imgui.text(f"Depends on: {', '.join(ticket.depends_on)}")
personas = getattr(app.controller, 'personas', {}); curr_pers = ticket.get('persona_id', '') personas = getattr(app.controller, 'personas', {}); curr_pers = ticket.persona_id or ''
imgui.text("Persona Override:"); imgui.same_line() imgui.text("Persona Override:"); imgui.same_line()
pers_opts = ["None"] + sorted(personas.keys()); pers_opts = ["None"] + sorted(personas.keys());
curr_idx = pers_opts.index(curr_pers) + 1 if curr_pers in pers_opts else 0 curr_idx = pers_opts.index(curr_pers) + 1 if curr_pers in pers_opts else 0
_, curr_idx = imgui.combo(f"##ticket_persona_{ticket.get('id')}", curr_idx, pers_opts) _, curr_idx = imgui.combo(f"##ticket_persona_{ticket.id}", curr_idx, pers_opts)
ticket['persona_id'] = None if curr_idx == 0 or pers_opts[curr_idx] == "None" else pers_opts[curr_idx] ticket.persona_id = None if curr_idx == 0 or pers_opts[curr_idx] == "None" else pers_opts[curr_idx]
if imgui.button(f"Mark Complete##{app.ui_selected_ticket_id}"): ticket['status'] = 'done'; app._push_mma_state_update() if imgui.button(f"Mark Complete##{app.ui_selected_ticket_id}"): ticket.status = 'done'; app._push_mma_state_update()
imgui.same_line() imgui.same_line()
if imgui.button(f"Delete##{app.ui_selected_ticket_id}"): if imgui.button(f"Delete##{app.ui_selected_ticket_id}"):
app.active_tickets = [t for t in app.active_tickets if str(t.get('id', '')) != app.ui_selected_ticket_id] app.active_tickets = [t for t in app.active_tickets if str(t.id) != app.ui_selected_ticket_id]
app.ui_selected_ticket_id = None app.ui_selected_ticket_id = None
app._push_mma_state_update() app._push_mma_state_update()
@@ -7068,7 +7068,7 @@ def render_ticket_queue(app: App) -> None:
return return
# Select All / None # Select All / None
if imgui.button("Select All"): app.ui_selected_tickets = {str(t.get('id', '')) for t in app.active_tickets} if imgui.button("Select All"): app.ui_selected_tickets = {str(t.id) for t in app.active_tickets}
imgui.same_line() imgui.same_line()
if imgui.button("Select None"): app.ui_selected_tickets.clear() if imgui.button("Select None"): app.ui_selected_tickets.clear()
@@ -7093,7 +7093,7 @@ def render_ticket_queue(app: App) -> None:
imgui.table_headers_row() imgui.table_headers_row()
for i, t in enumerate(app.active_tickets): for i, t in enumerate(app.active_tickets):
tid = str(t.get('id', '')) tid = str(t.id)
imgui.table_next_row() imgui.table_next_row()
# Select # Select
@@ -7125,50 +7125,50 @@ def render_ticket_queue(app: App) -> None:
# Priority # Priority
imgui.table_next_column() imgui.table_next_column()
prio = t.get('priority', 'medium') prio = t.priority
p_col = theme.get_color("text_disabled") # gray p_col = theme.get_color("text_disabled") # gray
if prio == 'high': _col = theme.get_color("status_error") # red if prio == 'high': _col = theme.get_color("status_error") # red
elif prio == 'medium': p_col = theme.get_color("status_warning") # yellow elif prio == 'medium': p_col = theme.get_color("status_warning") # yellow
imgui.push_style_color(imgui.Col_.text, p_col) imgui.push_style_color(imgui.Col_.text, p_col)
if imgui.begin_combo(f"##prio_{tid}", prio, imgui.ComboFlags_.height_small): if imgui.begin_combo(f"##prio_{tid}", prio, imgui.ComboFlags_.height_small):
for p_opt in ['high', 'medium', 'low']: for p_opt in ['high', 'medium', 'low']:
if imgui.selectable(p_opt, p_opt == prio)[0]: if imgui.selectable(p_opt, p_opt == prio)[0]:
t['priority'] = p_opt t.priority = p_opt
app._push_mma_state_update() app._push_mma_state_update()
imgui.end_combo() imgui.end_combo()
imgui.pop_style_color() imgui.pop_style_color()
# Model # Model
imgui.table_next_column() imgui.table_next_column()
model_override = t.get('model_override') model_override = t.model_override
current_model = model_override if model_override else "Default" current_model = model_override if model_override else "Default"
if imgui.begin_combo(f"##model_{tid}", current_model, imgui.ComboFlags_.height_small): if imgui.begin_combo(f"##model_{tid}", current_model, imgui.ComboFlags_.height_small):
if imgui.selectable("Default", model_override is None)[0]: if imgui.selectable("Default", model_override is None)[0]:
t['model_override'] = None; app._push_mma_state_update() t.model_override = None; app._push_mma_state_update()
for model in ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3-flash-preview", "gemini-3.1-pro-preview", "deepseek-v3"]: for model in ["gemini-2.5-flash-lite", "gemini-2.5-flash", "gemini-3-flash-preview", "gemini-3.1-pro-preview", "deepseek-v3"]:
if imgui.selectable(model, model_override == model)[0]: if imgui.selectable(model, model_override == model)[0]:
t['model_override'] = model; app._push_mma_state_update() t.model_override = model; app._push_mma_state_update()
imgui.end_combo() imgui.end_combo()
# Status # Status
imgui.table_next_column() imgui.table_next_column()
status = t.get('status', 'todo') status = t.status
if t.get('model_override'): imgui.text_colored(theme.get_color("status_warning"), f"{status} [{t.get('model_override')}]") if t.model_override: imgui.text_colored(theme.get_color("status_warning"), f"{status} [{t.model_override}]")
else: imgui.text(t.get('status', 'todo')) else: imgui.text(t.status)
# Description # Description
imgui.table_next_column() imgui.table_next_column()
imgui.text(t.get('description', '')) imgui.text(t.description)
# Actions - Kill button for in_progress tickets # Actions - Kill button for in_progress tickets
imgui.table_next_column() imgui.table_next_column()
status = t.get('status', 'todo') status = t.status
if status == 'in_progress': if status == 'in_progress':
if imgui.button(f"Kill##{tid}"): app._cb_kill_ticket(tid) if imgui.button(f"Kill##{tid}"): app._cb_kill_ticket(tid)
elif status == 'todo': elif status == 'todo':
if imgui.button(f"Block##{tid}"): app._cb_block_ticket(tid) if imgui.button(f"Block##{tid}"): app._cb_block_ticket(tid)
elif status == 'blocked' and t.get('manual_block', False): elif status == 'blocked' and t.manual_block:
if imgui.button(f"Unblock##{tid}"): app._cb_unblock_ticket(tid) if imgui.button(f"Unblock##{tid}"): app._cb_unblock_ticket(tid)
imgui.end_table() imgui.end_table()
@@ -7200,19 +7200,19 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
for node_id in selected: for node_id in selected:
node_val = node_id.id() node_val = node_id.id()
for t in app.active_tickets: for t in app.active_tickets:
if abs(hash(str(t.get('id', '')))) == node_val: if abs(hash(str(t.id))) == node_val:
app.ui_selected_ticket_id = str(t.get('id', '')) app.ui_selected_ticket_id = str(t.id)
break break
break break
for t in app.active_tickets: for t in app.active_tickets:
tid = str(t.get('id', '??')) tid = str(t.id) if t.id else '??'
int_id = abs(hash(tid)) int_id = abs(hash(tid))
ed.begin_node(ed.NodeId(int_id)) ed.begin_node(ed.NodeId(int_id))
if getattr(app, "ui_project_execution_mode", "native") == "beads": if getattr(app, "ui_project_execution_mode", "native") == "beads":
imgui.text_colored(theme.get_color("status_info"), "[B] ") imgui.text_colored(theme.get_color("status_info"), "[B] ")
imgui.same_line() imgui.same_line()
imgui.text_colored(C_KEY(), f"Ticket: {tid}") imgui.text_colored(C_KEY(), f"Ticket: {tid}")
status = t.get('status', 'todo') status = t.status
s_col = C_VAL() s_col = C_VAL()
if status == 'done' or status == 'complete': s_col = C_IN() if status == 'done' or status == 'complete': s_col = C_IN()
elif status == 'in_progress' or status == 'running': s_col = C_OUT() elif status == 'in_progress' or status == 'running': s_col = C_OUT()
@@ -7220,7 +7220,7 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
imgui.text("Status: ") imgui.text("Status: ")
imgui.same_line() imgui.same_line()
imgui.text_colored(s_col, status) imgui.text_colored(s_col, status)
imgui.text(f"Target: {t.get('target_file','')}") imgui.text(f"Target: {t.target_file or ''}")
ed.begin_pin(ed.PinId(abs(hash(tid + "_in"))), ed.PinKind.input) ed.begin_pin(ed.PinId(abs(hash(tid + "_in"))), ed.PinKind.input)
imgui.text("->") imgui.text("->")
ed.end_pin() ed.end_pin()
@@ -7230,10 +7230,10 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
ed.end_pin() ed.end_pin()
ed.end_node() ed.end_node()
for t in app.active_tickets: for t in app.active_tickets:
tid = str(t.get('id', '??')) tid = str(t.id) if t.id else '??'
for dep in t.get('depends_on', []): for dep in t.depends_on:
ed.link(ed.LinkId(abs(hash(dep + "_" + tid))), ed.PinId(abs(hash(dep + "_out"))), ed.PinId(abs(hash(tid + "_in")))) ed.link(ed.LinkId(abs(hash(dep + "_" + tid))), ed.PinId(abs(hash(dep + "_out"))), ed.PinId(abs(hash(tid + "_in"))))
# Handle link creation # Handle link creation
if ed.begin_create(): if ed.begin_create():
start_pin = ed.PinId() start_pin = ed.PinId()
@@ -7245,16 +7245,16 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
source_tid = None source_tid = None
target_tid = None target_tid = None
for t in app.active_tickets: for t in app.active_tickets:
tid = str(t.get('id', '')) tid = str(t.id)
if abs(hash(tid + "_out")) == s_id: source_tid = tid if abs(hash(tid + "_out")) == s_id: source_tid = tid
if abs(hash(tid + "_out")) == e_id: source_tid = tid if abs(hash(tid + "_out")) == e_id: source_tid = tid
if abs(hash(tid + "_in")) == s_id: target_tid = tid if abs(hash(tid + "_in")) == s_id: target_tid = tid
if abs(hash(tid + "_in")) == e_id: target_tid = tid if abs(hash(tid + "_in")) == e_id: target_tid = tid
if source_tid and target_tid and source_tid != target_tid: if source_tid and target_tid and source_tid != target_tid:
for t in app.active_tickets: for t in app.active_tickets:
if str(t.get('id', '')) == target_tid: if str(t.id) == target_tid:
if source_tid not in t.get('depends_on', []): if source_tid not in t.depends_on:
t.setdefault('depends_on', []).append(source_tid) t.depends_on = list(t.depends_on) + [source_tid]
app._push_mma_state_update() app._push_mma_state_update()
break break
ed.end_create() ed.end_create()
@@ -7266,10 +7266,10 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
if ed.accept_deleted_item(): if ed.accept_deleted_item():
lid_val = link_id.id() lid_val = link_id.id()
for t in app.active_tickets: for t in app.active_tickets:
tid = str(t.get('id', '')) tid = str(t.id)
deps = t.get('depends_on', []) deps = t.depends_on
if any(abs(hash(d + "_" + tid)) == lid_val for d in deps): if any(abs(hash(d + "_" + tid)) == lid_val for d in deps):
t['depends_on'] = [dep for dep in deps if abs(hash(dep + "_" + tid)) != lid_val] t.depends_on = [dep for dep in deps if abs(hash(dep + "_" + tid)) != lid_val]
app._push_mma_state_update() app._push_mma_state_update()
break break
ed.end_delete() ed.end_delete()
@@ -7291,7 +7291,7 @@ def render_task_dag_panel(app: App) -> None: # 4. Task DAG Visualizer
# Default Ticket ID # Default Ticket ID
max_id = 0 max_id = 0
for t in app.active_tickets: for t in app.active_tickets:
tid = t.get('id', '') tid = t.id
if tid.startswith('T-'): if tid.startswith('T-'):
parse_result = _ticket_id_max_int_result(tid) parse_result = _ticket_id_max_int_result(tid)
if parse_result.ok: if parse_result.ok:
-5
View File
File diff suppressed because one or more lines are too long
+17 -16
View File
@@ -1,6 +1,7 @@
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
from src import conductor_tech_lead from src import conductor_tech_lead
from src.models import Ticket
from src.result_types import Result from src.result_types import Result
import pytest import pytest
@@ -30,28 +31,28 @@ class TestConductorTechLead(unittest.TestCase):
class TestTopologicalSort(unittest.TestCase): class TestTopologicalSort(unittest.TestCase):
def test_topological_sort_linear(self) -> None: def test_topological_sort_linear(self) -> None:
tickets = [ tickets = [
{"id": "t2", "depends_on": ["t1"]}, Ticket(id="t2", description="t2", depends_on=["t1"]),
{"id": "t1", "depends_on": []}, Ticket(id="t1", description="t1", depends_on=[]),
] ]
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
self.assertEqual(sorted_tickets[0]['id'], "t1") self.assertEqual(sorted_tickets[0].id, "t1")
self.assertEqual(sorted_tickets[1]['id'], "t2") self.assertEqual(sorted_tickets[1].id, "t2")
def test_topological_sort_complex(self) -> None: def test_topological_sort_complex(self) -> None:
tickets = [ tickets = [
{"id": "t3", "depends_on": ["t1", "t2"]}, Ticket(id="t3", description="t3", depends_on=["t1", "t2"]),
{"id": "t1", "depends_on": []}, Ticket(id="t1", description="t1", depends_on=[]),
{"id": "t2", "depends_on": ["t1"]}, Ticket(id="t2", description="t2", depends_on=["t1"]),
] ]
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
self.assertEqual(sorted_tickets[0]['id'], "t1") self.assertEqual(sorted_tickets[0].id, "t1")
self.assertEqual(sorted_tickets[1]['id'], "t2") self.assertEqual(sorted_tickets[1].id, "t2")
self.assertEqual(sorted_tickets[2]['id'], "t3") self.assertEqual(sorted_tickets[2].id, "t3")
def test_topological_sort_cycle(self) -> None: def test_topological_sort_cycle(self) -> None:
tickets = [ tickets = [
{"id": "t1", "depends_on": ["t2"]}, Ticket(id="t1", description="t1", depends_on=["t2"]),
{"id": "t2", "depends_on": ["t1"]}, Ticket(id="t2", description="t2", depends_on=["t1"]),
] ]
with self.assertRaises(ValueError) as cm: with self.assertRaises(ValueError) as cm:
conductor_tech_lead.topological_sort(tickets) conductor_tech_lead.topological_sort(tickets)
@@ -65,7 +66,7 @@ class TestTopologicalSort(unittest.TestCase):
# If a ticket depends on something not in the list, we should handle it or let it fail. # If a ticket depends on something not in the list, we should handle it or let it fail.
# The TrackDAG silently ignores missing dependencies, causing cycle detection to trigger. # The TrackDAG silently ignores missing dependencies, causing cycle detection to trigger.
tickets = [ tickets = [
{"id": "t1", "depends_on": ["missing"]}, Ticket(id="t1", description="t1", depends_on=["missing"]),
] ]
# Currently this raises ValueError due to cycle detection on incomplete sort # Currently this raises ValueError due to cycle detection on incomplete sort
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
@@ -73,12 +74,12 @@ class TestTopologicalSort(unittest.TestCase):
def test_topological_sort_vlog(vlogger) -> None: def test_topological_sort_vlog(vlogger) -> None:
tickets = [ tickets = [
{"id": "t2", "depends_on": ["t1"]}, Ticket(id="t2", description="t2", depends_on=["t1"]),
{"id": "t1", "depends_on": []}, Ticket(id="t1", description="t1", depends_on=[]),
] ]
vlogger.log_state("Input Order", ["t2", "t1"], ["t2", "t1"]) vlogger.log_state("Input Order", ["t2", "t1"], ["t2", "t1"])
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
result_ids = [t['id'] for t in sorted_tickets] result_ids = [t.id for t in sorted_tickets]
vlogger.log_state("Sorted Order", "N/A", result_ids) vlogger.log_state("Sorted Order", "N/A", result_ids)
assert result_ids == ["t1", "t2"] assert result_ids == ["t1", "t2"]
vlogger.finalize("Topological Sort Verification", "PASS", "Linear dependencies correctly ordered.") vlogger.finalize("Topological Sort Verification", "PASS", "Linear dependencies correctly ordered.")
+5 -3
View File
@@ -2315,9 +2315,10 @@ def test_phase_10_l7271_dag_cycle_check_result_no_cycle():
opening the "Cycle Detected!" popup. opening the "Cycle Detected!" popup.
""" """
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from src.models import Ticket
import src.gui_2 as gui2_mod import src.gui_2 as gui2_mod
app = MagicMock() app = MagicMock()
app.active_tickets = [{"id": "T-001", "depends_on": []}] app.active_tickets = [Ticket(id="T-001", description="T-001", depends_on=[])]
mock_dag = MagicMock() mock_dag = MagicMock()
mock_dag.has_cycle.return_value = False mock_dag.has_cycle.return_value = False
with patch("src.dag_engine.TrackDAG", return_value=mock_dag): with patch("src.dag_engine.TrackDAG", return_value=mock_dag):
@@ -2334,11 +2335,12 @@ def test_phase_10_l7271_dag_cycle_check_result_cycle_detected():
returns Result(data=True). The caller opens the "Cycle Detected!" popup. returns Result(data=True). The caller opens the "Cycle Detected!" popup.
""" """
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from src.models import Ticket
import src.gui_2 as gui2_mod import src.gui_2 as gui2_mod
app = MagicMock() app = MagicMock()
app.active_tickets = [ app.active_tickets = [
{"id": "T-001", "depends_on": ["T-002"]}, Ticket(id="T-001", description="T-001", depends_on=["T-002"]),
{"id": "T-002", "depends_on": ["T-001"]}, Ticket(id="T-002", description="T-002", depends_on=["T-001"]),
] ]
mock_dag = MagicMock() mock_dag = MagicMock()
mock_dag.has_cycle.return_value = True mock_dag.has_cycle.return_value = True
+2 -2
View File
@@ -47,5 +47,5 @@ def test_load_active_tickets_from_beads(tmp_path: Path):
# 5. Verify active_tickets populated from Beads # 5. Verify active_tickets populated from Beads
assert len(ctrl.active_tickets) == 1 assert len(ctrl.active_tickets) == 1
assert ctrl.active_tickets[0]["id"] == "bead-1" assert ctrl.active_tickets[0].id == "bead-1"
assert ctrl.active_tickets[0]["description"] == "Description 1" assert ctrl.active_tickets[0].description == "Description 1"
+2 -1
View File
@@ -1,5 +1,6 @@
import pytest import pytest
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
from src import models
def test_gui_has_kill_button_method(): def test_gui_has_kill_button_method():
from src.gui_2 import App from src.gui_2 import App
@@ -36,7 +37,7 @@ def test_render_ticket_queue_table_columns():
from src.gui_2 import App, render_ticket_queue from src.gui_2 import App, render_ticket_queue
app = App.__new__(App) app = App.__new__(App)
app.active_track = MagicMock() app.active_track = MagicMock()
app.active_tickets = [{"id": "T-001", "priority": "medium", "status": "in_progress", "description": "Test task"}] app.active_tickets = [models.Ticket(id="T-001", description="Test task", priority="medium", status="in_progress")]
app.ui_selected_tickets = set() app.ui_selected_tickets = set()
app.ui_selected_ticket_id = None app.ui_selected_ticket_id = None
app.controller = MagicMock() app.controller = MagicMock()
+191
View File
@@ -0,0 +1,191 @@
"""
Phase 1 of metadata_promotion_20260624.
Verifies:
1. self.active_tickets load boundaries convert dicts to models.Ticket
2. conductor_tech_lead.topological_sort returns list[models.Ticket]
3. gui_2.py consumer sites use direct field access (not .get())
4. app_controller.py consumer sites use direct field access (not .get())
"""
import inspect
from unittest.mock import patch
from src.models import Ticket
class TestActiveTicketsType:
def test_active_tickets_annotation_is_list_of_ticket(self) -> None:
"""self.active_tickets type hint must be list[models.Ticket], not list[Metadata]."""
from src.app_controller import AppController
src_text = inspect.getsource(AppController.__init__)
assert "list[models.Ticket]" in src_text, (
"AppController.__init__ must declare self.active_tickets: list[models.Ticket]"
)
assert "list[Metadata]" not in src_text.split("self.active_tickets")[1].split("\n")[0], (
"AppController.__init__ must NOT declare self.active_tickets: list[Metadata]"
)
class TestActiveTicketsLoadBoundaries:
def test_load_at_data_converts_dicts_to_tickets(self) -> None:
"""_deserialize_active_track_result boundary must wrap dicts as models.Ticket."""
from src.app_controller import AppController
with patch.object(AppController, "load_config", return_value={
'ai': {'provider': 'gemini', 'model': 'gemini-2.5-flash-lite'},
'projects': {'paths': [], 'active': ''},
'gui': {'show_windows': {}},
}), patch.object(AppController, "save_config"), \
patch.object(AppController, "_prune_old_logs"), \
patch.object(AppController, "start_services"), \
patch.object(AppController, "_init_ai_and_hooks"):
ctrl = AppController.__new__(AppController)
ctrl.__init__()
at_data = {
"id": "track-x",
"title": "Track X",
"tickets": [
{"id": "T1", "description": "first", "status": "todo"},
{"id": "T2", "description": "second", "status": "todo"},
],
}
ctrl._deserialize_active_track_result(at_data)
assert ctrl.active_tickets, "load path should populate active_tickets"
for t in ctrl.active_tickets:
assert isinstance(t, Ticket), (
f"active_tickets must contain Ticket instances, got {type(t).__name__}: {t!r}"
)
def test_load_active_tickets_beads_branch_converts_dicts_to_tickets(self) -> None:
"""_load_active_tickets (beads branch) must wrap bead dicts as models.Ticket."""
from src.app_controller import AppController
from src.models import Ticket
ctrl = AppController.__new__(AppController)
ctrl._last_request_errors = []
ctrl.ui_project_execution_mode = "beads"
ctrl.ui_files_base_dir = None
class _Bead:
def __init__(self, bid: str, title: str, desc: str, status: str) -> None:
self.id = bid; self.title = title; self.description = desc; self.status = status
with patch.object(AppController, "_load_beads_from_path_result") as mock_load:
mock_load.return_value = (lambda: type("R", (), {"ok": True, "data": [
_Bead("B1", "T1", "first", "todo"), _Bead("B2", "T2", "second", "todo")
]})())
ctrl._load_active_tickets()
for t in ctrl.active_tickets:
assert isinstance(t, Ticket), (
f"beads branch must populate active_tickets with Ticket instances, got {type(t).__name__}"
)
class TestTopologicalSortReturnsTicketList:
def test_topological_sort_returns_ticket_instances(self) -> None:
"""conductor_tech_lead.topological_sort must return list[models.Ticket]."""
from src import conductor_tech_lead
sig = inspect.signature(conductor_tech_lead.topological_sort)
assert sig.return_annotation is not inspect.Signature.empty
assert "Ticket" in str(sig.return_annotation), (
f"topological_sort return annotation must reference Ticket, got {sig.return_annotation}"
)
class TestGuiConsumersDirectFieldAccess:
def test_reorder_ticket_uses_direct_field_access(self) -> None:
"""gui_2.App._reorder_ticket must use t.id / t.depends_on (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App._reorder_ticket)
assert "t.get(" not in src, (
"_reorder_ticket must not call t.get() — use t.id and t.depends_on directly"
)
def test_bulk_execute_uses_direct_field_access(self) -> None:
"""gui_2.App.bulk_execute must use t.id (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App.bulk_execute)
assert "t.get(" not in src, (
"bulk_execute must not call t.get() — use t.id directly"
)
def test_bulk_skip_uses_direct_field_access(self) -> None:
"""gui_2.App.bulk_skip must use t.id (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App.bulk_skip)
assert "t.get(" not in src, (
"bulk_skip must not call t.get() — use t.id directly"
)
def test_bulk_block_uses_direct_field_access(self) -> None:
"""gui_2.App.bulk_block must use t.id (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App.bulk_block)
assert "t.get(" not in src, (
"bulk_block must not call t.get() — use t.id directly"
)
def test_cb_block_ticket_uses_direct_field_access(self) -> None:
"""gui_2.App._cb_block_ticket must use direct field access (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App._cb_block_ticket)
assert "t.get(" not in src, (
"_cb_block_ticket must not call t.get() — use direct field access"
)
def test_cb_unblock_ticket_uses_direct_field_access(self) -> None:
"""gui_2.App._cb_unblock_ticket must use direct field access (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2.App._cb_unblock_ticket)
assert "t.get(" not in src, (
"_cb_unblock_ticket must not call t.get() — use direct field access"
)
def test_dag_cycle_check_uses_direct_field_access(self) -> None:
"""gui_2._dag_cycle_check_result must use t.id / t.depends_on (not .get())."""
import inspect
from src import gui_2
src = inspect.getsource(gui_2._dag_cycle_check_result)
assert "t.get(" not in src, (
"_dag_cycle_check_result must not call t.get() — use t.id and t.depends_on directly"
)
class TestAppControllerConsumersDirectFieldAccess:
def test_cb_ticket_retry_uses_direct_field_access(self) -> None:
"""app_controller._cb_ticket_retry must use t.id (not .get())."""
import inspect
from src import app_controller
src = inspect.getsource(app_controller.AppController._cb_ticket_retry)
assert "t.get(" not in src, (
"_cb_ticket_retry must not call t.get() — use t.id directly"
)
def test_cb_ticket_skip_uses_direct_field_access(self) -> None:
"""app_controller._cb_ticket_skip must use t.id (not .get())."""
import inspect
from src import app_controller
src = inspect.getsource(app_controller.AppController._cb_ticket_skip)
assert "t.get(" not in src, (
"_cb_ticket_skip must not call t.get() — use t.id directly"
)
def test_approve_ticket_uses_direct_field_access(self) -> None:
"""app_controller.approve_ticket must use t.id (not .get())."""
import inspect
from src import app_controller
src = inspect.getsource(app_controller.AppController.approve_ticket)
assert "t.get(" not in src, (
"approve_ticket must not call t.get() — use t.id directly"
)
def test_mutate_dag_uses_direct_field_access(self) -> None:
"""app_controller.mutate_dag must use t.id and t.depends_on (not .get())."""
import inspect
from src import app_controller
src = inspect.getsource(app_controller.AppController.mutate_dag)
assert "t.get(" not in src, (
"mutate_dag must not call t.get() — use t.id and t.depends_on directly"
)
+5 -4
View File
@@ -1,16 +1,17 @@
from src.gui_2 import App from src.gui_2 import App
from src.models import Ticket
def test_cb_ticket_retry(app_instance: App) -> None: def test_cb_ticket_retry(app_instance: App) -> None:
ticket_id = "test_ticket_1" ticket_id = "test_ticket_1"
app_instance.active_tickets = [{"id": ticket_id, "status": "failed"}] app_instance.active_tickets = [Ticket(id=ticket_id, description="test", status="failed")]
# Synchronous implementation does not use asyncio.run_coroutine_threadsafe # Synchronous implementation does not use asyncio.run_coroutine_threadsafe
app_instance.controller._cb_ticket_retry(ticket_id) app_instance.controller._cb_ticket_retry(ticket_id)
# Verify status update # Verify status update
assert app_instance.active_tickets[0]['status'] == 'todo' assert app_instance.active_tickets[0].status == 'todo'
def test_cb_ticket_skip(app_instance: App) -> None: def test_cb_ticket_skip(app_instance: App) -> None:
ticket_id = "test_ticket_2" ticket_id = "test_ticket_2"
app_instance.active_tickets = [{"id": ticket_id, "status": "todo"}] app_instance.active_tickets = [Ticket(id=ticket_id, description="test", status="todo")]
app_instance.controller._cb_ticket_skip(ticket_id) app_instance.controller._cb_ticket_skip(ticket_id)
# Verify status update # Verify status update
assert app_instance.active_tickets[0]['status'] == 'skipped' assert app_instance.active_tickets[0].status == 'skipped'
+6 -6
View File
@@ -34,17 +34,17 @@ def test_generate_tickets() -> None:
def test_topological_sort() -> None: def test_topological_sort() -> None:
tickets = [ tickets = [
{"id": "T2", "depends_on": ["T1"]}, Ticket(id="T2", description="d2", depends_on=["T1"]),
{"id": "T1", "depends_on": []} Ticket(id="T1", description="d1", depends_on=[])
] ]
sorted_tickets = conductor_tech_lead.topological_sort(tickets) sorted_tickets = conductor_tech_lead.topological_sort(tickets)
assert sorted_tickets[0]["id"] == "T1" assert sorted_tickets[0].id == "T1"
assert sorted_tickets[1]["id"] == "T2" assert sorted_tickets[1].id == "T2"
def test_topological_sort_circular() -> None: def test_topological_sort_circular() -> None:
tickets = [ tickets = [
{"id": "T1", "depends_on": ["T2"]}, Ticket(id="T1", description="d1", depends_on=["T2"]),
{"id": "T2", "depends_on": ["T1"]} Ticket(id="T2", description="d2", depends_on=["T1"])
] ]
with pytest.raises(ValueError, match="DAG Validation Error"): with pytest.raises(ValueError, match="DAG Validation Error"):
conductor_tech_lead.topological_sort(tickets) conductor_tech_lead.topological_sort(tickets)
+27 -27
View File
@@ -40,70 +40,70 @@ def test_ticket_from_dict_default_priority():
class TestBulkOperations: class TestBulkOperations:
def test_bulk_execute(self, mock_app): def test_bulk_execute(self, mock_app):
mock_app.active_tickets = [ mock_app.active_tickets = [
{"id": "T1", "status": "todo"}, Ticket(id="T1", description="T1", status="todo"),
{"id": "T2", "status": "todo"}, Ticket(id="T2", description="T2", status="todo"),
{"id": "T3", "status": "todo"} Ticket(id="T3", description="T3", status="todo")
] ]
mock_app.ui_selected_tickets = {"T1", "T3"} mock_app.ui_selected_tickets = {"T1", "T3"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push: with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_execute() mock_app.bulk_execute()
assert mock_app.active_tickets[0]["status"] == "in_progress" assert mock_app.active_tickets[0].status == "in_progress"
assert mock_app.active_tickets[1]["status"] == "todo" assert mock_app.active_tickets[1].status == "todo"
assert mock_app.active_tickets[2]["status"] == "in_progress" assert mock_app.active_tickets[2].status == "in_progress"
mock_push.assert_called_once() mock_push.assert_called_once()
def test_bulk_skip(self, mock_app): def test_bulk_skip(self, mock_app):
mock_app.active_tickets = [ mock_app.active_tickets = [
{"id": "T1", "status": "todo"}, Ticket(id="T1", description="T1", status="todo"),
{"id": "T2", "status": "todo"} Ticket(id="T2", description="T2", status="todo")
] ]
mock_app.ui_selected_tickets = {"T1"} mock_app.ui_selected_tickets = {"T1"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push: with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_skip() mock_app.bulk_skip()
assert mock_app.active_tickets[0]["status"] == "completed" assert mock_app.active_tickets[0].status == "completed"
assert mock_app.active_tickets[1]["status"] == "todo" assert mock_app.active_tickets[1].status == "todo"
mock_push.assert_called_once() mock_push.assert_called_once()
def test_bulk_block(self, mock_app): def test_bulk_block(self, mock_app):
mock_app.active_tickets = [ mock_app.active_tickets = [
{"id": "T1", "status": "todo"}, Ticket(id="T1", description="T1", status="todo"),
{"id": "T2", "status": "todo"} Ticket(id="T2", description="T2", status="todo")
] ]
mock_app.ui_selected_tickets = {"T1", "T2"} mock_app.ui_selected_tickets = {"T1", "T2"}
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push: with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
mock_app.bulk_block() mock_app.bulk_block()
assert mock_app.active_tickets[0]["status"] == "blocked" assert mock_app.active_tickets[0].status == "blocked"
assert mock_app.active_tickets[1]["status"] == "blocked" assert mock_app.active_tickets[1].status == "blocked"
mock_push.assert_called_once() mock_push.assert_called_once()
class TestReorder: class TestReorder:
def test_reorder_ticket_valid(self, mock_app): def test_reorder_ticket_valid(self, mock_app):
mock_app.active_tickets = [ mock_app.active_tickets = [
{"id": "T1", "depends_on": []}, Ticket(id="T1", description="T1", depends_on=[]),
{"id": "T2", "depends_on": []}, Ticket(id="T2", description="T2", depends_on=[]),
{"id": "T3", "depends_on": ["T1"]} Ticket(id="T3", description="T3", depends_on=["T1"])
] ]
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push: with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
# Move T1 to index 1: [T2, T1, T3]. T3 depends on T1. T1 index 1 < T3 index 2. VALID. # Move T1 to index 1: [T2, T1, T3]. T3 depends on T1. T1 index 1 < T3 index 2. VALID.
mock_app._reorder_ticket(0, 1) mock_app._reorder_ticket(0, 1)
assert mock_app.active_tickets[0]["id"] == "T2" assert mock_app.active_tickets[0].id == "T2"
assert mock_app.active_tickets[1]["id"] == "T1" assert mock_app.active_tickets[1].id == "T1"
assert mock_app.active_tickets[2]["id"] == "T3" assert mock_app.active_tickets[2].id == "T3"
mock_push.assert_called_once() mock_push.assert_called_once()
def test_reorder_ticket_invalid(self, mock_app): def test_reorder_ticket_invalid(self, mock_app):
mock_app.active_tickets = [ mock_app.active_tickets = [
{"id": "T1", "depends_on": []}, Ticket(id="T1", description="T1", depends_on=[]),
{"id": "T2", "depends_on": ["T1"]} Ticket(id="T2", description="T2", depends_on=["T1"])
] ]
with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push: with patch.object(mock_app.controller, "_push_mma_state_update") as mock_push:
# Move T1 after T2: [T2, T1]. T2 depends on T1, but T1 is now at index 1 while T2 is at index 0. # Move T1 after T2: [T2, T1]. T2 depends on T1, but T1 is now at index 1 while T2 is at index 0.
# Violation: dependency T1 (index 1) is not before T2 (index 0). # Violation: dependency T1 (index 1) is not before T2 (index 0).
mock_app._reorder_ticket(0, 1) mock_app._reorder_ticket(0, 1)
# Should NOT change # Should NOT change
assert mock_app.active_tickets[0]["id"] == "T1" assert mock_app.active_tickets[0].id == "T1"
assert mock_app.active_tickets[1]["id"] == "T2" assert mock_app.active_tickets[1].id == "T2"
mock_push.assert_not_called() mock_push.assert_not_called()