diff --git a/conductor/tracks.md b/conductor/tracks.md index f77dedd..249b5f2 100644 --- a/conductor/tracks.md +++ b/conductor/tracks.md @@ -20,7 +20,7 @@ This file tracks all major tracks for the project. Each track has its own detail 4. [x] **Track: Robust JSON Parsing for Tech Lead** *Link: [./tracks/robust_json_parsing_tech_lead_20260302/](./tracks/robust_json_parsing_tech_lead_20260302/)* -5. [ ] **Track: Concurrent Tier Source Isolation** +5. [~] **Track: Concurrent Tier Source Isolation** *Link: [./tracks/concurrent_tier_source_tier_20260302/](./tracks/concurrent_tier_source_tier_20260302/)* 6. [ ] **Track: Manual UX Validation & Polish** diff --git a/conductor/tracks/concurrent_tier_source_tier_20260302/plan.md b/conductor/tracks/concurrent_tier_source_tier_20260302/plan.md index 5842fe8..b4260ae 100644 --- a/conductor/tracks/concurrent_tier_source_tier_20260302/plan.md +++ b/conductor/tracks/concurrent_tier_source_tier_20260302/plan.md @@ -3,8 +3,8 @@ > **TEST DEBT FIX:** Due to ongoing test architecture instability (documented in `test_architecture_integrity_audit_20260304`), do NOT write new `live_gui` integration tests for this track. Rely strictly on in-process `unittest.mock` for `ai_client` concurrency verification. ## Phase 1: Thread-Local Context Refactoring -- [ ] Task: Initialize MMA Environment `activate_skill mma-orchestrator` -- [ ] Task: Refactor `ai_client` to `threading.local()` +- [x] Task: Initialize MMA Environment `activate_skill mma-orchestrator` +- [~] Task: Refactor `ai_client` to `threading.local()` - [ ] WHERE: `ai_client.py` - [ ] WHAT: Replace `current_tier = None` with `_local_context = threading.local()`. Implement safe getters/setters for the tier. - [ ] HOW: Use standard `threading.local` attributes. diff --git a/config.toml b/config.toml index 22677ac..7a07341 100644 --- a/config.toml +++ b/config.toml @@ -1,5 +1,5 @@ [ai] -provider = "gemini" +provider = "gemini_cli" model = "gemini-2.5-flash-lite" temperature = 0.0 max_tokens = 8192 @@ -15,7 +15,7 @@ paths = [ "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml", "C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml", ] -active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_livetoolssim.toml" +active = "C:\\projects\\manual_slop\\tests\\artifacts\\temp_liveexecutionsim.toml" [gui.show_windows] "Context Hub" = true diff --git a/project_history.toml b/project_history.toml index 5926df4..737259d 100644 --- a/project_history.toml +++ b/project_history.toml @@ -8,5 +8,5 @@ active = "main" [discussions.main] git_commit = "" -last_updated = "2026-03-06T00:19:13" +last_updated = "2026-03-06T12:39:50" history = [] diff --git a/src/ai_client.py b/src/ai_client.py index 36a917a..c8ecaaa 100644 --- a/src/ai_client.py +++ b/src/ai_client.py @@ -86,8 +86,16 @@ comms_log_callback: Optional[Callable[[dict[str, Any]], None]] = None # Injected by gui.py - called whenever a tool call completes. tool_log_callback: Optional[Callable[[str, str], None]] = None -# Set by caller tiers before ai_client.send(); cleared in finally. -current_tier: Optional[str] = None +_local_storage = threading.local() + +def get_current_tier() -> Optional[str]: + """Returns the current tier from thread-local storage.""" + return getattr(_local_storage, "current_tier", None) + +def set_current_tier(tier: Optional[str]) -> None: + """Sets the current tier in thread-local storage.""" + _local_storage.current_tier = tier + # Increased to allow thorough code exploration before forcing a summary MAX_TOOL_ROUNDS: int = 10 @@ -129,7 +137,6 @@ _comms_log: list[dict[str, Any]] = [] COMMS_CLAMP_CHARS: int = 300 def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None: - global current_tier entry: dict[str, Any] = { "ts": datetime.datetime.now().strftime("%H:%M:%S"), "direction": direction, @@ -137,7 +144,7 @@ def _append_comms(direction: str, kind: str, payload: dict[str, Any]) -> None: "provider": _provider, "model": _model, "payload": payload, - "source_tier": current_tier, + "source_tier": get_current_tier(), } _comms_log.append(entry) if comms_log_callback is not None: @@ -1231,7 +1238,7 @@ def _send_anthropic(md_content: str, user_message: str, base_dir: str, file_item tool_executed = True if not tool_executed: - if name and name in mcp_client.TOOL_NAMES: + if b_name and b_name in mcp_client.TOOL_NAMES: _append_comms("OUT", "tool_call", {"name": b_name, "id": b_id, "args": b_input}) if b_name in mcp_client.MUTATING_TOOLS and pre_tool_callback: diff --git a/src/app_controller.py b/src/app_controller.py index 6f11b6b..63f010b 100644 --- a/src/app_controller.py +++ b/src/app_controller.py @@ -851,7 +851,7 @@ class AppController: self._api_event_queue.append({"type": "response", "payload": payload}) def _handle_request_event(self, event: events.UserRequestEvent) -> None: """Processes a UserRequestEvent by calling the AI client.""" - ai_client.current_tier = None # Ensure main discussion is untagged + ai_client.set_current_tier(None) # Ensure main discussion is untagged if self.ui_auto_add_history: with self._pending_history_adds_lock: self._pending_history_adds.append({ @@ -935,7 +935,7 @@ class AppController: def _on_tool_log(self, script: str, result: str) -> None: session_logger.log_tool_call(script, result, None) - source_tier = ai_client.current_tier + source_tier = ai_client.get_current_tier() with self._pending_tool_calls_lock: self._pending_tool_calls.append({"script": script, "result": result, "ts": time.time(), "source_tier": source_tier}) diff --git a/src/conductor_tech_lead.py b/src/conductor_tech_lead.py index 76804fd..680e3cd 100644 --- a/src/conductor_tech_lead.py +++ b/src/conductor_tech_lead.py @@ -20,7 +20,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, # Set custom system prompt for this call old_system_prompt = ai_client._custom_system_prompt ai_client.set_custom_system_prompt(system_prompt or "") - ai_client.current_tier = "Tier 2" + ai_client.set_current_tier("Tier 2") last_error = None try: for _ in range(3): @@ -53,7 +53,7 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict[str, finally: # Restore old system prompt and clear tier tag ai_client.set_custom_system_prompt(old_system_prompt or "") - ai_client.current_tier = None + ai_client.set_current_tier(None) from src.dag_engine import TrackDAG from src.models import Ticket diff --git a/src/multi_agent_conductor.py b/src/multi_agent_conductor.py index 7be037c..49f309d 100644 --- a/src/multi_agent_conductor.py +++ b/src/multi_agent_conductor.py @@ -300,7 +300,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: old_comms_cb(entry) ai_client.comms_log_callback = worker_comms_callback - ai_client.current_tier = "Tier 3" + ai_client.set_current_tier("Tier 3") try: comms_baseline = len(ai_client.get_comms_log()) response = ai_client.send( @@ -313,7 +313,7 @@ def run_worker_lifecycle(ticket: Ticket, context: WorkerContext, context_files: ) finally: ai_client.comms_log_callback = old_comms_cb - ai_client.current_tier = None + ai_client.set_current_tier(None) if event_queue: # Push via "response" event type — _process_event_queue wraps this # as {"action": "handle_ai_response", "payload": ...} for the GUI. diff --git a/tests/test_ai_client_concurrency.py b/tests/test_ai_client_concurrency.py new file mode 100644 index 0000000..ec83e33 --- /dev/null +++ b/tests/test_ai_client_concurrency.py @@ -0,0 +1,39 @@ +import threading +from src import ai_client + +def test_ai_client_tier_isolation(): + original_append = ai_client._append_comms + captured_logs = [] + def intercepted_append(direction, kind, payload): + captured_logs.append({ + 'thread_name': threading.current_thread().name, + 'source_tier': ai_client.get_current_tier() + }) + original_append(direction, kind, payload) + ai_client._append_comms = intercepted_append + ai_client.clear_comms_log() + barrier = threading.Barrier(2) + def run_t1(): + ai_client.set_current_tier('Tier 3') + barrier.wait() + ai_client._append_comms('OUT', 'test', {}) + def run_t2(): + ai_client.set_current_tier('Tier 4') + barrier.wait() + ai_client._append_comms('OUT', 'test', {}) + t1 = threading.Thread(target=run_t1, name='T1') + t2 = threading.Thread(target=run_t2, name='T2') + t1.start() + t2.start() + t1.join() + t2.join() + try: + log_t1 = next(log for log in captured_logs if log['thread_name'] == 'T1') + log_t2 = next(log for log in captured_logs if log['thread_name'] == 'T2') + assert log_t1['source_tier'] == 'Tier 3' + assert log_t2['source_tier'] == 'Tier 4' + finally: + ai_client._append_comms = original_append + +if __name__ == "__main__": + test_ai_client_tier_isolation() diff --git a/tests/test_mma_agent_focus_phase1.py b/tests/test_mma_agent_focus_phase1.py index cb2f2eb..7c608ed 100644 --- a/tests/test_mma_agent_focus_phase1.py +++ b/tests/test_mma_agent_focus_phase1.py @@ -7,15 +7,14 @@ state when logging comms and tools. from src import ai_client def reset_tier(): - ai_client.current_tier = None + ai_client.set_current_tier(None) yield - ai_client.current_tier = None + ai_client.set_current_tier(None) -def test_current_tier_variable_exists() -> None: - """ai_client must expose a module-level current_tier variable.""" - assert hasattr(ai_client, "current_tier") - # current_tier might be None or a default - pass +def test_get_current_tier_exists() -> None: + """ai_client must expose a get_current_tier function.""" + assert hasattr(ai_client, "get_current_tier") + assert callable(ai_client.get_current_tier) def test_append_comms_has_source_tier_key() -> None: """Dict entries in comms log must have a 'source_tier' key.""" @@ -30,7 +29,7 @@ def test_append_comms_has_source_tier_key() -> None: def test_append_comms_source_tier_none_when_unset() -> None: """When current_tier is None, source_tier in log must be None.""" ai_client.reset_session() - ai_client.current_tier = None + ai_client.set_current_tier(None) ai_client._append_comms("OUT", "request", {"msg": "hello"}) log = ai_client.get_comms_log() @@ -39,22 +38,22 @@ def test_append_comms_source_tier_none_when_unset() -> None: def test_append_comms_source_tier_set_when_current_tier_set() -> None: """When current_tier is 'Tier 1', source_tier in log must be 'Tier 1'.""" ai_client.reset_session() - ai_client.current_tier = "Tier 1" + ai_client.set_current_tier("Tier 1") ai_client._append_comms("OUT", "request", {"msg": "hello"}) log = ai_client.get_comms_log() assert log[-1]["source_tier"] == "Tier 1" - ai_client.current_tier = None + ai_client.set_current_tier(None) def test_append_comms_source_tier_tier2() -> None: """When current_tier is 'Tier 2', source_tier in log must be 'Tier 2'.""" ai_client.reset_session() - ai_client.current_tier = "Tier 2" + ai_client.set_current_tier("Tier 2") ai_client._append_comms("OUT", "request", {"msg": "hello"}) log = ai_client.get_comms_log() assert log[-1]["source_tier"] == "Tier 2" - ai_client.current_tier = None + ai_client.set_current_tier(None) def test_append_tool_log_stores_dict(app_instance) -> None: """App._append_tool_log must store a dict in self._tool_log.""" diff --git a/tests/test_token_usage.py b/tests/test_token_usage.py index 24c3371..f627fea 100644 --- a/tests/test_token_usage.py +++ b/tests/test_token_usage.py @@ -9,6 +9,7 @@ from src import ai_client def test_token_usage_tracking() -> None: ai_client.reset_session() + ai_client.clear_comms_log() with patch("src.ai_client._ensure_gemini_client"), \ patch("src.ai_client._gemini_client") as mock_client: mock_chat = MagicMock()