# Listing metadata: 144 lines, 5.1 KiB, Python
import time
|
|
from api_hook_client import ApiHookClient
|
|
from simulation.user_agent import UserSimAgent
|
|
|
|
class WorkflowSimulator:
|
|
def __init__(self, hook_client: ApiHookClient) -> None:
|
|
self.client = hook_client
|
|
self.user_agent = UserSimAgent(hook_client)
|
|
|
|
def setup_new_project(self, name: str, git_dir: str, project_path: str = None) -> None:
|
|
print(f"Setting up new project: {name}")
|
|
if project_path:
|
|
self.client.click("btn_project_new_automated", user_data=project_path)
|
|
else:
|
|
self.client.click("btn_project_new")
|
|
time.sleep(1)
|
|
self.client.set_value("project_git_dir", git_dir)
|
|
self.client.click("btn_project_save")
|
|
time.sleep(1)
|
|
# Force state deterministic for tests
|
|
self.client.set_value("auto_add_history", True)
|
|
|
|
def create_discussion(self, name: str) -> None:
|
|
print(f"Creating discussion: {name}")
|
|
self.client.set_value("disc_new_name_input", name)
|
|
self.client.click("btn_disc_create")
|
|
self.client.select_list_item('disc_listbox', name)
|
|
time.sleep(2)
|
|
|
|
def switch_discussion(self, name: str) -> None:
|
|
print(f"Switching to discussion: {name}")
|
|
self.client.select_list_item("disc_listbox", name)
|
|
time.sleep(1)
|
|
|
|
def load_prior_log(self) -> None:
|
|
print("Loading prior log")
|
|
self.client.click("btn_load_log")
|
|
# This usually opens a file dialog which we can't easily automate from here
|
|
# without more hooks, but we can verify the button click.
|
|
time.sleep(1)
|
|
|
|
def truncate_history(self, pairs: int) -> None:
|
|
print(f"Truncating history to {pairs} pairs")
|
|
self.client.set_value("disc_truncate_pairs", pairs)
|
|
self.client.click("btn_disc_truncate")
|
|
time.sleep(1)
|
|
|
|
def run_discussion_turn(self, user_message: str = None) -> dict | None:
|
|
self.run_discussion_turn_async(user_message)
|
|
# Wait for AI
|
|
return self.wait_for_ai_response()
|
|
|
|
def run_discussion_turn_async(self, user_message: str = None) -> None:
|
|
if user_message is None:
|
|
# Generate from AI history
|
|
session = self.client.get_session()
|
|
entries = session.get('session', {}).get('entries', [])
|
|
user_message = self.user_agent.generate_response(entries)
|
|
active_disc = self.client.get_value("active_discussion")
|
|
print(f"[DEBUG] Current active discussion in GUI: {active_disc}")
|
|
print(f"\n[USER]: {user_message}")
|
|
self.client.set_value("ai_input", user_message)
|
|
self.client.click("btn_gen_send")
|
|
|
|
def wait_for_ai_response(self, timeout: int = 60) -> dict | None:
|
|
print("Waiting for AI response...", end="", flush=True)
|
|
|
|
start_time = time.time()
|
|
last_debug_time = 0
|
|
stalled_start_time = None
|
|
|
|
# Statuses that indicate the system is still actively processing the AI request
|
|
busy_indicators = [
|
|
"thinking", "streaming", "sending", "running powershell",
|
|
"awaiting ai", "fetching", "searching"
|
|
]
|
|
|
|
was_busy = False
|
|
|
|
while time.time() - start_time < timeout:
|
|
elapsed = time.time() - start_time
|
|
status = (self.client.get_value("ai_status") or "idle").lower()
|
|
|
|
is_busy = any(indicator in status for indicator in busy_indicators)
|
|
if is_busy:
|
|
was_busy = True
|
|
|
|
# Always fetch latest entries
|
|
session_data = self.client.get_session() or {}
|
|
entries = session_data.get('session', {}).get('entries', [])
|
|
|
|
# Find the last entry that is NOT role 'System'
|
|
non_system_entries = [e for e in entries if e.get('role') != 'System']
|
|
last_entry = non_system_entries[-1] if non_system_entries else {}
|
|
last_role = last_entry.get('role', 'none')
|
|
|
|
# AI entries for return value
|
|
current_ai_entries = [e for e in entries if e.get('role') == 'AI']
|
|
last_ai_entry = current_ai_entries[-1] if current_ai_entries else {}
|
|
|
|
if elapsed - last_debug_time >= 5:
|
|
roles = [e.get("role") for e in entries]
|
|
print(f"\n[DEBUG] {elapsed:.1f}s - status: '{status}', roles: {roles}")
|
|
last_debug_time = elapsed
|
|
|
|
if "error" in status:
|
|
print(f"\n[ABORT] GUI reported error status: {status}")
|
|
return last_ai_entry if last_ai_entry else {"role": "AI", "content": f"ERROR: {status}"}
|
|
|
|
# Turn completion logic:
|
|
# 1. Transition: we were busy and now we are not, and the last role is AI.
|
|
# 2. Fallback: we are idle/done and the last role is AI, after some initial delay.
|
|
is_complete = False
|
|
if was_busy and not is_busy and last_role == 'AI':
|
|
is_complete = True
|
|
elif status in ("idle", "done") and last_role == 'AI' and elapsed > 2:
|
|
is_complete = True
|
|
|
|
if is_complete:
|
|
content = last_ai_entry.get('content', '')
|
|
print(f"\n[AI]: {content[:100]}...")
|
|
return last_ai_entry
|
|
|
|
if non_system_entries:
|
|
# Stall detection for 'Tool' results
|
|
if last_role == 'Tool' and not is_busy:
|
|
if stalled_start_time is None:
|
|
stalled_start_time = time.time()
|
|
elif time.time() - stalled_start_time > 5:
|
|
print("\n[STALL DETECTED] Turn stalled with Tool result. Clicking 'btn_gen_send' to continue.")
|
|
self.client.click("btn_gen_send")
|
|
stalled_start_time = time.time()
|
|
else:
|
|
stalled_start_time = None
|
|
|
|
# Maintain the 'thinking/streaming' wait loop
|
|
time.sleep(1)
|
|
print(".", end="", flush=True)
|
|
|
|
print("\nTimeout waiting for AI")
|
|
active_disc = self.client.get_value("active_discussion")
|
|
print(f"[DEBUG] Active discussion in GUI at timeout: {active_disc}")
|
|
return None
|