checkpoint: massive refactor

This commit is contained in:
2026-02-28 09:06:45 -05:00
parent f2512c30e9
commit d36632c21a
149 changed files with 16255 additions and 17722 deletions

View File

@@ -6,74 +6,59 @@ from api_hook_client import ApiHookClient
from simulation.workflow_sim import WorkflowSimulator
def main():
    """Drive a live UX walkthrough of the GUI through its test-hook API.

    The walkthrough resets the session, scaffolds a throw-away project,
    runs three scripted discussion turns while flipping between the
    Comms and Tool tabs, exercises discussion creation/switching, and
    finally prints a checklist for a human to verify in the GUI.

    Returns early (after printing an error) when the hook server cannot
    be reached within 10 seconds.
    """
    client = ApiHookClient()
    print("=== Manual Slop: Live UX Walkthrough ===")
    print("Connecting to GUI...")
    if not client.wait_for_server(timeout=10):
        print("Error: Could not connect to GUI. Ensure it is running with --enable-test-hooks")
        return
    sim = WorkflowSimulator(client)

    # 1. Start Clean
    print("\n[Action] Resetting Session...")
    client.click("btn_reset")
    time.sleep(2)

    # 2. Project Scaffolding
    # The timestamp keeps repeated runs from colliding on the project file.
    project_name = f"LiveTest_{int(time.time())}"
    # Use actual project dir for realism
    git_dir = os.path.abspath(".")
    project_path = os.path.join(git_dir, "tests", f"{project_name}.toml")
    print(f"\n[Action] Scaffolding Project: {project_name} at {project_path}")
    sim.setup_new_project(project_name, git_dir, project_path)
    # Enable auto-add so results appear in history automatically
    client.set_value("auto_add_history", True)
    time.sleep(1)

    # 3. Discussion Loop (3 turns for speed, but logic supports more)
    turns = [
        "Hi! I want to create a simple python script called 'hello.py' that prints the current date and time. Can you write it for me?",
        "That looks great. Can you also add a feature to print the name of the operating system?",
        "Excellent. Now, please create a requirements.txt file with 'requests' in it.",
    ]
    for turn_no, msg in enumerate(turns, start=1):
        print(f"\n--- Turn {turn_no} ---")
        # Switch to Comms Log to see the send
        client.select_tab("operations_tabs", "tab_comms")
        # run_discussion_turn already waits for the AI response, so the
        # steps below only observe the GUI state afterwards.
        sim.run_discussion_turn(msg)
        # Check thinking indicator
        state = client.get_indicator_state("thinking_indicator")
        if state.get('shown'):
            print("[Status] Thinking indicator is visible.")
        # Switch to Tool Log after a short pause
        time.sleep(2)
        client.select_tab("operations_tabs", "tab_tool")

    # 4. History Management
    print("\n[Action] Creating new discussion thread...")
    sim.create_discussion("Refinement")
    print("\n[Action] Switching back to Default...")
    sim.switch_discussion("Default")

    # 5. Manual Sign-off Simulation
    print("\n=== Walkthrough Complete ===")
    print("Please verify the following in the GUI:")
    print("1. The project metadata reflects the new project.")
    print("2. The discussion history contains the 3 turns.")
    print("3. The 'Refinement' discussion exists in the list.")
    print("\nWalkthrough finished successfully.")


if __name__ == "__main__":
    main()

View File

@@ -9,49 +9,42 @@ from api_hook_client import ApiHookClient
from simulation.user_agent import UserSimAgent
def main():
    """Send one message through the GUI hooks and wait for the AI to answer.

    Resets the session, types a fixed greeting into the input box,
    presses send, then polls the session once per second (60 polls max)
    until the newest history entry is a non-empty AI message.

    Returns early when the hook server is unreachable, and as soon as an
    AI reply is seen; otherwise prints a timeout notice.
    """
    client = ApiHookClient()
    print("Waiting for hook server...")
    if not client.wait_for_server(timeout=5):
        print("Hook server not found. Start GUI with --enable-test-hooks")
        return
    # Constructed as in the original script; not otherwise used here.
    sim_agent = UserSimAgent(client)

    # 1. Reset session to start clean
    print("Resetting session...")
    client.click("btn_reset")
    time.sleep(2)  # Give it time to clear

    # 2. Initial message
    initial_msg = "Hello! I want to create a simple python script that prints 'Hello World'. Can you help me?"
    # The newline must be an escape sequence: a raw line break inside a
    # single-quoted (f-)string is a syntax error.
    print(f"\n[USER]: {initial_msg}")
    client.set_value("ai_input", initial_msg)
    client.click("btn_gen_send")

    # 3. Wait for AI response
    print("Waiting for AI response...", end="", flush=True)
    last_entry_count = 0
    for _ in range(60):  # 60 seconds max
        time.sleep(1)
        print(".", end="", flush=True)
        session = client.get_session()
        entries = session.get('session', {}).get('entries', [])
        if len(entries) <= last_entry_count:
            # Nothing new since the previous poll.
            continue
        # Something happened
        last_entry = entries[-1]
        content = last_entry.get('content')
        if last_entry.get('role') == 'AI' and content:
            print(f"\n[AI]: {content[:100]}...")
            print("\nPing-pong successful!")
            return
        last_entry_count = len(entries)
    print("\nTimeout waiting for AI response")


if __name__ == "__main__":
    main()

View File

@@ -4,35 +4,30 @@ import time
from simulation.sim_base import BaseSimulation, run_sim
class AISettingsSimulation(BaseSimulation):
    """Simulation that switches the active Gemini model back and forth."""

    def run(self):
        """Verify the provider, switch models twice, and assert each switch.

        Raises:
            AssertionError: if the provider is not "gemini" or a model
                switch is not reflected by the GUI after the pause.
        """
        print("\n--- Running AI Settings Simulation (Gemini Only) ---")

        # 1. Verify initial model
        provider = self.client.get_value("current_provider")
        model = self.client.get_value("current_model")
        print(f"[Sim] Initial Provider: {provider}, Model: {model}")
        assert provider == "gemini", f"Expected gemini, got {provider}"

        # 2. Switch to another Gemini model
        other_gemini = "gemini-1.5-flash"
        print(f"[Sim] Switching to {other_gemini}...")
        self.client.set_value("current_model", other_gemini)
        time.sleep(2)  # pause before reading the value back
        # Verify
        new_model = self.client.get_value("current_model")
        print(f"[Sim] Updated Model: {new_model}")
        assert new_model == other_gemini, f"Expected {other_gemini}, got {new_model}"

        # 3. Switch back to flash-lite
        target_model = "gemini-2.5-flash-lite"
        print(f"[Sim] Switching back to {target_model}...")
        self.client.set_value("current_model", target_model)
        time.sleep(2)
        final_model = self.client.get_value("current_model")
        print(f"[Sim] Final Model: {final_model}")
        assert final_model == target_model, f"Expected {target_model}, got {final_model}"


if __name__ == "__main__":
    run_sim(AISettingsSimulation)

View File

@@ -9,80 +9,75 @@ from simulation.workflow_sim import WorkflowSimulator
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
class BaseSimulation:
    """Shared setup/teardown and helper methods for GUI hook simulations.

    Subclasses are expected to provide a ``run()`` method; ``run_sim``
    calls ``setup()``, ``run()`` and ``teardown()`` in that order.
    """

    def __init__(self, client: "ApiHookClient | None" = None):
        """Store the hook client (creating one if omitted) and a workflow simulator.

        Args:
            client: Existing hook client to reuse; a new ``ApiHookClient``
                is created when ``None``.
        """
        self.client = ApiHookClient() if client is None else client
        self.sim = WorkflowSimulator(self.client)
        # Set by setup(); path of the temporary project file.
        self.project_path = None

    def setup(self, project_name="SimProject"):
        """Connect to the GUI, reset the session and scaffold a test project.

        Args:
            project_name: Name of the project; its lower-cased form is
                used in the temporary ``.toml`` file name under ``tests/``.

        Raises:
            RuntimeError: if the hook server does not answer within 5 seconds.
        """
        print("\n[BaseSim] Connecting to GUI...")
        if not self.client.wait_for_server(timeout=5):
            raise RuntimeError("Could not connect to GUI. Ensure it is running with --enable-test-hooks")

        print("[BaseSim] Resetting session...")
        self.client.click("btn_reset")
        time.sleep(0.5)

        git_dir = os.path.abspath(".")
        self.project_path = os.path.abspath(f"tests/temp_{project_name.lower()}.toml")
        # Remove a project file left behind by a previous run.
        if os.path.exists(self.project_path):
            os.remove(self.project_path)

        print(f"[BaseSim] Scaffolding Project: {project_name}")
        self.sim.setup_new_project(project_name, git_dir, self.project_path)

        # Standard test settings
        self.client.set_value("auto_add_history", True)
        self.client.set_value("current_provider", "gemini")
        self.client.set_value("current_model", "gemini-2.5-flash-lite")
        time.sleep(0.2)

    def teardown(self):
        """Finish the simulation.

        The temporary project file is intentionally left on disk so a
        failed run can be inspected afterwards.
        """
        print("[BaseSim] Teardown complete.")

    def get_value(self, tag):
        """Return the hook client's value for ``tag``."""
        return self.client.get_value(tag)

    def wait_for_event(self, event_type, timeout=5):
        """Delegate to the hook client's ``wait_for_event`` and return its result."""
        return self.client.wait_for_event(event_type, timeout)

    def assert_panel_visible(self, panel_tag, msg=None):
        """Placeholder: currently performs no check at all.

        TODO: implement once a hook for panel visibility exists (or by
        probing an element known to live inside the panel).
        """
        pass

    def wait_for_element(self, tag, timeout=2):
        """Poll until ``get_value(tag)`` succeeds or ``timeout`` seconds pass.

        Args:
            tag: Element tag to probe.
            timeout: Maximum time to keep polling, in seconds.

        Returns:
            True as soon as the value can be read without an exception,
            False if the timeout expires first.
        """
        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                # If we can get_value without error, it's likely there
                self.client.get_value(tag)
            except Exception:
                # Only ordinary errors are retried; KeyboardInterrupt and
                # SystemExit propagate so the run can be aborted.
                time.sleep(0.1)
            else:
                return True
        return False
def run_sim(sim_class):
    """Helper to run a simulation class standalone.

    Instantiates ``sim_class`` with no arguments and calls ``setup()``
    then ``run()``. ``teardown()`` is always called, whether or not the
    run succeeded.

    Args:
        sim_class: Class exposing ``setup()``, ``run()`` and ``teardown()``.

    Raises:
        SystemExit: with exit code 1 when setup or run raises an
            ``Exception``; the traceback is printed first.
    """
    sim = sim_class()
    try:
        sim.setup()
        sim.run()
        print(f"\n[SUCCESS] {sim_class.__name__} completed successfully.")
    except Exception as e:
        print(f"\n[FAILURE] {sim_class.__name__} failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        # Runs even on the sys.exit path above.
        sim.teardown()

View File

@@ -4,78 +4,67 @@ import time
from simulation.sim_base import BaseSimulation, run_sim
class ContextSimulation(BaseSimulation):
    """Simulation covering discussion creation, context refresh and chat history."""

    def run(self):
        """Exercise discussions, file aggregation, one chat turn and truncation.

        Raises:
            AssertionError: if the status text, the history layout or the
                truncated history length is not what the simulation expects.
        """
        import glob

        print("\n--- Running Context & Chat Simulation ---")

        # 1. Test Discussion Creation
        disc_name = f"TestDisc_{int(time.time())}"
        print(f"[Sim] Creating discussion: {disc_name}")
        self.sim.create_discussion(disc_name)
        time.sleep(1)
        # The new discussion is not verified yet; the session is still
        # fetched (result unused) so hook traffic matches the original.
        self.client.get_session()

        # 2. Test File Aggregation & Context Refresh
        print("[Sim] Testing context refresh and token budget...")
        proj = self.client.get_project()
        # Add many files to ensure we cross the 1% threshold (~9000 tokens)
        paths = proj['project']['files']['paths']
        for py_file in glob.glob("*.py"):
            base = os.path.basename(py_file)
            if base not in paths:
                paths.append(base)
        # Update project via hook
        self.client.post_project(proj['project'])
        time.sleep(1)

        # Trigger MD Only to refresh context and token budget
        print("[Sim] Clicking MD Only...")
        self.client.click("btn_md_only")
        time.sleep(5)

        # Verify status (project is re-fetched as before; result unused)
        self.client.get_project()
        status = self.client.get_value("ai_status")
        print(f"[Sim] Status: {status}")
        # A missing status fails the assertion instead of raising
        # TypeError from the `in` test.
        assert isinstance(status, str) and "md written" in status, \
            f"Expected 'md written' in status, got {status}"

        # Verify token budget
        pct = self.client.get_value("token_budget_pct")
        current = self.client.get_value("token_budget_current")
        print(f"[Sim] Token budget pct: {pct}, current={current}")
        # We'll just warn if it's 0 but the MD was written, as it might be a small context
        if pct == 0:
            print("[Sim] WARNING: token_budget_pct is 0. This might be due to small context or estimation failure.")

        # 3. Test Chat Turn
        msg = "What is the current date and time? Answer in one sentence."
        print(f"[Sim] Sending message: {msg}")
        self.sim.run_discussion_turn(msg)

        # 4. Verify History
        print("[Sim] Verifying history...")
        session = self.client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # We expect at least 2 entries (User and AI)
        assert len(entries) >= 2, f"Expected at least 2 entries, found {len(entries)}"
        assert entries[-2]['role'] == 'User', "Expected second to last entry to be User"
        assert entries[-1]['role'] == 'AI', "Expected last entry to be AI"
        print(f"[Sim] AI responded: {entries[-1]['content'][:50]}...")

        # 5. Test History Truncation
        print("[Sim] Testing history truncation...")
        self.sim.truncate_history(1)
        time.sleep(1)
        session = self.client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # Truncating to 1 pair means 2 entries max (if it's already at 2, it might not change,
        # but if we had more, it would).
        assert len(entries) <= 2, f"Expected <= 2 entries after truncation, found {len(entries)}"


if __name__ == "__main__":
    run_sim(ContextSimulation)

View File

@@ -4,76 +4,66 @@ import time
from simulation.sim_base import BaseSimulation, run_sim
class ExecutionSimulation(BaseSimulation):
    """Simulation that triggers script generation and approves the modals."""

    # Text the generated script is asked to print; its presence in the
    # history is the success signal.
    _EXPECTED_TEXT = "Simulation Test"

    def setup(self, project_name="SimProject"):
        """Run the base setup, then delete any stale ``hello.ps1``."""
        super().setup(project_name)
        if os.path.exists("hello.ps1"):
            os.remove("hello.ps1")

    @classmethod
    def _entry_matches(cls, entry, roles):
        """Return True if ``entry`` has one of ``roles`` and contains the expected text."""
        # `or ''` also covers entries whose content is explicitly None.
        content = entry.get('content') or ''
        return entry.get('role') in roles and cls._EXPECTED_TEXT in content

    def run(self):
        """Send the script request, approve confirmations, and wait for output.

        The wait window is 90 seconds and restarts whenever a script is
        approved. The loop aborts after three consecutive polls whose
        status starts with "error".

        Raises:
            AssertionError: if the expected text never appears in an AI,
                Tool or Function history entry.
        """
        print("\n--- Running Execution & Modals Simulation ---")

        # 1. Trigger script generation (Async so we don't block on the wait loop)
        msg = "Create a hello.ps1 script that prints 'Simulation Test' and execute it."
        print(f"[Sim] Sending message to trigger script: {msg}")
        self.sim.run_discussion_turn_async(msg)

        # 2. Monitor for events and text responses
        print("[Sim] Monitoring for script approvals and AI text...")
        start_wait = time.time()
        approved_count = 0
        success = False
        consecutive_errors = 0
        while time.time() - start_wait < 90:
            # Check for error status (be lenient with transients)
            status = self.client.get_value("ai_status")
            if status and status.lower().startswith("error"):
                consecutive_errors += 1
                if consecutive_errors >= 3:
                    print(f"[ABORT] Execution simulation aborted due to persistent GUI error: {status}")
                    break
            else:
                consecutive_errors = 0

            # Check for script confirmation event
            ev = self.client.wait_for_event("script_confirmation_required", timeout=1)
            if ev:
                print(f"[Sim] Approving script #{approved_count+1}: {ev.get('script', '')[:50]}...")
                self.client.click("btn_approve_script")
                approved_count += 1
                # Give more time if we just approved a script
                start_wait = time.time()

            # Check if AI has responded with text yet
            session = self.client.get_session()
            entries = session.get('session', {}).get('entries', [])
            # Debug: log last few roles/content
            if entries:
                last_few = entries[-3:]
                print(f"[Sim] Waiting... Last {len(last_few)} roles: {[e.get('role') for e in last_few]}")

            # AI text is checked before tool results, as in the original.
            if any(self._entry_matches(e, ('AI',)) for e in entries):
                print("[Sim] AI responded with expected text. Success.")
                success = True
                break

            # Also check if output is already in history via tool role
            tool_hit = next(
                (e for e in entries if self._entry_matches(e, ('Tool', 'Function'))),
                None,
            )
            if tool_hit is not None:
                print(f"[Sim] Expected output found in {tool_hit.get('role')} results. Success.")
                success = True
                break

            time.sleep(1.0)

        assert success, "Failed to observe script execution output or AI confirmation text"
        print(f"[Sim] Final check: approved {approved_count} scripts.")


if __name__ == "__main__":
    run_sim(ExecutionSimulation)

View File

@@ -4,44 +4,37 @@ import time
from simulation.sim_base import BaseSimulation, run_sim
class ToolsSimulation(BaseSimulation):
    """Simulation that prompts the AI into using its file tools."""

    def run(self):
        """Send two tool-triggering prompts and report what reached the history.

        Raises:
            AssertionError: if the session history is empty after both turns.
        """
        print("\n--- Running Tools Simulation ---")

        # 1. Trigger list_directory tool
        msg = "List the files in the current directory."
        print(f"[Sim] Sending message to trigger tool: {msg}")
        self.sim.run_discussion_turn(msg)

        # 2. Wait for AI to execute tool
        print("[Sim] Waiting for tool execution...")
        time.sleep(5)  # Give it some time

        # 3. Verify Tool Log
        # TODO: not verified yet - this needs a hook that exposes the
        # GUI's tool log.

        # 4. Trigger read_file tool
        msg = "Read the first 10 lines of aggregate.py."
        print(f"[Sim] Sending message to trigger tool: {msg}")
        self.sim.run_discussion_turn(msg)

        # 5. Wait and Verify
        print("[Sim] Waiting for tool execution...")
        time.sleep(5)
        session = self.client.get_session()
        entries = session.get('session', {}).get('entries', [])
        # Tool outputs are usually in the conversation history as 'Tool' role or similar
        tool_outputs = [e for e in entries if e.get('role') in ['Tool', 'Function']]
        print(f"[Sim] Found {len(tool_outputs)} tool outputs in history.")

        # Fail with a clear message instead of an IndexError when the
        # history came back empty.
        assert entries, "Expected at least one history entry after the tool turns"
        # The newest entry is taken as the AI's final answer; its role
        # is not checked.
        last_ai_msg = entries[-1]['content']
        print(f"[Sim] Final AI Response: {last_ai_msg[:100]}...")


if __name__ == "__main__":
    run_sim(ToolsSimulation)

View File

@@ -3,48 +3,45 @@ import random
import ai_client
class UserSimAgent:
    """Simulated human user that answers the assistant via ``ai_client``."""

    def __init__(self, hook_client, model="gemini-2.5-flash-lite", delay_range=(0.5, 2.0)):
        """Store the hook client, model name, delay bounds and system prompt.

        Args:
            hook_client: Client used to talk to the GUI hooks.
            model: Model name recorded on the agent.
            delay_range: ``(min, max)`` bounds in seconds for the random
                pause in ``perform_action_with_delay``. The default keeps
                the previous fixed 0.5-2.0 second window.
        """
        self.hook_client = hook_client
        self.model = model
        self.delay_range = delay_range
        self.system_prompt = (
            "You are a software engineer testing an AI coding assistant called 'Manual Slop'. "
            "You want to build a small Python project and verify the assistant's capabilities. "
            "Keep your responses concise and human-like. "
            "Do not use markdown blocks for your main message unless you are providing code."
        )

    def generate_response(self, conversation_history):
        """Generate a human-like reply to the assistant's latest message.

        Only the most recent entry with role ``'AI'`` is used as the
        prompt; an empty string is sent when there is none.

        Args:
            conversation_history: list of dicts with 'role' and 'content'.

        Returns:
            Whatever ``ai_client.send`` returns for that prompt.
        """
        # For now, just use the last message from the assistant as the prompt.
        last_ai_msg = next(
            (
                entry.get('content', '')
                for entry in reversed(conversation_history)
                if entry.get('role') == 'AI'
            ),
            "",
        )
        # We need to set a custom system prompt for the User Simulator
        try:
            ai_client.set_custom_system_prompt(self.system_prompt)
            # Blank md_content: the simulated user does not read project
            # files through this mechanism.
            response = ai_client.send(md_content="", user_message=last_ai_msg)
        finally:
            # Always clear the custom prompt again, even if send() raised.
            ai_client.set_custom_system_prompt("")
        return response

    def perform_action_with_delay(self, action_func, *args, **kwargs):
        """Execute an action after a random, human-like pause.

        Sleeps for a uniformly random time within ``self.delay_range``,
        then returns ``action_func(*args, **kwargs)``.
        """
        low, high = self.delay_range
        time.sleep(random.uniform(low, high))
        return action_func(*args, **kwargs)

View File

@@ -4,84 +4,80 @@ from api_hook_client import ApiHookClient
from simulation.user_agent import UserSimAgent
class WorkflowSimulator:
    """Scripts GUI workflows by issuing calls through an API hook client.

    Every method translates one user-level step (create a project, switch
    discussion, send a message, ...) into ``click`` / ``set_value`` /
    ``select_list_item`` calls on the hook client, followed by a short fixed
    sleep so the GUI has time to react before the next step.
    """

    def __init__(self, hook_client: "ApiHookClient"):
        """Keep the hook client and build the simulated-user agent on top of it.

        Args:
            hook_client: Client used for every GUI interaction; it is also
                handed to ``UserSimAgent``.
        """
        self.client = hook_client
        self.user_agent = UserSimAgent(hook_client)

    def _session_entries(self):
        """Return the entry list of the current session, or ``[]`` if unavailable.

        Tolerates ``get_session()`` returning ``None`` as well as missing or
        ``None`` ``'session'`` / ``'entries'`` keys, so callers never have to
        guard the lookup themselves.
        """
        session = self.client.get_session() or {}
        return (session.get('session') or {}).get('entries') or []

    def setup_new_project(self, name, git_dir, project_path=None):
        """Create a new project in the GUI, set its git directory and save it.

        Args:
            name: Project name; only used for the progress message.
            git_dir: Value written to the ``project_git_dir`` field.
            project_path: When truthy, the automated "new project" button is
                clicked with this path as ``user_data``; otherwise the regular
                "new project" button is clicked.
        """
        print(f"Setting up new project: {name}")
        if project_path:
            self.client.click("btn_project_new_automated", user_data=project_path)
        else:
            self.client.click("btn_project_new")
        time.sleep(1)
        self.client.set_value("project_git_dir", git_dir)
        self.client.click("btn_project_save")
        time.sleep(1)

    def create_discussion(self, name):
        """Fill in the new-discussion name field and click the create button."""
        print(f"Creating discussion: {name}")
        self.client.set_value("disc_new_name_input", name)
        self.client.click("btn_disc_create")
        time.sleep(1)

    def switch_discussion(self, name):
        """Select the discussion called ``name`` in the discussion list box."""
        print(f"Switching to discussion: {name}")
        self.client.select_list_item("disc_listbox", name)
        time.sleep(1)

    def load_prior_log(self):
        """Click the "load log" button."""
        print("Loading prior log")
        self.client.click("btn_load_log")
        # This usually opens a file dialog which we can't easily automate from here
        # without more hooks, but we can verify the button click.
        time.sleep(1)

    def truncate_history(self, pairs):
        """Set the truncate-pairs field to ``pairs`` and click the truncate button."""
        print(f"Truncating history to {pairs} pairs")
        self.client.set_value("disc_truncate_pairs", pairs)
        self.client.click("btn_disc_truncate")
        time.sleep(1)

    def run_discussion_turn(self, user_message=None):
        """Send one user message and block until the AI answers.

        Args:
            user_message: Text to send; ``None`` lets the simulated user agent
                generate it (see ``run_discussion_turn_async``).

        Returns:
            The result of ``wait_for_ai_response()``: the AI entry dict, a
            synthetic error dict, or ``None`` on timeout.
        """
        self.run_discussion_turn_async(user_message)
        # Wait for AI
        return self.wait_for_ai_response()

    def run_discussion_turn_async(self, user_message=None):
        """Send one user message without waiting for the reply.

        Args:
            user_message: Text to send. When ``None``, the current session
                entries are fetched and passed to the user agent's
                ``generate_response`` to produce the message.
        """
        if user_message is None:
            # Generate from AI history
            user_message = self.user_agent.generate_response(self._session_entries())
        print(f"\n[USER]: {user_message}")
        self.client.set_value("ai_input", user_message)
        self.client.click("btn_gen_send")

    def wait_for_ai_response(self, timeout=60):
        """Poll the session until a new AI entry appears, an error shows, or time runs out.

        Args:
            timeout: Maximum number of seconds to keep polling.

        Returns:
            * the newest session entry, once the entry count has grown past
              the count seen on entry and the last entry has role ``'AI'``
              with non-empty content;
            * ``{"role": "AI", "content": "ERROR: <status>"}`` when the
              ``ai_status`` value starts with "error" (case-insensitive);
            * ``None`` when ``timeout`` elapses first.
        """
        print("Waiting for AI response...", end="", flush=True)
        start_time = time.time()
        # NOTE(review): the baseline is sampled when this method starts, so a
        # reply that was already appended before this call would not count as
        # "new" -- confirm whether callers need a pre-send baseline.
        last_count = len(self._session_entries())
        while time.time() - start_time < timeout:
            # Check for error status first
            status = self.client.get_value("ai_status")
            if isinstance(status, str) and status.lower().startswith("error"):
                print(f"\n[ABORT] GUI reported error status: {status}")
                return {"role": "AI", "content": f"ERROR: {status}"}
            time.sleep(1)
            print(".", end="", flush=True)
            entries = self._session_entries()
            if len(entries) <= last_count:
                continue
            last_entry = entries[-1]
            content = last_entry.get('content')
            # A new non-AI entry (or an empty AI entry) is not a reply yet;
            # keep polling.
            if last_entry.get('role') == 'AI' and content:
                print(f"\n[AI]: {content[:100]}...")
                lowered = content.lower()
                if "error" in lowered or "blocked" in lowered:
                    print("[WARN] AI response appears to contain an error message.")
                return last_entry
        print("\nTimeout waiting for AI")
        return None