feat(ai_client): isolate current_tier using threading.local() for parallel agent safety

This commit is contained in:
2026-03-06 12:59:10 -05:00
parent 1fb6ebc4d0
commit 684a6d1d3b
11 changed files with 75 additions and 29 deletions

View File

@@ -20,7 +20,7 @@ This file tracks all major tracks for the project. Each track has its own detail
4. [x] **Track: Robust JSON Parsing for Tech Lead**
*Link: [./tracks/robust_json_parsing_tech_lead_20260302/](./tracks/robust_json_parsing_tech_lead_20260302/)*
5. [ ] **Track: Concurrent Tier Source Isolation**
5. [~] **Track: Concurrent Tier Source Isolation**
*Link: [./tracks/concurrent_tier_source_tier_20260302/](./tracks/concurrent_tier_source_tier_20260302/)*
6. [ ] **Track: Manual UX Validation & Polish**

View File

@@ -3,8 +3,8 @@
> **TEST DEBT FIX:** Due to ongoing test architecture instability (documented in `test_architecture_integrity_audit_20260304`), do NOT write new `live_gui` integration tests for this track. Rely strictly on in-process `unittest.mock` for `ai_client` concurrency verification.
## Phase 1: Thread-Local Context Refactoring
- [ ] Task: Initialize MMA Environment `activate_skill mma-orchestrator`
- [ ] Task: Refactor `ai_client` to `threading.local()`
- [x] Task: Initialize MMA Environment `activate_skill mma-orchestrator`
- [~] Task: Refactor `ai_client` to `threading.local()`
- [ ] WHERE: `ai_client.py`
- [ ] WHAT: Replace `current_tier = None` with `_local_storage = threading.local()`. Implement safe getters/setters for the tier.
- [ ] HOW: Use standard `threading.local` attributes.

View File

@@ -1,5 +1,5 @@
[ai]
provider = "gemini"
provider = "gemini_cli"
model = "gemini-2.5-flash-lite"
temperature = 0.0
max_tokens = 8192
@@ -15,7 +15,7 @@ paths = [
"C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml",
"C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml",
]
active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml"
active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml"
[gui.show_windows]
"Context Hub" = true

View File

@@ -8,5 +8,5 @@ active = "main"
[discussions.main]
git_commit = ""
last_updated = "2026-03-06T00:19:13"
last_updated = "2026-03-06T12:39:50"
history = []

View File

@@ -86,8 +86,16 @@ comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None
# Injected by gui.py - called whenever a tool call completes.
tool_log_callback: Optional[Callable[[str, str], None]] = None
# Set via set_current_tier() by caller tiers before ai_client.send(); cleared (set to None) in finally.
current_tier: Optional[str] = None
# Per-thread context so concurrently running agent tiers never clobber each
# other's tier tag (replaces the former shared module-level global).
_local_storage = threading.local()


def get_current_tier() -> Optional[str]:
    """Return the tier tag recorded for the calling thread, or None if unset."""
    try:
        return _local_storage.current_tier
    except AttributeError:
        # This thread never called set_current_tier(); treat as untagged.
        return None


def set_current_tier(tier: Optional[str]) -> None:
    """Record *tier* as the calling thread's tag (pass None to clear it)."""
    _local_storage.current_tier = tier
# Increased to allow thorough code exploration before forcing a summary
MAX_TOOL_ROUNDS: int = 10
@@ -129,7 +137,6 @@ _comms_log: list[dict[str, Any]] = []
COMMS_CLAMP_CHARS: int = 300
def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
global current_tier
entry: dict[str, Any] = {
"ts": datetime.datetime.now().strftime("%H:%M:%S"),
"direction": direction,
@@ -137,7 +144,7 @@ def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None:
"provider": _provider,
"model": _model,
"payload": payload,
"source_tier": current_tier,
"source_tier": get_current_tier(),
}
_comms_log.append(entry)
if comms_log_callback is not None:
@@ -1231,7 +1238,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item
tool_executed = True
if not tool_executed:
if name and name in mcp_client.TOOL_NAMES:
if b_name and b_name in mcp_client.TOOL_NAMES:
_append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input})
if b_name in mcp_client.MUTATING_TOOLS and pre_tool_callback:

View File

@@ -851,7 +851,7 @@ class AppController:
self._api_event_queue.append({"type": "response", "payload": payload})
def _handle_request_event(self, event: events.UserRequestEvent) -> None:
"""Processes a UserRequestEvent by calling the AI client."""
ai_client.current_tier = None # Ensure main discussion is untagged
ai_client.set_current_tier(None) # Ensure main discussion is untagged
if self.ui_auto_add_history:
with self._pending_history_adds_lock:
self._pending_history_adds.append({
@@ -935,7 +935,7 @@ class AppController:
def _on_tool_log(self, script: str, result: str) -> None:
session_logger.log_tool_call(script, result, None)
source_tier = ai_client.current_tier
source_tier = ai_client.get_current_tier()
with self._pending_tool_calls_lock:
self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier})

View File

@@ -20,7 +20,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str,
# Set custom system prompt for this call
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt or "")
ai_client.current_tier = "Tier 2"
ai_client.set_current_tier("Tier 2")
last_error = None
try:
for _ in range(3):
@@ -53,7 +53,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str,
finally:
# Restore old system prompt and clear tier tag
ai_client.set_custom_system_prompt(old_system_prompt or "")
ai_client.current_tier = None
ai_client.set_current_tier(None)
from src.dag_engine import TrackDAG
from src.models import Ticket

View File

@@ -300,7 +300,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
old_comms_cb(entry)
ai_client.comms_log_callback = worker_comms_callback
ai_client.current_tier = "Tier 3"
ai_client.set_current_tier("Tier 3")
try:
comms_baseline = len(ai_client.get_comms_log())
response = ai_client.send(
@@ -313,7 +313,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files:
)
finally:
ai_client.comms_log_callback = old_comms_cb
ai_client.current_tier = None
ai_client.set_current_tier(None)
if event_queue:
# Push via "response" event type — _process_event_queue wraps this
# as {"action": "handle_ai_response", "payload": ...} for the GUI.

View File

@@ -0,0 +1,39 @@
import threading
from src import ai_client
def test_ai_client_tier_isolation():
    """Verify current_tier is isolated per thread via threading.local().

    Two threads each set a different tier, rendezvous at a barrier so both
    tiers are set before either logs, then emit a comms entry; each entry
    must carry the tier set by its own thread, proving no cross-thread bleed.
    """
    original_append = ai_client._append_comms
    captured_logs = []

    def intercepted_append(direction, kind, payload):
        # Record which thread logged and the tier it observed at log time.
        # CPython list.append is atomic, so no lock is needed here.
        captured_logs.append({
            'thread_name': threading.current_thread().name,
            'source_tier': ai_client.get_current_tier(),
        })
        original_append(direction, kind, payload)

    ai_client._append_comms = intercepted_append
    # The try must begin immediately after patching: if clear_comms_log,
    # thread start, or the barrier raises, the patched _append_comms would
    # otherwise leak into every subsequent test in the session.
    try:
        ai_client.clear_comms_log()
        barrier = threading.Barrier(2)

        def run_worker(tier):
            ai_client.set_current_tier(tier)
            barrier.wait()
            ai_client._append_comms('OUT', 'test', {})

        t1 = threading.Thread(target=run_worker, args=('Tier 3',), name='T1')
        t2 = threading.Thread(target=run_worker, args=('Tier 4',), name='T2')
        t1.start()
        t2.start()
        t1.join()
        t2.join()

        log_t1 = next(log for log in captured_logs if log['thread_name'] == 'T1')
        log_t2 = next(log for log in captured_logs if log['thread_name'] == 'T2')
        assert log_t1['source_tier'] == 'Tier 3'
        assert log_t2['source_tier'] == 'Tier 4'
    finally:
        # Always restore the real logger so later tests see the unpatched one.
        ai_client._append_comms = original_append


if __name__ == "__main__":
    test_ai_client_tier_isolation()

View File

@@ -7,15 +7,14 @@ state when logging comms and tools.
from src import ai_client
def reset_tier():
ai_client.current_tier = None
ai_client.set_current_tier(None)
yield
ai_client.current_tier = None
ai_client.set_current_tier(None)
def test_current_tier_variable_exists() -> None:
"""ai_client must expose a module-level current_tier variable."""
assert hasattr(ai_client, "current_tier")
# current_tier might be None or a default
pass
def test_get_current_tier_exists() -> None:
"""ai_client must expose a get_current_tier function."""
assert hasattr(ai_client, "get_current_tier")
assert callable(ai_client.get_current_tier)
def test_append_comms_has_source_tier_key() -> None:
"""Dict entries in comms log must have a 'source_tier' key."""
@@ -30,7 +29,7 @@ def test_append_comms_has_source_tier_key() -> None:
def test_append_comms_source_tier_none_when_unset() -> None:
"""When current_tier is None, source_tier in log must be None."""
ai_client.reset_session()
ai_client.current_tier = None
ai_client.set_current_tier(None)
ai_client._append_comms("OUT", "request", {"msg": "hello"})
log = ai_client.get_comms_log()
@@ -39,22 +38,22 @@ def test_append_comms_source_tier_none_when_unset() -> None:
def test_append_comms_source_tier_set_when_current_tier_set() -> None:
"""When current_tier is 'Tier 1', source_tier in log must be 'Tier 1'."""
ai_client.reset_session()
ai_client.current_tier = "Tier 1"
ai_client.set_current_tier("Tier 1")
ai_client._append_comms("OUT", "request", {"msg": "hello"})
log = ai_client.get_comms_log()
assert log[-1]["source_tier"] == "Tier 1"
ai_client.current_tier = None
ai_client.set_current_tier(None)
def test_append_comms_source_tier_tier2() -> None:
"""When current_tier is 'Tier 2', source_tier in log must be 'Tier 2'."""
ai_client.reset_session()
ai_client.current_tier = "Tier 2"
ai_client.set_current_tier("Tier 2")
ai_client._append_comms("OUT", "request", {"msg": "hello"})
log = ai_client.get_comms_log()
assert log[-1]["source_tier"] == "Tier 2"
ai_client.current_tier = None
ai_client.set_current_tier(None)
def test_append_tool_log_stores_dict(app_instance) -> None:
"""App._append_tool_log must store a dict in self._tool_log."""

View File

@@ -9,6 +9,7 @@ from src import ai_client
def test_token_usage_tracking() -> None:
ai_client.reset_session()
ai_client.clear_comms_log()
with patch("src.ai_client._ensure_gemini_client"), \
patch("src.ai_client._gemini_client") as mock_client:
mock_chat = MagicMock()