"""Tests for ASTParser views/caching and worker context-pruning output."""

import time
from pathlib import Path

import pytest

from src.file_cache import ASTParser
from src.models import Ticket, Track, WorkerContext
from src.multi_agent_conductor import run_worker_lifecycle


def test_targeted_extraction():
    """A targeted view of ``func_a`` keeps its call chain and drops the rest.

    The view must contain the signatures of ``func_a``, ``func_b`` and
    ``func_c``, must omit ``func_unrelated``, and must not contain any
    ``print(`` call (i.e. function bodies are stripped).
    """
    parser = ASTParser("python")
    code = """
def func_a():
    print("A")
    func_b()

def func_b():
    print("B")
    func_c()

def func_c():
    print("C")

def func_unrelated():
    print("Unrelated")
"""
    # Target func_a, should include func_b and func_c (depth 2)
    result = parser.get_targeted_view(code, ["func_a"])

    assert "def func_a():" in result
    assert "def func_b():" in result
    assert "def func_c():" in result
    assert "def func_unrelated():" not in result
    assert "print(" not in result  # Bodies should be stripped


def test_class_targeted_extraction():
    """A targeted view of ``MyClass.method_a`` keeps the class and callee.

    The view must contain the class header, ``method_a`` and the
    ``method_b`` it calls, and must omit ``method_unrelated``.
    """
    parser = ASTParser("python")
    code = """
class MyClass:
    def method_a(self):
        self.method_b()

    def method_b(self):
        pass

    def method_unrelated(self):
        pass
"""
    result = parser.get_targeted_view(code, ["MyClass.method_a"])

    assert "class MyClass:" in result
    assert "def method_a(self):" in result
    assert "def method_b(self):" in result
    assert "def method_unrelated(self):" not in result


def test_ast_caching(tmp_path):
    """Repeated skeleton requests agree, and a rewritten file is re-parsed."""
    parser = ASTParser("python")
    file_path = tmp_path / "test_cache.py"
    code = "def test(): pass"
    file_path.write_text(code)

    # First call parses; the second call is expected to be served from the
    # cache. No timing assertion is made here: comparing two sub-millisecond
    # durations would be flaky, so only result equality is checked.
    view1 = parser.get_skeleton(code, path=str(file_path))
    view2 = parser.get_skeleton(code, path=str(file_path))
    assert view1 == view2

    # Update file: should invalidate cache.
    # NOTE(review): the sleep presumably lets the file's modification time
    # advance before the rewrite -- confirm against ASTParser's cache key.
    time.sleep(0.1)
    new_code = "def test_new(): pass"
    file_path.write_text(new_code)

    view3 = parser.get_skeleton(new_code, path=str(file_path))
    assert "def test_new():" in view3
    assert "def test():" not in view3


def test_performance_large_file():
    """Building a skeleton for a ~1000-line source finishes within 500 ms."""
    parser = ASTParser("python")
    # Generate a large file (approx 1000 lines)
    code = "\n".join(f"def func_{i}():\n pass" for i in range(500))

    # perf_counter is monotonic and high-resolution, unlike time.time(),
    # so the measured interval cannot be skewed by wall-clock adjustments.
    start = time.perf_counter()
    parser.get_skeleton(code)
    duration = time.perf_counter() - start

    print(f"Large file parse duration: {duration*1000:.2f}ms")
    assert duration < 0.5  # Should be well under 500ms even for first parse


def test_token_reduction_logging(capsys):
    """``run_worker_lifecycle`` prints a context-pruning report for the ticket.

    ``builtins.open``, ``pathlib.Path.exists`` and ``src.ai_client.send`` are
    patched so the lifecycle reads the in-memory source below and never
    touches the real filesystem or the AI backend.
    """
    ticket = Ticket(
        id="T1",
        description="Test ticket",
        target_file="test.py",
        target_symbols=["func_a"],
        context_requirements=["test.py"],
    )
    context = WorkerContext(
        ticket_id="T1", model_name="gemini-2.5-flash", messages=[]
    )
    code = "def func_a(): pass\n" + "\n".join(
        f"def func_{i}(): pass" for i in range(100)
    )

    class _FakeFile:
        """Minimal file stand-in: a context manager whose read() yields code."""

        def read(self):
            return code

        def __enter__(self):
            return self

        def __exit__(self, *exc_info):
            return None

    # Mock open to return our large code
    with pytest.MonkeyPatch.context() as m:
        m.setattr("builtins.open", lambda f, *args, **kwargs: _FakeFile())
        m.setattr("pathlib.Path.exists", lambda s: True)
        m.setattr("src.ai_client.send", lambda **kwargs: "DONE")
        run_worker_lifecycle(ticket, context, context_files=["test.py"])

    # Patches are undone before the captured output is inspected.
    captured = capsys.readouterr()
    assert "[MMA] Context pruning for T1" in captured.out
    assert "reduction" in captured.out