import time import os from api_hook_client import ApiHookClient from simulation.user_agent import UserSimAgent class WorkflowSimulator: def __init__(self, hook_client: ApiHookClient): self.client = hook_client self.user_agent = UserSimAgent(hook_client) def setup_new_project(self, name, git_dir, project_path=None): print(f"Setting up new project: {name}") if project_path: self.client.click("btn_project_new_automated", user_data=project_path) else: self.client.click("btn_project_new") time.sleep(1) self.client.set_value("project_git_dir", git_dir) self.client.click("btn_project_save") time.sleep(1) def create_discussion(self, name): print(f"Creating discussion: {name}") self.client.set_value("disc_new_name_input", name) self.client.click("btn_disc_create") time.sleep(1) def switch_discussion(self, name): print(f"Switching to discussion: {name}") self.client.select_list_item("disc_listbox", name) time.sleep(1) def load_prior_log(self): print("Loading prior log") self.client.click("btn_load_log") # This usually opens a file dialog which we can't easily automate from here # without more hooks, but we can verify the button click. time.sleep(1) def truncate_history(self, pairs): print(f"Truncating history to {pairs} pairs") self.client.set_value("disc_truncate_pairs", pairs) self.client.click("btn_disc_truncate") time.sleep(1) def run_discussion_turn(self, user_message=None): self.run_discussion_turn_async(user_message) # Wait for AI return self.wait_for_ai_response() def run_discussion_turn_async(self, user_message=None): if user_message is None: # Generate from AI history session = self.client.get_session() entries = session.get('session', {}).get('entries', []) user_message = self.user_agent.generate_response(entries) print(f"\n[USER]: {user_message}") self.client.set_value("ai_input", user_message) self.client.click("btn_gen_send") def wait_for_ai_response(self, timeout=60): print("Waiting for AI response...", end="", flush=True) start_time = time.time() last_count = len(self.client.get_session().get('session', {}).get('entries', [])) while time.time() - start_time < timeout: # Check for error status first status = self.client.get_value("ai_status") if status and status.lower().startswith("error"): print(f"\n[ABORT] GUI reported error status: {status}") return {"role": "AI", "content": f"ERROR: {status}"} time.sleep(1) print(".", end="", flush=True) entries = self.client.get_session().get('session', {}).get('entries', []) if len(entries) > last_count: last_entry = entries[-1] if last_entry.get('role') == 'AI' and last_entry.get('content'): content = last_entry.get('content') print(f"\n[AI]: {content[:100]}...") if "error" in content.lower() or "blocked" in content.lower(): print(f"[WARN] AI response appears to contain an error message.") return last_entry print("\nTimeout waiting for AI") return None