diff --git a/tests/test_provider_state_migration.py b/tests/test_provider_state_migration.py new file mode 100644 index 00000000..f2dc2626 --- /dev/null +++ b/tests/test_provider_state_migration.py @@ -0,0 +1,170 @@ +"""Regression-guard tests for src/provider_state.py +Phase 3 of any_type_componentization_20260621. Verifies the 4-method +ProviderHistory API is reachable and behaves correctly for all 6 +providers (anthropic/deepseek/minimax/qwen/grok/llama) following the +migration of _X_history aliases in src/ai_client.py. +CONVENTION: 1-space indentation. NO COMMENTS. +""" +from __future__ import annotations + +import threading + +import pytest +from src import provider_state + + +EXPECTED_PROVIDERS: tuple[str, ...] = ("anthropic", "deepseek", "minimax", "qwen", "grok", "llama") + + +def _clear_all() -> None: + provider_state.clear_all() + + +def test_each_provider_reachable() -> None: + histories = [provider_state.get_history(p) for p in EXPECTED_PROVIDERS] + assert all(isinstance(h, provider_state.ProviderHistory) for h in histories) + assert len({id(h) for h in histories}) == 6 + for p in EXPECTED_PROVIDERS: + assert provider_state.get_history(p) is provider_state.get_history(p) + + +def test_append_preserves_ordering() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": f"{p}-1"}) + h.append({"role": "assistant", "content": f"{p}-2"}) + h.append({"role": "user", "content": f"{p}-3"}) + assert h.get_all() == [ + {"role": "user", "content": f"{p}-1"}, + {"role": "assistant", "content": f"{p}-2"}, + {"role": "user", "content": f"{p}-3"}, + ] + + +def test_lock_acquisition_no_deadlock() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + def inner() -> None: + with h.lock: + h.append({"role": "user", "content": f"{p}-inner"}) + with h.lock: + assert len(h) == 0 + inner() + assert len(h) == 1 + assert h.get_all() == [{"role": "user", "content": f"{p}-inner"}] + + +def test_concurrent_append_thread_safety() -> None: + h = provider_state.get_history("anthropic") + h.clear() + def worker(start: int) -> None: + for i in range(100): + role = "user" if (i % 2 == 0) else "assistant" + h.append({"role": role, "content": f"t{start}-{i}"}) + threads = [threading.Thread(target=worker, args=(t,)) for t in range(2)] + for t in threads: + t.start() + for t in threads: + t.join() + all_msgs = h.get_all() + assert len(all_msgs) == 200 + contents = {m["content"] for m in all_msgs} + assert len(contents) == 200 + + +def test_get_all_returns_copy() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": f"{p}-original"}) + snapshot = h.get_all() + snapshot.append({"role": "user", "content": f"{p}-leaked"}) + assert h.get_all() == [{"role": "user", "content": f"{p}-original"}] + + +def test_replace_all_replaces_state() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": f"{p}-a"}) + h.append({"role": "assistant", "content": f"{p}-b"}) + h.append({"role": "user", "content": f"{p}-c"}) + h.replace_all([{"role": "user", "content": "fresh"}]) + assert len(h.get_all()) == 1 + assert h.get_all() == [{"role": "user", "content": "fresh"}] + + +def test_clear_resets_history() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": "x"}) + h.append({"role": "assistant", "content": "y"}) + h.clear() + assert len(h.get_all()) == 0 + assert bool(h) is False + + +def test_getitem_returns_specific_message() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": f"{p}-first"}) + h.append({"role": "assistant", "content": f"{p}-mid"}) + h.append({"role": "user", "content": f"{p}-last"}) + assert h[0] == {"role": "user", "content": f"{p}-first"} + assert h[1] == {"role": "assistant", "content": f"{p}-mid"} + assert h[-1] == {"role": "user", "content": f"{p}-last"} + + +def test_iter_returns_messages() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.append({"role": "user", "content": f"{p}-1"}) + h.append({"role": "assistant", "content": f"{p}-2"}) + h.append({"role": "user", "content": f"{p}-3"}) + collected = [m for m in h] + assert collected == h.get_all() + + +def test_len_returns_count() -> None: + _clear_all() + for n in (0, 1, 5, 10): + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + h.clear() + for i in range(n): + h.append({"role": "user", "content": f"{p}-{i}"}) + assert len(h) == n + + +def test_bool_empty_vs_populated() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + h = provider_state.get_history(p) + assert bool(h) is False + h.append({"role": "user", "content": "x"}) + assert bool(h) is True + h.clear() + assert bool(h) is False + + +def test_clear_all_resets_all_6() -> None: + _clear_all() + for p in EXPECTED_PROVIDERS: + provider_state.get_history(p).append({"role": "user", "content": f"{p}-msg"}) + provider_state.clear_all() + for p in EXPECTED_PROVIDERS: + assert len(provider_state.get_history(p).get_all()) == 0 + + +def test_providers_returns_6_tuple() -> None: + assert provider_state.providers() == EXPECTED_PROVIDERS + + +def test_unknown_provider_raises() -> None: + with pytest.raises(KeyError): + provider_state.get_history("nonexistent")