180 lines
8.2 KiB
Python
180 lines
8.2 KiB
Python
import sys
|
|
import os
|
|
import tomli_w
|
|
import tomllib
|
|
from pathlib import Path
|
|
|
|
# Ensure project root is in path for imports
|
|
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
|
|
|
# Import necessary modules from the project
|
|
import aggregate
|
|
import project_manager
|
|
import mcp_client
|
|
import ai_client
|
|
|
|
# --- Tests for Aggregate Module ---
|
|
|
|
def test_aggregate_includes_segregated_history(tmp_path: Path) -> None:
    """
    Verify that aggregation includes history stored in the separate history file.

    A project holding one discussion entry is saved with
    ``project_manager.save_project``, reloaded, flattened with
    ``project_manager.flat_config`` and handed to ``aggregate.run``. The
    resulting markdown must contain the history heading and the entry text.
    """
    proj_path = tmp_path / "manual_slop.toml"
    hist_path = tmp_path / "manual_slop_history.toml"

    # Setup segregated project configuration
    proj_data = project_manager.default_project("test-aggregate")
    proj_data["discussion"]["discussions"]["main"]["history"] = [
        "@2026-02-24T14:00:00\nUser:\nShow me history"
    ]

    # Save the project, which should segregate the history into its own file.
    # Checking the file here makes sure the aggregation below really runs
    # against a segregated layout rather than an inline one.
    project_manager.save_project(proj_data, proj_path)
    assert hist_path.exists(), "save_project should write the segregated history file"

    # Load the project and aggregate its content
    loaded_proj = project_manager.load_project(proj_path)
    config = project_manager.flat_config(loaded_proj)
    # Only the markdown is inspected; the other two results are not needed here.
    markdown, _output_file, _file_items = aggregate.run(config)

    # Assert that the history is present in the aggregated markdown
    assert "## Discussion History" in markdown
    assert "Show me history" in markdown
|
|
# --- Tests for MCP Client and Blacklisting ---
|
|
|
|
def test_mcp_blacklist(tmp_path: Path) -> None:
    """
    Check that the MCP client refuses to read a history file and hides it
    from directory listings.
    """
    blocked_name = "my_project_history.toml"
    blocked_file = tmp_path / blocked_name
    blocked_file.write_text("secret history", encoding="utf-8")

    # The history file is passed as a file item and its directory as an extra
    # base dir; the file is still expected to be refused.
    mcp_client.configure(
        [{"path": str(blocked_file)}],
        extra_base_dirs=[str(tmp_path)],
    )

    # Reading the file directly must yield one of the two denial markers.
    read_output = mcp_client.read_file(str(blocked_file))
    assert any(marker in read_output for marker in ("ACCESS DENIED", "BLACKLISTED"))

    # Listing the parent directory must not mention the file.
    listing = mcp_client.list_directory(str(tmp_path))
    assert blocked_name not in listing
|
|
|
|
def test_aggregate_blacklist(tmp_path: Path) -> None:
    """
    Check that ``aggregate.resolve_paths`` never returns a history file,
    whether the glob targets it specifically or matches everything.
    """
    history = tmp_path / "my_project_history.toml"
    history.write_text("secret history", encoding="utf-8")

    # A glob aimed straight at history files must not resolve the file.
    targeted = aggregate.resolve_paths(tmp_path, "*_history.toml")
    assert history not in targeted, "History file should be blacklisted and not resolved"

    # A catch-all glob must skip it as well.
    everything = aggregate.resolve_paths(tmp_path, "*")
    assert history not in everything, "History file should be excluded even with a general glob"
|
|
# --- Tests for History Migration and Separation ---
|
|
|
|
def test_migration_on_load(tmp_path: Path) -> None:
    """
    Check that loading a legacy project moves its discussion history out of
    manual_slop.toml and into manual_slop_history.toml.
    """
    main_file = tmp_path / "manual_slop.toml"
    history_file = tmp_path / "manual_slop_history.toml"
    expected_history = ["Hello", "World"]

    # Write a legacy layout by hand: history embedded in the main config,
    # bypassing save_project so nothing is segregated up front.
    legacy = project_manager.default_project("test-project")
    legacy["discussion"]["discussions"]["main"]["history"] = list(expected_history)
    with main_file.open("wb") as sink:
        tomli_w.dump(legacy, sink)

    # Loading is the action expected to trigger the migration.
    loaded = project_manager.load_project(main_file)

    # 1. The loaded data still exposes the history.
    assert "discussion" in loaded
    assert loaded["discussion"]["discussions"]["main"]["history"] == expected_history

    # 2. The main config on disk no longer carries the discussion table.
    with main_file.open("rb") as source:
        main_on_disk = tomllib.load(source)
    assert "discussion" not in main_on_disk, "Discussion history should be removed from main config after migration"

    # 3. The history file now exists and holds the migrated entries.
    assert history_file.exists()
    with history_file.open("rb") as source:
        history_on_disk = tomllib.load(source)
    assert history_on_disk["discussions"]["main"]["history"] == expected_history
|
|
|
|
def test_save_separation(tmp_path: Path) -> None:
    """
    Check that ``project_manager.save_project`` writes discussion history to
    manual_slop_history.toml and keeps it out of manual_slop.toml.
    """
    main_file = tmp_path / "manual_slop.toml"
    history_file = tmp_path / "manual_slop_history.toml"
    expected_history = ["Saved", "Separately"]

    # Fresh project carrying two history entries.
    project = project_manager.default_project("test-project")
    project["discussion"]["discussions"]["main"]["history"] = list(expected_history)

    project_manager.save_project(project, main_file)

    # 1. Both files are written.
    assert main_file.exists()
    assert history_file.exists()

    # 2. The main project file does not contain the discussion table.
    with main_file.open("rb") as source:
        main_on_disk = tomllib.load(source)
    assert "discussion" not in main_on_disk, "Discussion history should not be in main config file after save"

    # 3. The history file contains the entries.
    with history_file.open("rb") as source:
        history_on_disk = tomllib.load(source)
    assert history_on_disk["discussions"]["main"]["history"] == expected_history
|
|
# --- Tests for History Persistence Across Turns ---
|
|
|
|
def test_history_persistence_across_turns(tmp_path: Path) -> None:
    """
    Check that discussion history accumulates correctly over repeated
    load / append / save cycles and survives a final reload.
    """
    main_file = tmp_path / "manual_slop.toml"
    history_file = tmp_path / "manual_slop_history.toml"

    def record_turn(entry: dict) -> None:
        # One turn: reload from disk, append the serialized entry, save again.
        project = project_manager.load_project(main_file)
        project["discussion"]["discussions"]["main"]["history"].append(
            project_manager.entry_to_str(entry)
        )
        project_manager.save_project(project, main_file)

    def stored_history() -> list:
        # History entries as currently written in the separate history file.
        with history_file.open("rb") as source:
            return tomllib.load(source)["discussions"]["main"]["history"]

    # Step 1: create a new project and save it.
    project_manager.save_project(
        project_manager.default_project("test-persistence"), main_file
    )

    # Step 2: first turn.
    record_turn({"role": "User", "content": "Hello", "ts": "2026-02-24T13:00:00"})

    # After the first turn the main file has no discussion table and the
    # history file holds exactly the one serialized entry.
    with main_file.open("rb") as source:
        main_on_disk = tomllib.load(source)
    assert "discussion" not in main_on_disk
    assert stored_history() == ["@2026-02-24T13:00:00\nUser:\nHello"]

    # Step 3: second turn.
    record_turn({"role": "AI", "content": "Hi there!", "ts": "2026-02-24T13:01:00"})

    # Both entries are on disk, the new one last.
    after_second = stored_history()
    assert len(after_second) == 2
    assert after_second[1] == "@2026-02-24T13:01:00\nAI:\nHi there!"

    # Step 4: a fresh load sees both entries.
    reloaded = project_manager.load_project(main_file)
    assert len(reloaded["discussion"]["discussions"]["main"]["history"]) == 2
|
|
# --- Tests for AI Client History Management ---
|
|
|
|
def test_get_history_bleed_stats_basic() -> None:
    """
    Check the shape of ``ai_client.get_history_bleed_stats()`` and that it
    reports the configured truncation limit.
    """
    expected_limit = 500

    # Start from a reset session, then set the limit the stats should report.
    ai_client.reset_session()
    ai_client.set_history_trunc_limit(expected_limit)

    stats = ai_client.get_history_bleed_stats()

    # Structure first: both keys must be present before their values are read.
    assert "current" in stats, "Stats dictionary should contain 'current' token usage"
    assert "limit" in stats, "Stats dictionary should contain 'limit'"

    reported_limit = stats["limit"]
    assert reported_limit == expected_limit, f"Expected limit of 500, but got {reported_limit}"

    # Current usage must be a non-negative integer.
    current_usage = stats["current"]
    assert isinstance(current_usage, int) and current_usage >= 0
|