Files
manual_slop/tests/test_arch_boundary_phase3.py
2026-03-05 14:24:03 -05:00

93 lines
3.5 KiB
Python

import os
import sys
import unittest
from unittest.mock import patch, MagicMock
# Ensure project root is in path
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class TestArchBoundaryPhase3(unittest.TestCase):
def setUp(self) -> None:
pass
def test_cascade_blocks_simple(self) -> None:
"""Test that a blocked dependency blocks its immediate dependent."""
from src.models import Ticket, Track
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
track = Track(id="TR1", description="track", tickets=[t1, t2])
# ExecutionEngine should identify T2 as blocked during tick
from src.dag_engine import TrackDAG, ExecutionEngine
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
if t2.blocked_reason:
self.assertIn("T1", t2.blocked_reason)
def test_cascade_blocks_multi_hop(self) -> None:
"""Test that blocking cascades through multiple dependencies."""
from src.models import Ticket, Track
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
t3 = Ticket(id="T3", description="d3", status="todo", assigned_to="worker1", depends_on=["T2"])
dag = TrackDAG([t1, t2, t3])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")
self.assertEqual(t3.status, "blocked")
def test_manual_unblock_restores_todo(self) -> None:
"""Test that unblocking a task manually works if dependencies are met."""
from src.models import Ticket, Track
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="completed", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="blocked", assigned_to="worker1", blocked_reason="manual")
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
# Update status to todo
engine.update_task_status("T2", "todo")
self.assertEqual(t2.status, "todo")
# Next tick should keep it todo (ready)
ready = engine.tick()
self.assertIn(t2, ready)
def test_in_progress_not_blocked(self) -> None:
"""Test that in_progress tasks are not blocked automatically (only todo)."""
from src.models import Ticket, Track
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="in_progress", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
# T2 should remain in_progress because it's already running
self.assertEqual(t2.status, "in_progress")
def test_execution_engine_tick_cascades_blocks(self) -> None:
"""Test that ExecutionEngine.tick() triggers the cascading blocks."""
from src.models import Ticket, Track
from src.dag_engine import TrackDAG, ExecutionEngine
t1 = Ticket(id="T1", description="d1", status="blocked", assigned_to="worker1")
t2 = Ticket(id="T2", description="d2", status="todo", assigned_to="worker1", depends_on=["T1"])
dag = TrackDAG([t1, t2])
engine = ExecutionEngine(dag)
engine.tick()
self.assertEqual(t2.status, "blocked")