Per user 'a bunch of docs just committed had redundant content across files. Can we do a reduction of that and instead map references to other files?' This commit reduces content duplication across 9 files. The canonical sources are kept as detailed references; the other files now point to them. Reductions (table replaced with 'see canonical' reference): 1. data_oriented_design.md §9: the 4-dim memory table (canonical: conductor/code_styleguides/agent_memory_dimensions.md §0) 2. guide_agent_memory_dimensions.md §0: the 4-dim memory table (canonical: conductor/code_styleguides/agent_memory_dimensions.md §0) 3. guide_caching_strategy.md §1: the 12-layer model (canonical: conductor/code_styleguides/cache_friendly_context.md §1) 4. guide_ai_client.md 'Cache strategy' section: the 12-layer model recap (canonical: conductor/code_styleguides/cache_friendly_context.md §1) 5. guide_knowledge_curation.md §1: the 5 category file details (canonical: conductor/code_styleguides/knowledge_artifacts.md §1) 6. product-guidelines.md 'Memory Dimensions' section: the 4-dim table (canonical: conductor/code_styleguides/agent_memory_dimensions.md §0) 7. guide_mma.md '4 memory dimensions' section: the MMA scope table (canonical: conductor/code_styleguides/agent_memory_dimensions.md §0) 8. docs/AGENTS.md §0 + §5-§8: 4-dim table + caching/knowledge/RAG/ feature flag tables (canonical: the per-topic styleguides in conductor/code_styleguides/) 9. AGENTS.md 'Code Styleguides' section: the 6-styleguide list (canonical: docs/AGENTS.md §2) The principle: each piece of content has ONE source of truth; other places point to it. The data-oriented way. Files retain their narrative flow and the 'what this is' intros, but the detailed tables are now in their canonical home. Net effect: -2100 bytes across 9 files (without losing any information - the canonical sources are unchanged). The 'cross-references' sections are kept; the duplicated content is removed.
30 KiB
MMA: 4-Tier Multi-Model Agent Orchestration
Top | Architecture | Tools & IPC | Simulations
Overview
The MMA (Multi-Model Agent) system is a hierarchical task decomposition and execution engine. A high-level "epic" is broken into tracks, tracks are decomposed into tickets with dependency relationships, and tickets are executed by stateless workers with human-in-the-loop approval at every destructive boundary.
Tier 1: Orchestrator — product alignment, epic → tracks
Tier 2: Tech Lead — track → tickets (DAG), architectural oversight
Tier 3: Worker — stateless TDD implementation per ticket
Tier 4: QA — stateless error analysis, no fixes
Data Structures (models.py)
Ticket
The atomic unit of work. All MMA execution revolves around transitioning tickets through their state machine.
@dataclass
class Ticket:
id: str # e.g., "T-001"
description: str # Human-readable task description
status: str # "todo" | "in_progress" | "completed" | "blocked"
assigned_to: str # Tier assignment: "tier3-worker", "tier4-qa"
target_file: Optional[str] = None # File this ticket modifies
context_requirements: List[str] = field() # Files needed for context injection
depends_on: List[str] = field() # Ticket IDs that must complete first
blocked_reason: Optional[str] = None # Why this ticket is blocked
step_mode: bool = False # If True, requires manual approval before execution
persona_id: Optional[str] = None # Per-ticket persona override; see Persona Application
retry_count: int = 0 # Increments on failure; drives model escalation
model_override: Optional[str] = None # If set, bypasses persona/model_list selection
def mark_blocked(self, reason: str) -> None # Sets status="blocked", stores reason
def mark_complete(self) -> None # Sets status="completed"
def to_dict(self) -> Dict[str, Any]
@classmethod
def from_dict(cls, data) -> "Ticket"
Status state machine:
todo ──> in_progress ──> completed
| |
v v
blocked blocked
Track
A collection of tickets with a shared goal.
@dataclass
class Track:
id: str # Track identifier
description: str # Track-level brief
tickets: List[Ticket] = field() # Ordered list of tickets
def get_executable_tickets(self) -> List[Ticket]
# Returns all 'todo' tickets whose depends_on are all 'completed'
WorkerContext
@dataclass
class WorkerContext:
ticket_id: str # Which ticket this worker is processing
model_name: str # LLM model to use (e.g., "gemini-2.5-flash-lite")
messages: List[dict] # Conversation history for this worker
persona_id: Optional[str] = None # Per-worker persona (set in run_worker_lifecycle)
tool_preset: Optional[str] = None # Fallback tool preset if persona has none
DAG Engine (dag_engine.py)
Two classes: TrackDAG (graph) and ExecutionEngine (state machine).
TrackDAG
class TrackDAG:
def __init__(self, tickets: List[Ticket]):
self.tickets = tickets
self.ticket_map = {t.id: t for t in tickets} # O(1) lookup by ID
get_ready_tasks(): Returns tickets where status == 'todo' AND all depends_on have status == 'completed'. Missing dependencies are treated as NOT completed (fail-safe).
has_cycle(): Iterative DFS cycle detection using an explicit stack of (node_id, is_backtracking) tuples plus a path set (no recursion):
def has_cycle(self) -> bool:
with get_monitor().scope("dag_has_cycle"):
visited = set()
for start_ticket in self.tickets:
if start_ticket.id in visited:
continue
stack = [(start_ticket.id, False)] # (id, is_backtracking)
path = set()
while stack:
node_id, is_backtracking = stack.pop()
if is_backtracking:
path.remove(node_id)
continue
if node_id in path: return True # back-edge -> cycle
if node_id in visited: continue
visited.add(node_id)
path.add(node_id)
stack.append((node_id, True)) # post-visit marker
ticket = self.ticket_map.get(node_id)
if ticket:
for neighbor_id in ticket.depends_on:
stack.append((neighbor_id, False))
return False
topological_sort(): Kahn's algorithm (BFS-based, in-degree counter), not DFS post-order. Cycle detection is implicit — if len(result) < len(self.tickets) after the BFS drain, a ValueError("Dependency cycle detected") is raised. Returns a list of ticket ID strings in dependency order.
ExecutionEngine
class ExecutionEngine:
def __init__(self, dag: TrackDAG, auto_queue: bool = False):
self.dag = dag
self.auto_queue = auto_queue
tick() — the heartbeat. On each call:
- Calls
dag.cascade_blocks()to propagateblockedstatus from any blocked ticket to its transitivetododependents. - Returns
dag.get_ready_tasks()— the list of tickets that aretodowith all dependenciescompleted.
tick() does NOT promote tickets to in_progress. The auto-promotion (status = "in_progress") happens in the caller — ConductorEngine.run() at src/multi_agent_conductor.py — not in tick(). auto_queue is therefore a parameter that the ConductorEngine consults in its own loop; ExecutionEngine.tick() itself only returns the ready list. Step-mode approval also happens in ConductorEngine.run() via approve_task(); the engine never moves a todo ticket on its own.
approve_task(task_id): Manually transitions todo → in_progress if all dependencies are met.
update_task_status(task_id, status): Force-sets status (used by workers to mark completed or blocked).
WorkerPool (multi_agent_conductor.py)
Bounded concurrent worker pool with semaphore gating.
class WorkerPool:
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self._active: dict[str, threading.Thread] = {}
self._lock = threading.Lock()
self._semaphore = threading.Semaphore(max_workers)
Key Methods:
spawn(ticket_id, target, args)— Spawns a worker thread if pool has capacity. ReturnsNoneif full.join_all(timeout)— Waits for all active workers to complete.get_active_count()— Returns current number of active workers.is_full()— ReturnsTrueif at capacity.
Thread Safety: All state mutations are protected by _lock. The semaphore ensures at most max_workers threads execute concurrently.
Configuration: max_workers is loaded from config.toml → [mma].max_workers (default: 4).
ConductorEngine (multi_agent_conductor.py)
The Tier 2 orchestrator. Owns the execution loop that drives tickets through the DAG.
class ConductorEngine:
def __init__(
self,
track: Track,
event_queue: Optional[events.AsyncEventQueue] = None,
auto_queue: bool = False,
max_workers: int = 4,
):
self.track = track
self.event_queue = event_queue
self.tier_usage = {
"Tier 1": {"input": 0, "output": 0, "model": "gemini-3.1-pro-preview", "tool_preset": None, "persona": None},
"Tier 2": {"input": 0, "output": 0, "model": "gemini-3-flash-preview", "tool_preset": None, "persona": None},
"Tier 3": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
"Tier 4": {"input": 0, "output": 0, "model": "gemini-2.5-flash-lite", "tool_preset": None, "persona": None},
}
self.dag = TrackDAG(self.track.tickets)
self.engine = ExecutionEngine(self.dag, auto_queue=auto_queue)
self.pool = WorkerPool(max_workers=max_workers)
self._workers_lock = threading.Lock()
self._active_workers: dict[str, threading.Thread] = {}
self._abort_events: dict[str, threading.Event] = {}
self._pause_event: threading.Event = threading.Event()
self._tier_usage_lock = threading.Lock()
self._dirty: bool = True
max_workers is NOT read from config.toml by ConductorEngine itself — it is supplied by the caller. The 3 call sites in AppController (at src/app_controller.py:4132-4133, 4145-4146, 4223-4224) all read config.toml → [mma].max_workers and pass it in. The default in the constructor signature is 4.
Per-tier tier_usage schema (each tier entry):
| Key | Type | Purpose |
|---|---|---|
input |
int |
Cumulative input tokens for this tier |
output |
int |
Cumulative output tokens for this tier |
model |
str |
Default model name (overridable per ticket via model_override or persona) |
tool_preset |
Optional[str] |
Active tool preset name (set via set_tool_preset or persona) |
persona |
Optional[str] |
Active persona name (set when a ticket's persona is applied) |
State Broadcast (_push_state)
On every state change, the engine pushes the full orchestration state to the GUI via AsyncEventQueue:
async def _push_state(self, status="running", active_tier=None):
payload = {
"status": status, # "running" | "done" | "blocked"
"active_tier": active_tier, # e.g., "Tier 2 (Tech Lead)", "Tier 3 (Worker): T-001"
"tier_usage": self.tier_usage,
"track": {"id": self.track.id, "title": self.track.description},
"tickets": [asdict(t) for t in self.track.tickets]
}
await self.event_queue.put("mma_state_update", payload)
This payload is consumed by the GUI's _process_pending_gui_tasks handler for "mma_state_update", which updates mma_status, active_tier, mma_tier_usage, active_tickets, and active_track.
Ticket Ingestion (parse_json_tickets)
Parses a JSON array of ticket dicts (from Tier 2 LLM output) into Ticket objects, appends to self.track.tickets, then rebuilds the TrackDAG and ExecutionEngine.
Main Execution Loop (run)
async def run(self):
while True:
ready_tasks = self.engine.tick()
if not ready_tasks:
if all tickets completed:
await self._push_state("done")
break
if any in_progress:
await asyncio.sleep(1) # Waiting for async workers
continue
else:
await self._push_state("blocked")
break
for ticket in ready_tasks:
if in_progress or (auto_queue and not step_mode):
ticket.status = "in_progress"
await self._push_state("running", f"Tier 3 (Worker): {ticket.id}")
# Create worker context
context = WorkerContext(
ticket_id=ticket.id,
model_name="gemini-2.5-flash-lite",
messages=[]
)
# Execute in thread pool (blocking AI call)
await loop.run_in_executor(
None, run_worker_lifecycle, ticket, context, ...
)
await self._push_state("running", "Tier 2 (Tech Lead)")
elif todo and (step_mode or not auto_queue):
await self._push_state("running", f"Awaiting Approval: {ticket.id}")
await asyncio.sleep(1) # Pause for HITL approval
Tier 2: Tech Lead (conductor_tech_lead.py)
The Tier 2 AI call converts a high-level Track brief into discrete Tier 3 tickets.
generate_tickets(track_brief, module_skeletons) -> list[dict]
def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
user_message = (
f"### TRACK BRIEF:\n{track_brief}\n\n"
f"### MODULE SKELETONS:\n{module_skeletons}\n\n"
"Please generate the implementation tickets for this track."
)
# Temporarily override system prompt
old_system_prompt = ai_client._custom_system_prompt
ai_client.set_custom_system_prompt(system_prompt)
try:
response = ai_client.send(md_content="", user_message=user_message)
# Multi-layer JSON extraction:
# 1. Try ```json ... ``` blocks
# 2. Try ``` ... ``` blocks
# 3. Regex search for [ { ... } ] pattern
tickets = json.loads(json_match)
return tickets
finally:
ai_client.set_custom_system_prompt(old_system_prompt)
The JSON extraction is defensive — handles markdown code fences, bare JSON, and regex fallback for embedded arrays.
topological_sort(tickets: list[dict]) -> list[dict]
Convenience wrapper: converts raw dicts to Ticket objects, builds a TrackDAG, calls dag.topological_sort(), returns the original dicts reordered by sorted IDs.
Tier 3: Worker Lifecycle (run_worker_lifecycle)
This free function executes a single ticket. Key behaviors:
Context Amnesia
ai_client.reset_session() # Each ticket starts with a clean slate
No conversational bleed between tickets. Every worker is stateless.
Context Injection
For context_requirements files:
- First file:
parser.get_curated_view(content)— full skeleton with@core_logicand[HOT]bodies preserved. - Subsequent files:
parser.get_skeleton(content)— cheaper, signatures + docstrings only.
Prompt Construction
user_message = (
f"You are assigned to Ticket {ticket.id}.\n"
f"Task Description: {ticket.description}\n"
f"\nContext Files:\n{context_injection}\n"
"Please complete this task. If you are blocked and cannot proceed, "
"start your response with 'BLOCKED' and explain why."
)
HITL Clutch Integration
If event_queue is provided, confirm_spawn() is called before executing, allowing the user to:
- Read the prompt and context.
- Edit both the prompt and context markdown.
- Approve, reject, or abort the entire track.
The confirm_spawn function uses the dialog_container pattern:
- Create
dialog_container = [None](mutable container for thread communication). - Push
"mma_spawn_approval"task to event queue with the container. - Poll
dialog_container[0]every 100ms for up to 60 seconds. - When the GUI fills in the dialog, call
.wait()to get the result. - Returns
(approved, modified_prompt, modified_context).
Persona Application
When a ticket has persona_id set (or a tier-level persona is active), run_worker_lifecycle resolves the persona from PersonaManager and applies it before the AI call:
# Apply Persona if specified
persona = None
if context.persona_id:
pm = PersonaManager(...)
personas = pm.load_all()
if context.persona_id in personas:
persona = personas[context.persona_id]
if persona.system_prompt:
ai_client.set_custom_system_prompt(persona.system_prompt)
if persona.bias_profile:
ai_client.set_bias_profile(persona.bias_profile)
if persona.preferred_models:
preferred_models = persona.preferred_models
if persona.tool_preset:
persona_tool_preset = persona.tool_preset
# Apply tool preset: use persona's tool_preset if available, otherwise fall back to context.tool_preset
effective_tool_preset = persona_tool_preset or context.tool_preset
A single persona may override:
system_prompt— replaces the default system prompt for the workerbias_profile— influences tool selection via semantic nudgingpreferred_models— list used for model escalation (replaces the defaultmodels_list)tool_preset— applied viaset_tool_preset(); takes precedence over the ticket'scontext.tool_presetaggregation_strategy— sets the file aggregation strategy (auto/full/summarize/skeleton) for the worker's context
Resolution order at model selection time (in run_worker_lifecycle):
ticket.model_override(if set) — used unconditionallypersona.preferred_models(if persona applied) — first item is the initial modelticket.retry_count-indexed entry in the resolvedmodels_list— escalates on retries
If the persona fails to load (file not found, parse error), the worker logs a warning and falls back to the default model list. The persona is not a hard failure point.
See guide_personas.md (placeholder; written in Task 10) for the full persona schema, scope inheritance rules, and editor modal.
Tier 4: QA Error Analysis
Stateless error analysis. Invoked via the qa_callback parameter in shell_runner.run_powershell() when a command fails.
def run_tier4_analysis(error_message: str) -> str:
"""Stateless Tier 4 QA analysis of an error message."""
# Uses a dedicated system prompt for error triage
# Returns analysis text (root cause, suggested fix)
# Does NOT modify any code — analysis only
Integrated directly into the shell execution pipeline: if qa_callback is provided and the command has non-zero exit or stderr output, the callback result is appended to the tool output as QA ANALYSIS:\n<result>.
Cross-System Data Flow
The full MMA lifecycle from epic to completion:
- Tier 1 (Orchestrator): User enters an epic description in the GUI. Creates a
Trackwith a brief. - Tier 2 (Tech Lead):
conductor_tech_lead.generate_tickets()callsai_client.send()with thetier2_sprint_planningprompt, producing a JSON ticket list. - Ingestion:
ConductorEngine.parse_json_tickets()ingests the JSON, buildsTicketobjects, constructsTrackDAG+ExecutionEngine. - Execution loop:
ConductorEngine.run()enters the async loop, callingengine.tick()each iteration. - Worker dispatch: For each ready ticket,
run_worker_lifecycle()is called in a thread executor. It usesai_client.send()with MCP tools (dispatched throughmcp_client.dispatch()). - Security enforcement: MCP tools enforce the allowlist via
_resolve_and_check()on every filesystem operation. - State broadcast:
_push_state()→AsyncEventQueue→ GUI renders DAG + ticket status. - External visibility:
ApiHookClient.get_mma_status()queries the Hook API for the full orchestration state. - HITL gates:
confirm_spawn()pushes to event queue → GUI renders dialog → user approves/edits →dialog_container[0].wait()returns the decision.
Token Firewalling
Each tier operates within its own token budget:
- Tier 3 workers use lightweight models (default:
gemini-2.5-flash-lite) and receive only the files listed incontext_requirements. - Context Amnesia ensures no accumulated history bleeds between tickets.
- Tier 2 tracks cumulative
tier_usageper tier:{"input": N, "output": N, "model": ..., "tool_preset": ..., "persona": ...}for token cost monitoring and persona attribution. - First file vs subsequent files: The first
context_requirementsfile gets a curated view (preserving hot paths); subsequent files get only skeletons. - RAG augmentation is caller-injected: The ConductorEngine does not own a RAG engine. The caller (typically
AppControllerfor the main discussion, or the GUI's RAG panel for project-wide queries) is responsible for instantiating anRAGEngineand passing it through toai_client.send(rag_engine=...)for each worker call. See guide_architecture.md for the dispatch flow.
Abort Event Propagation
Workers can be killed mid-execution via abort events:
# In ConductorEngine.__init__:
self._abort_events: dict[str, threading.Event] = {}
# When spawning a worker:
self._abort_events[ticket.id] = threading.Event()
# To kill a worker:
def kill_worker(self, ticket_id: str) -> None:
if ticket_id in self._abort_events:
self._abort_events[ticket_id].set() # Signal abort
thread = self._active_workers.get(ticket_id)
if thread:
thread.join(timeout=1.0) # Wait for graceful shutdown
Abort Check Points in run_worker_lifecycle:
- Before major work — checked immediately after
ai_client.reset_session() - During clutch_callback — checked before each tool execution
- After blocking send() — checked after AI call returns
When abort is detected, the ticket status is set to "killed" and the worker exits immediately.
Pause/Resume Control
The engine supports pausing the entire orchestration pipeline:
def pause(self) -> None:
self._pause_event.set()
def resume(self) -> None:
self._pause_event.clear()
In the main run() loop:
while True:
if self._pause_event.is_set():
self._push_state(status="paused", active_tier="Paused")
time.sleep(0.5)
continue
# ... normal execution
This allows the user to pause execution without killing workers.
Model Escalation
Workers automatically escalate to more capable models on retry:
models_list = [
"gemini-2.5-flash-lite", # First attempt
"gemini-2.5-flash", # Second attempt
"gemini-3.1-pro-preview" # Third+ attempt
]
model_idx = min(ticket.retry_count, len(models_list) - 1)
model_name = models_list[model_idx]
The ticket.model_override field can bypass this logic with a specific model.
Track State Persistence
Track state can be persisted to disk via project_manager.py:
conductor/tracks/<track_id>/
spec.md # Track specification (human-authored)
plan.md # Implementation plan with checkbox tasks
metadata.json # Track metadata (id, type, status, timestamps)
state.toml # Structured TrackState with task list
project_manager.get_all_tracks(base_dir) scans the tracks directory with a three-tier metadata fallback:
state.toml(structuredTrackState) — counts tasks withstatus == "completed".metadata.json(legacy) — gets id/title/status only.plan.md(regex) — counts- [x]vs- [ ]checkboxes for progress.
Beads Integration (Roadmap)
Beads is a Dolt-backed issue tracking system. The src/beads_client.py module provides a Python client for bd CLI calls (bd_create, bd_list, bd_ready, bd_update). The client is functional but not yet integrated into the ConductorEngine execution loop.
Current state (as of 2026-06-02):
BeadsClientis instantiable; it detects whether a project's.beads/directory exists and falls back to no-op if not.- Tools
bd_create,bd_list,bd_ready,bd_updateare exposed via the MCP bridge (see guide_tools.md). - The ConductorEngine still writes track state to
conductor/tracks/<id>/(markdown-based), not to a Beads repo. - A project's TOML may specify a conductor directory override (
[conductor].dir) but does not yet support a Beads repository path.
Planned integration:
- The ConductorEngine's
parse_json_ticketswould optionally forward ingested tickets toBeadsClient.bd_createwhen Beads mode is active. save_track_statewould write to.beads/instead ofconductor/tracks/<id>/state.tomlwhen Beads is active.- The Visual DAG would query
bd_listfor real-time ticket status instead of the in-memoryTrackDAG.
See guide_beads.md (placeholder; written in Task 10) for the full Beads client API and the toolset exposed to agents.
Workspace Profile Auto-Switching (Roadmap)
The WorkspaceManager (src/workspace_manager.py) supports binding workspace profiles to MMA tier context. Currently, profiles can be saved and loaded manually; the auto-switch hook is implemented but not yet wired into ConductorEngine.
Current state:
WorkspaceProfile(named docking + window state) can be saved/loaded via the GUI.- Scope inheritance (Global vs Project) is supported.
- A
bind_to_context(context_id: str, profile_name: str)method exists onWorkspaceManager.
Planned integration:
- On Tier transition (
tier1 → tier2 → tier3),ConductorEnginewould callWorkspaceManager.bind_to_context("tier3", active_profile)to reshape the UI for the current cognitive load. - This is opt-in via
[conductor].auto_switch_profiles = trueinconfig.toml.
See guide_workspace_profiles.md (placeholder; written in Task 10) for the full profile schema.
See Also
- guide_architecture.md — Threading model that MMA's worker pool respects
- guide_multi_agent_conductor.md — The
multi_agent_conductor.py+dag_engine.pyruntime - guide_app_controller.md — How the AppController drives MMA via
_cb_start_track,_do_generate,_process_event_queue - guide_context_aggregation.md — The
aggregate.py:build_tier3_contextvariant used by MMA workers - guide_discussions.md — The Discussion system; MMA worker prompts are built from the active discussion
- conductor/tracks/nagent_review_20260608/report.md §9 — Deep-dive on the MMA sub-conversation pattern vs nagent's
<nagent-conversation>tag; the highest-priority future-track is to extract MMA'srun_worker_lifecycleinto a reusableSubConversationRunnerfor 1:1 discussions (per user-flagged want) - conductor/tracks/nagent_review_20260608/nagent_takeaways_20260608.md §3 and §10 — Actionable patterns for the SubConversationRunner; the design constraint that sub-agents return a concise artifact (not a full transcript) is baked into the recommendation
Addition (2026-06-12) — Delegation as context management, not parallelism
The nagent review (v2.3, §3.12) reframed delegation with a new lens: the reason to spawn a sub-conversation is to keep the parent's context clean. The fact that the child runs concurrently (sometimes) is incidental. Per nagent's bin/nagent:730: "Hand off when noisy: if this conversation is mostly stale tool output, distill goal/state/decisions into a sub-conversation prompt, delegate the rest, and tell your caller about the handoff. Never rewrite your own conversation file while running."
The reframing table:
| Long-lived agent abstractions | Disposable workers |
|---|---|
| Identity is central | Output artifact is central |
| Shared context gets noisy | Child context is isolated |
| Parent absorbs all exploration | Parent gets a concise result |
| Delegation implies personality | Delegation is context management |
How this applies to MMA
MMA already does this implicitly:
src/multi_agent_conductor.py:_spawn_workerruns each MMA worker as a fresh subprocess withai_client.reset_session()(Context Amnesia)- The worker returns a
Result[TaskOutput, ErrorInfo]to the parent (theConductorEngine) - The parent's
disc_entriesdoesn't accumulate the worker's intermediate reads/shell calls
The product implication for 1:1 discussions
The 1:1 discussion path has no sub-agent primitive today. The user types a prompt, the AI responds, the loop continues. If the user wants the AI to "investigate this file" or "look up this API," the answer has to come from the same conversation.
The product decision (user-flagged want). Add a SubConversationRunner for 1:1 discussions. Reuse MMA's mma_exec.py as the subprocess template. The sub-agent returns a concise artifact (the sub-agent's response) + token usage + exit code. The App inserts the result into the active discussion as a "User" role entry. The next LLM call sees it.
The SubConversationRunner shape (per the v2.3 §10.2 spec)
@dataclass
class SubConversationResult:
artifact: str # the sub-agent's response
tokens_in: int
tokens_out: int
exit_code: int
errors: list[ErrorInfo] # from the data_oriented_error_handling convention
class SubConversationRunner:
async def spawn(self, prompt: str, *, allowed_tools: list[str] = None, ...) -> SubConversationResult:
# Reuses mma_exec.py as the subprocess template
# Returns the child's <nagent-response> content + token usage
...
The design contract. The sub-agent's return type is SubConversationResult, not the full conversation. The parent gets a concise artifact, not a transcript. The sub-conversation folder is auto-archived after 7 days (consistent with log_pruner.py).
Addition (2026-06-12) — The 4 memory dimensions (the MMA scope)
The MMA tracks operate on disc_entries (the Discussion dim) and manual_slop.toml (the project config). They do NOT typically touch the Curation dim (per-track ticket specs) or the Knowledge dim (per-track session reports). They MAY touch the RAG dim if the ticket scope includes RAG integration (declared in metadata.json).
The MMA scope, in the 4-dim framework: the canonical 4-dim table is in conductor/code_styleguides/agent_memory_dimensions.md §0. The short version:
- Curation — per-ticket only (a ticket might add a
FileItemif the feature touches curation; not a default) - Discussion — YES (the MMA worker's prompt is built from the active discussion)
- RAG — per-ticket only (declared in
metadata.json) - Knowledge — per-track only (the track's session synthesis in
docs/reports/is the durable knowledge) The implication for MMA workers. MMA workers are given Context Amnesia (ai_client.reset_session()at the start ofrun_worker_lifecycle). The worker sees: - The ticket's prompt (the scoped work)
- The
manual_slop.toml [agent.context_files](the project context) - The
FileItemset per the ticket's scope - Optionally a
knowledge/digest.mdexcerpt (if the ticket scope includes knowledge injection)
The worker does NOT see:
- The full
disc_entrieshistory (per the Context Amnesia pattern) - The full
~/.manual_slop/knowledge/(only the digest excerpt) - The RAG index (unless the ticket scope explicitly opts in)