feat(api): implement phase 4 headless refinement and verification

This commit is contained in:
2026-03-11 23:17:57 -04:00
parent 930b833055
commit 036c2f360a
3 changed files with 304 additions and 181 deletions

View File

@@ -81,201 +81,207 @@ def _serialize_for_api(obj: Any) -> Any:
class HookHandler(BaseHTTPRequestHandler):
"""Handles incoming HTTP requests for the API hooks."""
def do_GET(self) -> None:
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == "/status":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
elif self.path == "/api/project":
from src import project_manager
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
flat = project_manager.flat_config(_get_app_attr(app, "project"))
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
elif self.path == "/api/session":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
lock = _get_app_attr(app, "_disc_entries_lock")
entries = _get_app_attr(app, "disc_entries", [])
if lock:
with lock: entries_snapshot = list(entries)
else:
entries_snapshot = list(entries)
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
elif self.path == "/api/performance":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
metrics = {}
perf = _get_app_attr(app, "perf_monitor")
if perf: metrics = perf.get_metrics()
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
elif self.path == "/api/events":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
events = []
if _has_app_attr(app, "_api_event_queue"):
lock = _get_app_attr(app, "_api_event_queue_lock")
queue = _get_app_attr(app, "_api_event_queue")
try:
app = self.server.app
session_logger.log_api_hook("GET", self.path, "")
if self.path == "/status":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps({"status": "ok"}).encode("utf-8"))
elif self.path == "/api/project":
from src import project_manager
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
flat = project_manager.flat_config(_get_app_attr(app, "project"))
self.wfile.write(json.dumps({"project": flat}).encode("utf-8"))
elif self.path == "/api/session":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
lock = _get_app_attr(app, "_disc_entries_lock")
entries = _get_app_attr(app, "disc_entries", [])
if lock:
with lock:
with lock: entries_snapshot = list(entries)
else:
entries_snapshot = list(entries)
self.wfile.write(json.dumps({"session": {"entries": entries_snapshot}}).encode("utf-8"))
elif self.path == "/api/performance":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
metrics = {}
perf = _get_app_attr(app, "perf_monitor")
if perf: metrics = perf.get_metrics()
self.wfile.write(json.dumps({"performance": metrics}).encode("utf-8"))
elif self.path == "/api/events":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
events = []
if _has_app_attr(app, "_api_event_queue"):
lock = _get_app_attr(app, "_api_event_queue_lock")
queue = _get_app_attr(app, "_api_event_queue")
if lock:
with lock:
events = list(queue)
queue.clear()
else:
events = list(queue)
queue.clear()
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
elif self.path.startswith("/api/gui/value/"):
field_tag = self.path.split("/")[-1]
event = threading.Event()
result = {"value": None}
def get_val():
try:
settable = _get_app_attr(app, "_settable_fields", {})
gettable = _get_app_attr(app, "_gettable_fields", {})
combined = {**settable, **gettable}
if field_tag in combined:
attr = combined[field_tag]
result["value"] = _get_app_attr(app, attr, None)
else:
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
sys.stderr.flush()
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
events = list(queue)
queue.clear()
self.wfile.write(json.dumps({"events": events}).encode("utf-8"))
elif self.path.startswith("/api/gui/value/"):
field_tag = self.path.split("/")[-1]
event = threading.Event()
result = {"value": None}
def get_val():
try:
settable = _get_app_attr(app, "_settable_fields", {})
gettable = _get_app_attr(app, "_gettable_fields", {})
combined = {**settable, **gettable}
if field_tag in combined:
attr = combined[field_tag]
result["value"] = _get_app_attr(app, attr, None)
else:
sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
sys.stderr.flush()
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_val})
if event.wait(timeout=10):
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/mma_status":
event = threading.Event()
result = {}
def get_mma():
try:
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
result["active_tier"] = _get_app_attr(app, "active_tier", None)
at = _get_app_attr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = _get_app_attr(app, "tracks", [])
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/diagnostics":
event = threading.Event()
result = {}
def check_all():
try:
status = _get_app_attr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/state':
event = threading.Event()
result = {}
def get_state():
try:
gettable = _get_app_attr(app, "_gettable_fields", {})
for key, attr in gettable.items():
val = _get_app_attr(app, attr, None)
result[key] = _serialize_for_api(val)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
if event.wait(timeout=10):
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/mma/workers":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/mma_status":
event = threading.Event()
result = {}
def get_mma():
try:
result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
result["active_tier"] = _get_app_attr(app, "active_tier", None)
at = _get_app_attr(app, "active_track", None)
result["active_track"] = at.id if hasattr(at, "id") else at
result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
result["pending_spawn"] = result["pending_mma_spawn_approval"]
result["tracks"] = _get_app_attr(app, "tracks", [])
result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_mma})
if event.wait(timeout=10):
mma_streams = _get_app_attr(app, "mma_streams", {})
self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8"))
elif self.path == "/api/context/state":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == "/api/gui/diagnostics":
event = threading.Event()
result = {}
def check_all():
try:
status = _get_app_attr(app, "ai_status", "idle")
result["thinking"] = status in ["sending...", "running powershell..."]
result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": check_all})
if event.wait(timeout=10):
files = _get_app_attr(app, "files", [])
screenshots = _get_app_attr(app, "screenshots", [])
self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8"))
elif self.path == "/api/metrics/financial":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
else:
self.send_response(504)
self.end_headers()
elif self.path == '/api/gui/state':
event = threading.Event()
result = {}
def get_state():
try:
gettable = _get_app_attr(app, "_gettable_fields", {})
for key, attr in gettable.items():
val = _get_app_attr(app, attr, None)
result[key] = _serialize_for_api(val)
finally: event.set()
lock = _get_app_attr(app, "_pending_gui_tasks_lock")
tasks = _get_app_attr(app, "_pending_gui_tasks")
if lock and tasks is not None:
with lock: tasks.append({"action": "custom_callback", "callback": get_state})
if event.wait(timeout=10):
usage = _get_app_attr(app, "mma_tier_usage", {})
metrics = {}
for tier, data in usage.items():
model = data.get("model", "")
in_t = data.get("input", 0)
out_t = data.get("output", 0)
cost = cost_tracker.estimate_cost(model, in_t, out_t)
metrics[tier] = {**data, "estimated_cost": cost}
self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8"))
elif self.path == "/api/system/telemetry":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
self.wfile.write(json.dumps(result).encode("utf-8"))
threads = [t.name for t in threading.enumerate()]
queue_size = 0
if _has_app_attr(app, "_api_event_queue"):
queue = _get_app_attr(app, "_api_event_queue")
if queue: queue_size = len(queue)
self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8"))
else:
self.send_response(504)
self.send_response(404)
self.end_headers()
elif self.path == "/api/mma/workers":
self.send_response(200)
except Exception as e:
self.send_response(500)
self.send_header("Content-Type", "application/json")
self.end_headers()
mma_streams = _get_app_attr(app, "mma_streams", {})
self.wfile.write(json.dumps({"workers": _serialize_for_api(mma_streams)}).encode("utf-8"))
elif self.path == "/api/context/state":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
files = _get_app_attr(app, "files", [])
screenshots = _get_app_attr(app, "screenshots", [])
self.wfile.write(json.dumps({"files": files, "screenshots": screenshots}).encode("utf-8"))
elif self.path == "/api/metrics/financial":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
usage = _get_app_attr(app, "mma_tier_usage", {})
metrics = {}
for tier, data in usage.items():
model = data.get("model", "")
in_t = data.get("input", 0)
out_t = data.get("output", 0)
cost = cost_tracker.estimate_cost(model, in_t, out_t)
metrics[tier] = {**data, "estimated_cost": cost}
self.wfile.write(json.dumps({"financial": metrics}).encode("utf-8"))
elif self.path == "/api/system/telemetry":
self.send_response(200)
self.send_header("Content-Type", "application/json")
self.end_headers()
threads = [t.name for t in threading.enumerate()]
queue_size = 0
if _has_app_attr(app, "_api_event_queue"):
queue = _get_app_attr(app, "_api_event_queue")
if queue: queue_size = len(queue)
self.wfile.write(json.dumps({"threads": threads, "event_queue_size": queue_size}).encode("utf-8"))
else:
self.send_response(404)
self.end_headers()
self.wfile.write(json.dumps({"error": str(e)}).encode("utf-8"))
def do_POST(self) -> None:
app = self.server.app