# 114 lines, 3.3 KiB, Python (extraction metadata preserved as a comment)
import pytest
|
|
import time
|
|
from pathlib import Path
|
|
from src.file_cache import ASTParser
|
|
from src.models import Ticket, Track, WorkerContext
|
|
from src.multi_agent_conductor import run_worker_lifecycle
|
|
|
|
def test_targeted_extraction():
    """Targeting func_a pulls in its transitive callees (func_b, func_c) but
    excludes unrelated definitions, and strips function bodies from the view."""
    snippet = """
def func_a():
    print("A")
    func_b()

def func_b():
    print("B")
    func_c()

def func_c():
    print("C")

def func_unrelated():
    print("Unrelated")
"""
    view = ASTParser("python").get_targeted_view(snippet, ["func_a"])

    # Target func_a, should include func_b and func_c (depth 2)
    for header in ("def func_a():", "def func_b():", "def func_c():"):
        assert header in view
    assert "def func_unrelated():" not in view
    assert "print(" not in view  # Bodies should be stripped
|
|
|
|
def test_class_targeted_extraction():
    """Targeting MyClass.method_a keeps the class header and the called
    method_b, while method_unrelated is omitted from the view."""
    snippet = """
class MyClass:
    def method_a(self):
        self.method_b()

    def method_b(self):
        pass

    def method_unrelated(self):
        pass
"""
    view = ASTParser("python").get_targeted_view(snippet, ["MyClass.method_a"])

    expected = ("class MyClass:", "def method_a(self):", "def method_b(self):")
    for header in expected:
        assert header in view
    assert "def method_unrelated(self):" not in view
|
|
|
|
def test_ast_caching(tmp_path):
    """Skeleton views are cached per path and invalidated when the file's
    mtime changes on disk.

    Note: the original version timed both calls with time.time() but never
    asserted on the durations; that dead timing code is removed here — the
    test checks cache *correctness* (identical views, proper invalidation),
    not speed.
    """
    parser = ASTParser("python")
    file_path = tmp_path / "test_cache.py"
    code = "def test(): pass"
    file_path.write_text(code)

    # First call parses and populates the cache.
    view1 = parser.get_skeleton(code, path=str(file_path))
    # Second call should be served from the cache and match exactly.
    view2 = parser.get_skeleton(code, path=str(file_path))
    assert view1 == view2

    # Update the file: should invalidate the cache. Sleep briefly so the new
    # mtime is guaranteed to differ on filesystems with coarse timestamps.
    time.sleep(0.1)
    new_code = "def test_new(): pass"
    file_path.write_text(new_code)
    view3 = parser.get_skeleton(new_code, path=str(file_path))
    assert "def test_new():" in view3
    assert "def test():" not in view3
|
|
|
|
def test_performance_large_file():
    """Parsing a ~1000-line file stays well under the 500 ms budget.

    Uses time.perf_counter() rather than time.time(): perf_counter is
    monotonic and high-resolution, so the measured interval cannot be
    distorted by wall-clock adjustments (NTP, DST).
    """
    parser = ASTParser("python")
    # Generate a large file (500 defs, approx 1000 lines).
    code = "\n".join(f"def func_{i}():\n pass" for i in range(500))

    start = time.perf_counter()
    parser.get_skeleton(code)
    duration = time.perf_counter() - start

    print(f"Large file parse duration: {duration*1000:.2f}ms")
    assert duration < 0.5  # Should be well under 500ms even for first parse
|
|
|
|
def test_token_reduction_logging(capsys):
    """Running the worker lifecycle logs a context-pruning summary for the
    ticket, including a token-"reduction" figure."""
    ticket = Ticket(
        id="T1",
        description="Test ticket",
        target_file="test.py",
        target_symbols=["func_a"],
        context_requirements=["test.py"]
    )
    context = WorkerContext(ticket_id="T1", model_name="gemini-2.5-flash", messages=[])

    # One targeted symbol followed by 100 filler defs to make pruning visible.
    code = "\n".join(["def func_a(): pass"] + [f"def func_{i}(): pass" for i in range(100)])

    class _FakeFile:
        # Minimal context-manager file stand-in that always yields `code`.
        def read(self):
            return code

        def __enter__(self):
            return self

        def __exit__(self, *exc):
            return None

    # Mock open to return our large code
    with pytest.MonkeyPatch().context() as m:
        m.setattr("builtins.open", lambda f, *args, **kwargs: _FakeFile())
        m.setattr("pathlib.Path.exists", lambda s: True)
        m.setattr("src.ai_client.send", lambda **kwargs: "DONE")

        run_worker_lifecycle(ticket, context, context_files=["test.py"])

    captured = capsys.readouterr()
    assert "[MMA] Context pruning for T1" in captured.out
    assert "reduction" in captured.out
|