77 lines
3.3 KiB
Python
77 lines
3.3 KiB
Python
import unittest
|
|
from unittest.mock import MagicMock, patch
|
|
import conductor_tech_lead
|
|
import pytest
|
|
|
|
class TestConductorTechLead(unittest.TestCase):
    """Exercise ``conductor_tech_lead.generate_tickets`` with ``ai_client.send`` patched out."""

    def test_generate_tickets_parse_error(self) -> None:
        """An unparseable reply from the patched client is expected to give ``[]``."""
        # generate_tickets is expected to return [] on error instead of raising.
        with patch('ai_client.send', return_value="invalid json"):
            result = conductor_tech_lead.generate_tickets("brief", "skeletons")

        self.assertEqual(result, [])

    def test_generate_tickets_success(self) -> None:
        """A one-element JSON array reply is expected to give exactly one ticket, ``T1``."""
        reply = '[{"id": "T1", "description": "desc", "depends_on": []}]'
        with patch('ai_client.send', return_value=reply):
            result = conductor_tech_lead.generate_tickets("brief", "skeletons")

        self.assertEqual(len(result), 1)
        self.assertEqual(result[0]['id'], "T1")
|
|
|
|
class TestTopologicalSort(unittest.TestCase):
    """Exercise ``conductor_tech_lead.topological_sort`` on small ticket graphs."""

    def _assert_sorted_ids(self, tickets, expected_ids) -> None:
        """Sort *tickets* and check each leading position against *expected_ids*.

        Positions are compared one by one, in order, so the first mismatch
        is the one reported.
        """
        ordered = conductor_tech_lead.topological_sort(tickets)
        for position, ticket_id in enumerate(expected_ids):
            self.assertEqual(ordered[position]['id'], ticket_id)

    def test_topological_sort_linear(self) -> None:
        """A two-ticket chain given in reverse order comes back as t1, t2."""
        chain = [
            {"id": "t2", "depends_on": ["t1"]},
            {"id": "t1", "depends_on": []},
        ]
        self._assert_sorted_ids(chain, ("t1", "t2"))

    def test_topological_sort_complex(self) -> None:
        """A three-ticket graph where t3 needs both t1 and t2 sorts to t1, t2, t3."""
        graph = [
            {"id": "t3", "depends_on": ["t1", "t2"]},
            {"id": "t1", "depends_on": []},
            {"id": "t2", "depends_on": ["t1"]},
        ]
        self._assert_sorted_ids(graph, ("t1", "t2", "t3"))

    def test_topological_sort_cycle(self) -> None:
        """Two mutually dependent tickets are expected to raise ``ValueError``."""
        cyclic = [
            {"id": "t1", "depends_on": ["t2"]},
            {"id": "t2", "depends_on": ["t1"]},
        ]
        with self.assertRaises(ValueError) as raised:
            conductor_tech_lead.topological_sort(cyclic)

        # The message is expected to carry the DAG validation wrapper text.
        self.assertIn("DAG Validation Error", str(raised.exception))

    def test_topological_sort_empty(self) -> None:
        """An empty ticket list sorts to an empty list."""
        ordered = conductor_tech_lead.topological_sort([])
        self.assertEqual(ordered, [])

    def test_topological_sort_missing_dependency(self) -> None:
        """A dependency on an id absent from the list currently surfaces as ``KeyError``."""
        # A ticket may depend on something outside the list; we should probably
        # handle that or let it fail. In our context only dependencies within
        # the same track usually matter.
        dangling = [
            {"id": "t1", "depends_on": ["missing"]},
        ]
        # Pins the current behaviour: a KeyError raised in the list comprehension.
        with self.assertRaises(KeyError):
            conductor_tech_lead.topological_sort(dangling)
|
|
|
|
@pytest.mark.asyncio
async def test_topological_sort_vlog(vlogger) -> None:
    """Sort a two-ticket chain and record the before/after order through ``vlogger``.

    ``vlogger`` is not defined in this file; presumably it is a pytest fixture
    exposing ``log_state`` and ``finalize`` (confirm against conftest).
    """
    unsorted_tickets = [
        {"id": "t2", "depends_on": ["t1"]},
        {"id": "t1", "depends_on": []},
    ]
    # Record the input order before sorting.
    vlogger.log_state("Input Order", ["t2", "t1"], ["t2", "t1"])

    ordered_ids = []
    for ticket in conductor_tech_lead.topological_sort(unsorted_tickets):
        ordered_ids.append(ticket['id'])

    vlogger.log_state("Sorted Order", "N/A", ordered_ids)
    assert ordered_ids == ["t1", "t2"]

    # Only reached when the ordering assertion above holds.
    vlogger.finalize("Topological Sort Verification", "PASS", "Linear dependencies correctly ordered.")
|