chore(mma): Checkpoint progress on visual simulation and UI refresh before sub-agent delegation
This commit is contained in:
@@ -123,6 +123,7 @@ class HookHandler(BaseHTTPRequestHandler):
|
||||
def get_mma():
|
||||
try:
|
||||
result["mma_status"] = getattr(app, "mma_status", "idle")
|
||||
result["ai_status"] = getattr(app, "ai_status", "idle")
|
||||
result["active_tier"] = getattr(app, "active_tier", None)
|
||||
result["active_track"] = getattr(app, "active_track", None)
|
||||
result["active_tickets"] = getattr(app, "active_tickets", [])
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
|
||||
## Phase 2: Epic & Track Verification
|
||||
- [x] Task: Write the simulation routine to trigger a new Epic and verify the Track Browser updates correctly. 605dfc3
|
||||
- [ ] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer.
|
||||
- [~] Task: Verify that selecting a newly generated track successfully loads its initial (empty) state into the DAG visualizer.
|
||||
|
||||
## Phase 3: DAG & Spawn Interception Verification
|
||||
- [ ] Task: Simulate the "Start Track" action and verify the DAG visualizer populates with tasks.
|
||||
|
||||
@@ -29,12 +29,14 @@ All tasks follow a strict lifecycle:
|
||||
- **Analyze Changes:** Use `get_git_diff` if the task involves modifying recently updated code.
|
||||
- **Minimize Token Burn:** Only use `read_file` with `start_line`/`end_line` for specific implementation details once target areas are identified.
|
||||
4. **Write Failing Tests (Red Phase):**
|
||||
- **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria.
|
||||
- **Pre-Delegation Checkpoint:** Before spawning a worker for dangerous or non-trivial changes, ensure your current progress is staged (`git add .`) or committed. This prevents losing iterations if a sub-agent incorrectly uses `git restore`.
|
||||
- **Delegate Test Creation:** Do NOT write test code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a prompt to create the necessary test files and unit tests based on the task criteria. (If repeating due to failures, pass `--failure-count X` to switch to a more capable model).
|
||||
- Take the code generated by the Worker and apply it.
|
||||
- **CRITICAL:** Run the tests and confirm that they fail as expected. This is the "Red" phase of TDD. Do not proceed until you have failing tests.
|
||||
|
||||
4. **Implement to Pass Tests (Green Phase):**
|
||||
- **Delegate Implementation:** Do NOT write the implementation code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a highly specific prompt to write the minimum amount of application code necessary to make the failing tests pass.
|
||||
- **Pre-Delegation Checkpoint:** Ensure current progress is staged or committed before delegating.
|
||||
- **Delegate Implementation:** Do NOT write the implementation code directly. Spawn a Tier 3 Worker (`python scripts/mma_exec.py --role tier3-worker "[PROMPT]"`) with a highly specific prompt to write the minimum amount of application code necessary to make the failing tests pass. (If repeating due to failures, pass `--failure-count X` to switch to a more capable model).
|
||||
- Take the code generated by the Worker and apply it.
|
||||
- Run the test suite again and confirm that all tests now pass. This is the "Green" phase.
|
||||
|
||||
|
||||
@@ -9,9 +9,9 @@ def generate_tickets(track_brief: str, module_skeletons: str) -> list[dict]:
|
||||
Breaks down a Track Brief and module skeletons into discrete Tier 3 Tickets.
|
||||
"""
|
||||
# 1. Set Tier 2 Model (Tech Lead - Flash)
|
||||
ai_client.set_provider('gemini', 'gemini-2.5-flash-lite')
|
||||
ai_client.reset_session()
|
||||
# 2. Construct Prompt
|
||||
if ai_client._model != 'mock':
|
||||
ai_client.set_provider('gemini', 'gemini-2.5-flash-lite')
|
||||
ai_client.reset_session() # 2. Construct Prompt
|
||||
system_prompt = mma_prompts.PROMPTS.get("tier2_sprint_planning")
|
||||
user_message = (
|
||||
f"### TRACK BRIEF:\n{track_brief}\n\n"
|
||||
|
||||
66
gui_2.py
66
gui_2.py
@@ -896,6 +896,8 @@ class App:
|
||||
user_data = task.get("user_data")
|
||||
if item == "btn_project_new_automated":
|
||||
self._cb_new_project_automated(user_data)
|
||||
elif item == "btn_mma_load_track":
|
||||
self._cb_load_track(user_data)
|
||||
elif item in self._clickable_actions:
|
||||
# Check if it's a method that accepts user_data
|
||||
import inspect
|
||||
@@ -1961,11 +1963,36 @@ class App:
|
||||
|
||||
def _cb_accept_tracks(self) -> None:
|
||||
def _bg_task():
|
||||
for track_data in self.proposed_tracks:
|
||||
self._start_track_logic(track_data)
|
||||
# Generate skeletons once
|
||||
self.ai_status = "Phase 2: Generating skeletons for all tracks..."
|
||||
parser = ASTParser(language="python")
|
||||
generated_skeletons = ""
|
||||
try:
|
||||
for i, file_path in enumerate(self.files):
|
||||
try:
|
||||
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
|
||||
abs_path = Path(self.ui_files_base_dir) / file_path
|
||||
if abs_path.exists() and abs_path.suffix == ".py":
|
||||
with open(abs_path, "r", encoding="utf-8") as f:
|
||||
code = f.read()
|
||||
generated_skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
|
||||
except Exception as e:
|
||||
print(f"Error parsing skeleton for {file_path}: {e}")
|
||||
except Exception as e:
|
||||
self.ai_status = f"Error generating skeletons: {e}"
|
||||
print(f"Error generating skeletons: {e}")
|
||||
return # Exit if skeleton generation fails
|
||||
|
||||
# Now loop through tracks and call _start_track_logic with generated skeletons
|
||||
total_tracks = len(self.proposed_tracks)
|
||||
for i, track_data in enumerate(self.proposed_tracks):
|
||||
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
|
||||
self.ai_status = f"Processing track {i+1} of {total_tracks}: '{title}'..."
|
||||
self._start_track_logic(track_data, skeletons_str=generated_skeletons) # Pass skeletons
|
||||
|
||||
with self._pending_gui_tasks_lock:
|
||||
self._pending_gui_tasks.append({'action': 'refresh_from_project'}) # Ensure UI refresh after tracks are started
|
||||
self.ai_status = "Tracks accepted and execution started."
|
||||
self.ai_status = f"All {total_tracks} tracks accepted and execution started."
|
||||
threading.Thread(target=_bg_task, daemon=True).start()
|
||||
|
||||
def _cb_start_track(self, user_data: Any = None) -> None:
|
||||
@@ -1980,24 +2007,29 @@ class App:
|
||||
threading.Thread(target=lambda: self._start_track_logic(track_data), daemon=True).start()
|
||||
self.ai_status = f"Track '{title}' started."
|
||||
|
||||
def _start_track_logic(self, track_data: dict[str, Any]) -> None:
|
||||
def _start_track_logic(self, track_data: dict[str, Any], skeletons_str: str | None = None) -> None:
|
||||
try:
|
||||
goal = track_data.get("goal", "")
|
||||
title = track_data.get("title") or track_data.get("goal", "Untitled Track")
|
||||
self.ai_status = f"Phase 2: Generating tickets for {title}..."
|
||||
# 1. Get skeletons for context
|
||||
parser = ASTParser(language="python")
|
||||
skeletons = ""
|
||||
for i, file_path in enumerate(self.files):
|
||||
try:
|
||||
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
|
||||
abs_path = Path(self.ui_files_base_dir) / file_path
|
||||
if abs_path.exists() and abs_path.suffix == ".py":
|
||||
with open(abs_path, "r", encoding="utf-8") as f:
|
||||
code = f.read()
|
||||
skeletons += f"\nFile: {file_path}\n{parser.get_skeleton(code)}\n"
|
||||
except Exception as e:
|
||||
print(f"Error parsing skeleton for {file_path}: {e}")
|
||||
|
||||
skeletons = "" # Initialize skeletons variable
|
||||
if skeletons_str is None: # Only generate if not provided
|
||||
# 1. Get skeletons for context
|
||||
parser = ASTParser(language="python")
|
||||
for i, file_path in enumerate(self.files):
|
||||
try:
|
||||
self.ai_status = f"Phase 2: Scanning files ({i+1}/{len(self.files)})..."
|
||||
abs_path = Path(self.ui_files_base_dir) / file_path
|
||||
if abs_path.exists() and abs_path.suffix == ".py":
|
||||
with open(abs_path, "r", encoding="utf-8") as f:
|
||||
code = f.read()
|
||||
skeletons += f"\\nFile: {file_path}\\n{parser.get_skeleton(code)}\\n"
|
||||
except Exception as e:
|
||||
print(f"Error parsing skeleton for {file_path}: {e}")
|
||||
else:
|
||||
skeletons = skeletons_str # Use provided skeletons
|
||||
|
||||
self.ai_status = "Phase 2: Calling Tech Lead..."
|
||||
raw_tickets = conductor_tech_lead.generate_tickets(goal, skeletons)
|
||||
if not raw_tickets:
|
||||
|
||||
@@ -79,7 +79,7 @@ DockId=0x0000000F,2
|
||||
|
||||
[Window][Theme]
|
||||
Pos=0,17
|
||||
Size=632,824
|
||||
Size=32,824
|
||||
Collapsed=0
|
||||
DockId=0x00000005,1
|
||||
|
||||
@@ -89,14 +89,14 @@ Size=900,700
|
||||
Collapsed=0
|
||||
|
||||
[Window][Diagnostics]
|
||||
Pos=634,17
|
||||
Size=911,643
|
||||
Pos=34,17
|
||||
Size=765,545
|
||||
Collapsed=0
|
||||
DockId=0x00000010,0
|
||||
|
||||
[Window][Context Hub]
|
||||
Pos=0,17
|
||||
Size=632,824
|
||||
Size=32,824
|
||||
Collapsed=0
|
||||
DockId=0x00000005,0
|
||||
|
||||
@@ -107,26 +107,26 @@ Collapsed=0
|
||||
DockId=0x0000000D,0
|
||||
|
||||
[Window][Discussion Hub]
|
||||
Pos=1547,17
|
||||
Size=879,1395
|
||||
Pos=801,17
|
||||
Size=879,1183
|
||||
Collapsed=0
|
||||
DockId=0x00000004,0
|
||||
|
||||
[Window][Operations Hub]
|
||||
Pos=634,17
|
||||
Size=911,643
|
||||
Pos=34,17
|
||||
Size=765,545
|
||||
Collapsed=0
|
||||
DockId=0x00000010,1
|
||||
|
||||
[Window][Files & Media]
|
||||
Pos=0,843
|
||||
Size=632,569
|
||||
Size=32,357
|
||||
Collapsed=0
|
||||
DockId=0x00000006,1
|
||||
|
||||
[Window][AI Settings]
|
||||
Pos=0,843
|
||||
Size=632,569
|
||||
Size=32,357
|
||||
Collapsed=0
|
||||
DockId=0x00000006,0
|
||||
|
||||
@@ -136,14 +136,14 @@ Size=416,325
|
||||
Collapsed=0
|
||||
|
||||
[Window][MMA Dashboard]
|
||||
Pos=634,662
|
||||
Size=911,750
|
||||
Pos=34,564
|
||||
Size=765,636
|
||||
Collapsed=0
|
||||
DockId=0x00000011,0
|
||||
|
||||
[Window][Log Management]
|
||||
Pos=1547,17
|
||||
Size=879,1395
|
||||
Pos=801,17
|
||||
Size=879,1183
|
||||
Collapsed=0
|
||||
DockId=0x00000004,1
|
||||
|
||||
@@ -173,7 +173,7 @@ Column 3 Weight=1.0000
|
||||
DockNode ID=0x00000008 Pos=3125,170 Size=593,1157 Split=Y
|
||||
DockNode ID=0x00000009 Parent=0x00000008 SizeRef=1029,147 Selected=0x0469CA7A
|
||||
DockNode ID=0x0000000A Parent=0x00000008 SizeRef=1029,145 Selected=0xDF822E02
|
||||
DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=2426,1395 Split=Y
|
||||
DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=1680,1183 Split=Y
|
||||
DockNode ID=0x0000000C Parent=0xAFC85805 SizeRef=1362,1041 Split=X Selected=0x5D11106F
|
||||
DockNode ID=0x00000003 Parent=0x0000000C SizeRef=1545,1183 Split=X
|
||||
DockNode ID=0x0000000B Parent=0x00000003 SizeRef=404,1186 Split=Y Selected=0xF4139CA2
|
||||
@@ -182,7 +182,7 @@ DockSpace ID=0xAFC85805 Window=0x079D3A04 Pos=0,17 Size=2426,1395 Sp
|
||||
DockNode ID=0x00000005 Parent=0x00000007 SizeRef=295,824 Selected=0xF4139CA2
|
||||
DockNode ID=0x00000006 Parent=0x00000007 SizeRef=295,724 CentralNode=1 Selected=0x7BD57D6A
|
||||
DockNode ID=0x0000000E Parent=0x00000002 SizeRef=911,858 Split=Y Selected=0x418C7449
|
||||
DockNode ID=0x00000010 Parent=0x0000000E SizeRef=868,545 Selected=0x418C7449
|
||||
DockNode ID=0x00000010 Parent=0x0000000E SizeRef=868,545 Selected=0xB4CBF21A
|
||||
DockNode ID=0x00000011 Parent=0x0000000E SizeRef=868,636 Selected=0x3AEC3498
|
||||
DockNode ID=0x00000001 Parent=0x0000000B SizeRef=1029,775 Selected=0x8B4EBFA6
|
||||
DockNode ID=0x0000000D Parent=0x00000003 SizeRef=435,1186 Selected=0x363E93D6
|
||||
|
||||
@@ -15,11 +15,13 @@ To ensure proper environment handling and logging, you MUST NOT call the `gemini
|
||||
|
||||
## 1. The Tier 3 Worker (Execution)
|
||||
When performing code modifications or implementing specific requirements:
|
||||
1. **DO NOT** perform large code writes yourself.
|
||||
2. **DO** construct a single, highly specific prompt with a clear objective.
|
||||
3. **DO** spawn a Tier 3 Worker.
|
||||
1. **Pre-Delegation Checkpoint:** For dangerous or non-trivial changes, ALWAYS stage your changes (`git add .`) or commit before delegating to a Tier 3 Worker. If the worker fails or runs `git restore`, you will lose all prior AI iterations for that file if it wasn't staged/committed.
|
||||
2. **DO NOT** perform large code writes yourself.
|
||||
3. **DO** construct a single, highly specific prompt with a clear objective.
|
||||
4. **DO** spawn a Tier 3 Worker.
|
||||
*Command:* `uv run python scripts/mma_exec.py --role tier3-worker "Implement [SPECIFIC_INSTRUCTION] in [FILE_PATH]. Follow TDD and return success status or code changes."`
|
||||
4. The Tier 3 Worker is stateless and has tool access for file I/O.
|
||||
5. **Handling Repeated Failures:** If a Tier 3 Worker fails multiple times on the same task, it may lack the necessary capability. You must track failures and retry with `--failure-count <N>` (e.g., `--failure-count 2`). This tells `mma_exec.py` to escalate the sub-agent to a more powerful reasoning model (like `gemini-3-flash`).
|
||||
6. The Tier 3 Worker is stateless and has tool access for file I/O.
|
||||
|
||||
## 2. The Tier 4 QA Agent (Diagnostics)
|
||||
If you run a test or command that fails with a significant error or large traceback:
|
||||
|
||||
@@ -57,13 +57,15 @@ def generate_skeleton(code: str) -> str:
|
||||
except Exception as e:
|
||||
return f"# Error generating skeleton: {e}\n{code}"
|
||||
|
||||
def get_model_for_role(role: str) -> str:
|
||||
def get_model_for_role(role: str, failure_count: int = 0) -> str:
|
||||
"""Returns the specific model to use for a given tier role."""
|
||||
if role == 'tier1-orchestrator' or role == 'tier1':
|
||||
return 'gemini-3.1-pro-preview'
|
||||
elif role == 'tier2-tech-lead' or role == 'tier2':
|
||||
return 'gemini-3-flash-preview'
|
||||
elif role == 'tier3-worker' or role == 'tier3':
|
||||
if failure_count > 1:
|
||||
return 'gemini-3-flash'
|
||||
return 'gemini-2.5-flash-lite'
|
||||
elif role == 'tier4-qa' or role == 'tier4':
|
||||
return 'gemini-2.5-flash-lite'
|
||||
@@ -124,8 +126,8 @@ def get_dependencies(filepath: str) -> list[str]:
|
||||
print(f"Error getting dependencies for {filepath}: {e}")
|
||||
return []
|
||||
|
||||
def execute_agent(role: str, prompt: str, docs: list[str], debug: bool = False) -> str:
|
||||
model = get_model_for_role(role)
|
||||
def execute_agent(role: str, prompt: str, docs: list[str], debug: bool = False, failure_count: int = 0) -> str:
|
||||
model = get_model_for_role(role, failure_count)
|
||||
# Advanced Context: Dependency skeletons for Tier 3
|
||||
injected_context = ""
|
||||
# Whitelist of modules that sub-agents have "unfettered" (full) access to.
|
||||
@@ -249,6 +251,12 @@ def create_parser() -> argparse.ArgumentParser:
|
||||
action="store_true",
|
||||
help="Enable debug logging"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--failure-count",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Number of times this task has failed previously"
|
||||
)
|
||||
parser.add_argument(
|
||||
"prompt",
|
||||
type=str,
|
||||
@@ -263,6 +271,7 @@ def main() -> None:
|
||||
role = args.role
|
||||
prompt = args.prompt
|
||||
debug = args.debug
|
||||
failure_count = args.failure_count
|
||||
docs = []
|
||||
if args.task_file and os.path.exists(args.task_file):
|
||||
with open(args.task_file, "rb") as f:
|
||||
@@ -272,6 +281,7 @@ def main() -> None:
|
||||
docs = task_data.get("docs", [])
|
||||
# Only override debug if it's explicitly set in the task file (optional)
|
||||
debug = task_data.get("debug", debug)
|
||||
failure_count = task_data.get("failure_count", failure_count)
|
||||
if not role or not prompt:
|
||||
parser.print_help()
|
||||
return
|
||||
@@ -283,8 +293,8 @@ def main() -> None:
|
||||
for ref in file_refs:
|
||||
if os.path.exists(ref) and ref not in docs:
|
||||
docs.append(ref)
|
||||
print(f"Executing role: {role} with docs: {docs} (debug={debug})")
|
||||
result = execute_agent(role, prompt, docs, debug=debug)
|
||||
print(f"Executing role: {role} with docs: {docs} (debug={debug}, failure_count={failure_count})")
|
||||
result = execute_agent(role, prompt, docs, debug=debug, failure_count=failure_count)
|
||||
print(result)
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -37,6 +37,6 @@ web_search = true
|
||||
fetch_url = true
|
||||
|
||||
[mma]
|
||||
epic = "Build a simple calculator"
|
||||
epic = "Develop a new feature"
|
||||
active_track_id = ""
|
||||
tracks = []
|
||||
|
||||
@@ -10,5 +10,5 @@ auto_add = true
|
||||
|
||||
[discussions.main]
|
||||
git_commit = ""
|
||||
last_updated = "2026-02-28T21:00:47"
|
||||
last_updated = "2026-02-28T21:27:02"
|
||||
history = []
|
||||
|
||||
@@ -9,45 +9,71 @@ sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
|
||||
from api_hook_client import ApiHookClient
|
||||
|
||||
@pytest.mark.integration
|
||||
def test_mma_epic_simulation(live_gui) -> None:
|
||||
def test_mma_complete_lifecycle(live_gui) -> None:
|
||||
"""
|
||||
Integration test for MMA epic simulation.
|
||||
Red Phase: asserts False.
|
||||
Tests the entire MMA lifecycle from epic planning to track loading and ticket verification
|
||||
in a single test case to avoid state dependency issues between separate test functions.
|
||||
"""
|
||||
client = ApiHookClient()
|
||||
assert client.wait_for_server(timeout=10)
|
||||
# Try selecting MMA Dashboard tab if applicable (using typical naming convention)
|
||||
try:
|
||||
client.select_tab('main_tab_bar', 'tab_mma')
|
||||
except Exception:
|
||||
pass
|
||||
# Set model to mock to avoid real API calls and timeouts
|
||||
|
||||
# 1. Set model to 'mock'.
|
||||
try:
|
||||
client.set_value('current_model', 'mock')
|
||||
except Exception:
|
||||
pass
|
||||
client.set_value('mma_epic_input', 'Build a simple calculator')
|
||||
except Exception as e:
|
||||
pytest.fail(f"Failed to set model to 'mock': {e}")
|
||||
|
||||
# 2. Enter epic and click 'Plan Epic'.
|
||||
client.set_value('mma_epic_input', 'Develop a new feature')
|
||||
client.click('btn_mma_plan_epic')
|
||||
|
||||
# 1. Poll for Proposed Tracks
|
||||
proposed_success = False
|
||||
for i in range(30):
|
||||
# 3. Wait for 'proposed_tracks'.
|
||||
proposed_tracks_found = False
|
||||
for _ in range(60): # Poll for up to 60 seconds
|
||||
status = client.get_mma_status()
|
||||
print(f"Polling status: {status}")
|
||||
# Assuming 'ai_status' might be a key within the status dictionary. If not, this needs adjustment.
|
||||
print(f"Polling ai_status: {status.get('ai_status', 'N/A')}")
|
||||
if status and status.get('proposed_tracks') and len(status['proposed_tracks']) > 0:
|
||||
proposed_success = True
|
||||
proposed_tracks_found = True
|
||||
break
|
||||
time.sleep(1)
|
||||
assert proposed_success, "Failed to generate proposed tracks."
|
||||
assert proposed_tracks_found, "Failed to find proposed tracks after planning epic."
|
||||
|
||||
# 2. Accept Proposed Tracks
|
||||
# 4. Click 'Accept' to start tracks.
|
||||
client.click('btn_mma_accept_tracks')
|
||||
|
||||
# 3. Poll for Final Tracks
|
||||
tracks_success = False
|
||||
for i in range(30):
|
||||
# 5. Wait for 'tracks' list to populate.
|
||||
tracks_populated = False
|
||||
for _ in range(30): # Poll for up to 30 seconds
|
||||
status = client.get_mma_status()
|
||||
if status and status.get('tracks') and len(status['tracks']) > 0:
|
||||
tracks_success = True
|
||||
tracks_populated = True
|
||||
break
|
||||
time.sleep(1)
|
||||
assert tracks_success, "Failed to generate at least one track."
|
||||
assert tracks_populated, "Failed to populate tracks list after accepting proposed tracks."
|
||||
|
||||
# 6. Verify that one of the new tracks can be loaded and its tickets appear in 'active_tickets'.
|
||||
status_after_tracks = client.get_mma_status()
|
||||
assert status_after_tracks is not None, "Failed to get MMA status after tracks populated."
|
||||
tracks_list = status_after_tracks.get('tracks')
|
||||
assert tracks_list is not None and len(tracks_list) > 0, "Tracks list is empty or not found."
|
||||
|
||||
track_id_to_load = tracks_list[0]['id']
|
||||
print(f"Attempting to load track with ID: {track_id_to_load}")
|
||||
|
||||
# Load the first track
|
||||
client.click('btn_mma_load_track', user_data=track_id_to_load)
|
||||
|
||||
# Poll until 'active_track' is not None and 'active_tickets' are present
|
||||
active_track_and_tickets_found = False
|
||||
for _ in range(60): # Poll for up to 60 seconds
|
||||
status = client.get_mma_status()
|
||||
if status and status.get('active_track') == track_id_to_load and \
|
||||
'active_tickets' in status and len(status['active_tickets']) > 0:
|
||||
active_track_and_tickets_found = True
|
||||
break
|
||||
time.sleep(1)
|
||||
assert active_track_and_tickets_found, f"Timed out waiting for track {track_id_to_load} to load and populate active tickets."
|
||||
|
||||
print(f"Successfully loaded and verified track ID: {track_id_to_load} with active tickets.")
|
||||
|
||||
Reference in New Issue
Block a user