526 lines
21 KiB
Python
526 lines
21 KiB
Python
from __future__ import annotations
|
|
import json
|
|
import threading
|
|
import uuid
|
|
import sys
|
|
from http.server import ThreadingHTTPServer, BaseHTTPRequestHandler
|
|
from typing import Any
|
|
import logging
|
|
from src import session_logger
|
|
"""
|
|
API Hooks - REST API for external automation and state inspection.
|
|
|
|
This module implements the HookServer, which exposes internal application state to external HTTP requests on port 8999 using Python's
|
|
ThreadingHTTPServer. All endpoints are thread-safe: plain reads pass through lock-guarded lists,
|
|
while stateful reads use the GUI thread trampoline pattern.
|
|
|
|
Architecture:
|
|
- HookServer: ThreadingHTTPServer with app reference
|
|
- HookHandler: BaseHTTPRequestHandler per request
|
|
- Request handling uses trampoline pattern for GUI state reads
|
|
- GUI Thread Trampoline: Create threading.Event + result dict
|
|
- Push callback to `_pending_gui_tasks`
|
|
- Wait for event (timeout)
|
|
- Return result as JSON
|
|
|
|
Thread Safety:
|
|
- All reads use lock-protected lists
|
|
- All state mutations happen on the GUI thread
|
|
- The module resolves attributes through `_get_app_attr`/`_set_app_attr` to maintain separation between App and AppController
|
|
|
|
Configuration:
|
|
- `--enable-test-hooks`: Required for Hook API to be available
|
|
- `gemini_cli` provider: Hook API is automatically available for synchronous HITL
|
|
|
|
See Also:
|
|
- docs/guide_tools.md for full API reference
|
|
- api_hook_client.py for the client implementation
|
|
"""
|
|
|
|
def _get_app_attr(app: Any, name: str, default: Any = None) -> Any:
    """Look up ``name`` on ``app``, falling back to ``app.controller``.

    Args:
        app: Object to inspect first.
        name: Attribute name to resolve.
        default: Value returned when neither ``app`` nor ``app.controller``
            has the attribute.

    Returns:
        The attribute value from ``app`` if present, otherwise from
        ``app.controller`` if present there, otherwise ``default``.
    """
    # The app itself always wins over its controller.
    if hasattr(app, name):
        return getattr(app, name)
    if not hasattr(app, 'controller'):
        return default
    controller = app.controller
    if hasattr(controller, name):
        return getattr(controller, name)
    return default
|
|
|
|
def _has_app_attr(app: Any, name: str) -> bool:
    """Report whether ``name`` exists on ``app`` or on ``app.controller``.

    Args:
        app: Object to inspect first.
        name: Attribute name to look for.

    Returns:
        True when ``app`` has the attribute, or when ``app`` has a
        ``controller`` attribute that has it; False otherwise.
    """
    return hasattr(app, name) or (
        hasattr(app, 'controller') and hasattr(app.controller, name)
    )
|
|
|
|
def _set_app_attr(app: Any, name: str, value: Any) -> None:
    """Assign ``value`` to ``name`` on ``app`` or on ``app.controller``.

    The attribute is written to ``app`` when ``app`` already has it. If it
    does not and ``app`` has a ``controller`` attribute, it is written to
    ``app.controller``. With no controller it is created on ``app``.

    Args:
        app: Object that owns, or whose controller owns, the attribute.
        name: Attribute name to set.
        value: Value to assign.
    """
    target = app
    if not hasattr(app, name) and hasattr(app, 'controller'):
        target = app.controller
    setattr(target, name, value)
|
|
|
|
class HookServerInstance(ThreadingHTTPServer):
    """ThreadingHTTPServer subclass that keeps a reference to the App.

    Request handlers reach the application object through
    ``self.server.app``.
    """

    def __init__(self, server_address: tuple[str, int], RequestHandlerClass: type, app: Any) -> None:
        """Create the server and remember ``app``.

        Args:
            server_address: ``(host, port)`` pair passed to the base server.
            RequestHandlerClass: Handler class instantiated per request.
            app: Application object stored on the server as ``self.app``.
        """
        ThreadingHTTPServer.__init__(self, server_address, RequestHandlerClass)
        self.app = app
|
|
|
|
def _serialize_for_api(obj: Any) -> Any:
    """Recursively convert ``obj`` for inclusion in an API response.

    Args:
        obj: Any value.

    Returns:
        ``obj.to_dict()`` when the object has a ``to_dict`` attribute; a new
        dict or list with every value/element converted the same way when
        ``obj`` is a dict or list; ``obj`` unchanged otherwise.
    """
    # An object that knows how to describe itself takes precedence, even if
    # it also happens to be a list or dict subclass.
    if hasattr(obj, "to_dict"):
        return obj.to_dict()
    if isinstance(obj, dict):
        return {key: _serialize_for_api(item) for key, item in obj.items()}
    if isinstance(obj, list):
        return [_serialize_for_api(item) for item in obj]
    return obj
|
|
|
|
class HookHandler(BaseHTTPRequestHandler):
    """Handles incoming HTTP requests for the API hooks.

    Requests are routed on ``self.path``. The application object is read
    from ``self.server.app`` and its attributes are resolved with
    ``_get_app_attr`` / ``_set_app_attr``.

    Routes that must run on the GUI thread append a
    ``{"action": "custom_callback", "callback": ...}`` task to the app's
    ``_pending_gui_tasks`` list and wait on a ``threading.Event`` that the
    callback sets. If the event is not set within ``_GUI_TIMEOUT`` seconds the
    request is answered with 504.

    Any exception raised while handling a request is answered with a 500
    response whose JSON body is ``{"error": "<message>"}``.
    """

    # Seconds to wait for the GUI thread to execute a queued callback.
    _GUI_TIMEOUT = 10
    # Seconds /api/ask blocks while waiting for /api/ask/respond.
    _ASK_TIMEOUT = 60.0

    # ------------------------------------------------------------------
    # Response / queue helpers
    # ------------------------------------------------------------------

    def _send_json(self, status: int, payload: Any) -> None:
        """Send ``payload`` as a JSON response with the given status code.

        The payload is serialized *before* the status line is written, so a
        serialization failure raises without leaving a half-written response.

        Raises:
            TypeError / ValueError: propagated from ``json.dumps``.
        """
        body = json.dumps(payload).encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(body)

    def _send_empty(self, status: int) -> None:
        """Send a response consisting of the status line and headers only."""
        self.send_response(status)
        self.end_headers()

    @staticmethod
    def _append_guarded(items: Any, lock: Any, entry: Any) -> None:
        """Append ``entry`` to ``items``, holding ``lock`` when one is given."""
        if lock:
            with lock:
                items.append(entry)
        else:
            items.append(entry)

    def _reply_via_gui_thread(self, populate: Any, result: dict) -> None:
        """Run ``populate`` as a queued GUI task and reply with ``result``.

        ``populate`` is a zero-argument callable that fills ``result`` in
        place. It is wrapped so the waiting event is always set, even when
        ``populate`` raises (the exception still propagates to whoever runs
        the task).

        Replies 200 with ``result`` as JSON once the callback has run, or 504
        on timeout. When the task list or its lock is missing the callback
        can never run, so 504 is sent immediately instead of after the full
        timeout.
        """
        app = self.server.app
        lock = _get_app_attr(app, "_pending_gui_tasks_lock")
        tasks = _get_app_attr(app, "_pending_gui_tasks")
        if not lock or tasks is None:
            self._send_empty(504)
            return
        done = threading.Event()

        def run() -> None:
            try:
                populate()
            finally:
                done.set()

        with lock:
            tasks.append({"action": "custom_callback", "callback": run})
        if done.wait(timeout=self._GUI_TIMEOUT):
            self._send_json(200, result)
        else:
            self._send_empty(504)

    # ------------------------------------------------------------------
    # GET
    # ------------------------------------------------------------------

    def do_GET(self) -> None:
        """Serve the read-only endpoints.

        Unknown paths get 404; an exception while handling gets a 500 JSON
        error body (same convention as ``do_POST``).
        """
        app = self.server.app
        try:
            session_logger.log_api_hook("GET", self.path, "")
            if self.path == "/status":
                self._send_json(200, {"status": "ok"})
            elif self.path == "/api/project":
                from src import project_manager
                flat = project_manager.flat_config(_get_app_attr(app, "project"))
                self._send_json(200, {"project": flat})
            elif self.path == "/api/session":
                lock = _get_app_attr(app, "_disc_entries_lock")
                entries = _get_app_attr(app, "disc_entries", [])
                # Copy under the lock so serialization works on a stable list.
                if lock:
                    with lock:
                        entries_snapshot = list(entries)
                else:
                    entries_snapshot = list(entries)
                self._send_json(200, {"session": {"entries": entries_snapshot}})
            elif self.path == "/api/performance":
                perf = _get_app_attr(app, "perf_monitor")
                metrics = perf.get_metrics() if perf else {}
                self._send_json(200, {"performance": metrics})
            elif self.path == "/api/events":
                # Reading the events drains the queue.
                events = []
                if _has_app_attr(app, "_api_event_queue"):
                    lock = _get_app_attr(app, "_api_event_queue_lock")
                    queue = _get_app_attr(app, "_api_event_queue")
                    if lock:
                        with lock:
                            events = list(queue)
                            queue.clear()
                    else:
                        events = list(queue)
                        queue.clear()
                self._send_json(200, {"events": events})
            elif self.path.startswith("/api/gui/value/"):
                field_tag = self.path.split("/")[-1]
                result = {"value": None}

                def get_val() -> None:
                    settable = _get_app_attr(app, "_settable_fields", {})
                    gettable = _get_app_attr(app, "_gettable_fields", {})
                    # On a tag present in both maps, the gettable entry wins.
                    combined = {**settable, **gettable}
                    if field_tag in combined:
                        attr = combined[field_tag]
                        result["value"] = _get_app_attr(app, attr, None)
                    else:
                        sys.stderr.write(f"[DEBUG] Hook API: field {field_tag} not found in settable or gettable\n")
                        sys.stderr.flush()

                self._reply_via_gui_thread(get_val, result)
            elif self.path == "/api/gui/mma_status":
                result = {}

                def get_mma() -> None:
                    result["mma_status"] = _get_app_attr(app, "mma_status", "idle")
                    result["ai_status"] = _get_app_attr(app, "ai_status", "idle")
                    result["active_tier"] = _get_app_attr(app, "active_tier", None)
                    at = _get_app_attr(app, "active_track", None)
                    # Report the track's id when it has one, else the raw value.
                    result["active_track"] = at.id if hasattr(at, "id") else at
                    result["active_tickets"] = _get_app_attr(app, "active_tickets", [])
                    result["mma_step_mode"] = _get_app_attr(app, "mma_step_mode", False)
                    result["pending_tool_approval"] = _get_app_attr(app, "_pending_ask_dialog", False)
                    result["pending_script_approval"] = _get_app_attr(app, "_pending_dialog", None) is not None
                    result["pending_mma_step_approval"] = _get_app_attr(app, "_pending_mma_approval", None) is not None
                    result["pending_mma_spawn_approval"] = _get_app_attr(app, "_pending_mma_spawn", None) is not None
                    result["pending_approval"] = result["pending_mma_step_approval"] or result["pending_tool_approval"]
                    result["pending_spawn"] = result["pending_mma_spawn_approval"]
                    result["tracks"] = _get_app_attr(app, "tracks", [])
                    result["proposed_tracks"] = _get_app_attr(app, "proposed_tracks", [])
                    result["mma_streams"] = _get_app_attr(app, "mma_streams", {})
                    result["mma_tier_usage"] = _get_app_attr(app, "mma_tier_usage", {})

                self._reply_via_gui_thread(get_mma, result)
            elif self.path == "/api/gui/diagnostics":
                result = {}

                def check_all() -> None:
                    status = _get_app_attr(app, "ai_status", "idle")
                    result["thinking"] = status in ["sending...", "running powershell..."]
                    result["live"] = status in ["running powershell...", "fetching url...", "searching web...", "powershell done, awaiting AI..."]
                    result["prior"] = _get_app_attr(app, "is_viewing_prior_session", False)

                self._reply_via_gui_thread(check_all, result)
            elif self.path == '/api/gui/state':
                result = {}

                def get_state() -> None:
                    gettable = _get_app_attr(app, "_gettable_fields", {})
                    for key, attr in gettable.items():
                        val = _get_app_attr(app, attr, None)
                        result[key] = _serialize_for_api(val)

                self._reply_via_gui_thread(get_state, result)
            else:
                self._send_empty(404)
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    # ------------------------------------------------------------------
    # POST
    # ------------------------------------------------------------------

    def do_POST(self) -> None:
        """Serve the mutating endpoints.

        The request body, when present, is parsed as JSON. Unknown paths get
        404. Any exception, including a malformed ``Content-Length`` header
        or invalid JSON, is answered with a 500 JSON error body.
        """
        app = self.server.app
        try:
            # A negative length would make rfile.read() read until EOF;
            # treat it as "no body".
            content_length = max(int(self.headers.get("Content-Length", 0)), 0)
            body = self.rfile.read(content_length)
            body_str = body.decode("utf-8") if body else ""
            session_logger.log_api_hook("POST", self.path, body_str)
            data = json.loads(body_str) if body_str else {}
            if self.path == "/api/project":
                project = _get_app_attr(app, "project")
                _set_app_attr(app, "project", data.get("project", project))
                self._send_json(200, {"status": "updated"})
            elif self.path.startswith("/api/confirm/"):
                action_id = self.path.split("/")[-1]
                approved = data.get("approved", False)
                resolve_func = _get_app_attr(app, "resolve_pending_action")
                if resolve_func:
                    if resolve_func(action_id, approved):
                        self._send_json(200, {"status": "ok"})
                    else:
                        self._send_empty(404)
                else:
                    self._send_empty(500)
            elif self.path == "/api/session":
                lock = _get_app_attr(app, "_disc_entries_lock")
                entries = _get_app_attr(app, "disc_entries")
                new_entries = data.get("session", {}).get("entries", entries)
                if lock:
                    with lock:
                        _set_app_attr(app, "disc_entries", new_entries)
                else:
                    _set_app_attr(app, "disc_entries", new_entries)
                self._send_json(200, {"status": "updated"})
            elif self.path == "/api/gui":
                lock = _get_app_attr(app, "_pending_gui_tasks_lock")
                tasks = _get_app_attr(app, "_pending_gui_tasks")
                if lock and tasks is not None:
                    with lock:
                        tasks.append(data)
                # "queued" is reported even when no task list is available.
                self._send_json(200, {"status": "queued"})
            elif self.path == "/api/gui/value":
                field_tag = data.get("field")
                result = {"value": None}

                def get_val() -> None:
                    # This route only consults the settable fields.
                    settable = _get_app_attr(app, "_settable_fields", {})
                    if field_tag in settable:
                        attr = settable[field_tag]
                        result["value"] = _get_app_attr(app, attr, None)

                self._reply_via_gui_thread(get_val, result)
            elif self.path == "/api/patch/trigger":
                sys.stderr.write(f"[DEBUG] /api/patch/trigger called with data: {data}\n")
                sys.stderr.flush()
                patch_text = data.get("patch_text", "")
                file_paths = data.get("file_paths", [])
                sys.stderr.write(f"[DEBUG] patch_text length: {len(patch_text)}, files: {file_paths}\n")
                sys.stderr.flush()
                result = {"status": "queued"}

                def trigger_patch() -> None:
                    try:
                        sys.stderr.write("[DEBUG] trigger_patch callback executing...\n")
                        sys.stderr.flush()
                        app._pending_patch_text = patch_text
                        app._pending_patch_files = file_paths
                        app._show_patch_modal = True
                        sys.stderr.write(f"[DEBUG] Set patch modal: show={app._show_patch_modal}, text={'yes' if app._pending_patch_text else 'no'}\n")
                        sys.stderr.flush()
                        result["status"] = "ok"
                    except Exception as e:
                        sys.stderr.write(f"[DEBUG] trigger_patch error: {e}\n")
                        sys.stderr.flush()
                        result["status"] = "error"
                        result["error"] = str(e)

                self._reply_via_gui_thread(trigger_patch, result)
            elif self.path == "/api/patch/apply":
                result = {"status": "done"}

                def apply_patch() -> None:
                    try:
                        # NOTE(review): looked up on app only, not on
                        # app.controller like _get_app_attr does; confirm
                        # that is intended.
                        if hasattr(app, "_apply_pending_patch"):
                            app._apply_pending_patch()
                        else:
                            result["status"] = "no_method"
                    except Exception as e:
                        result["status"] = "error"
                        result["error"] = str(e)

                self._reply_via_gui_thread(apply_patch, result)
            elif self.path == "/api/patch/reject":
                result = {"status": "done"}

                def reject_patch() -> None:
                    try:
                        app._show_patch_modal = False
                        app._pending_patch_text = None
                        app._pending_patch_files = []
                    except Exception as e:
                        result["status"] = "error"
                        result["error"] = str(e)

                self._reply_via_gui_thread(reject_patch, result)
            elif self.path == "/api/patch/status":
                sys.stderr.write("[DEBUG] /api/patch/status called\n")
                sys.stderr.flush()
                show_modal = _get_app_attr(app, "_show_patch_modal", False)
                patch_text = _get_app_attr(app, "_pending_patch_text", None)
                patch_files = _get_app_attr(app, "_pending_patch_files", [])
                sys.stderr.write(f"[DEBUG] patch status: show_modal={show_modal}, patch_text={patch_text is not None}, files={patch_files}\n")
                sys.stderr.flush()
                self._send_json(200, {
                    "show_modal": show_modal,
                    "patch_text": patch_text,
                    "file_paths": patch_files,
                })
            elif self.path == "/api/ask":
                request_id = str(uuid.uuid4())
                answered = threading.Event()
                pending_asks = _get_app_attr(app, "_pending_asks")
                if pending_asks is None:
                    pending_asks = {}
                    _set_app_attr(app, "_pending_asks", pending_asks)
                ask_responses = _get_app_attr(app, "_ask_responses")
                if ask_responses is None:
                    ask_responses = {}
                    _set_app_attr(app, "_ask_responses", ask_responses)
                # Register the waiter before announcing the request so a fast
                # /api/ask/respond can find it.
                pending_asks[request_id] = answered
                event_queue_lock = _get_app_attr(app, "_api_event_queue_lock")
                event_queue = _get_app_attr(app, "_api_event_queue")
                if event_queue is not None:
                    self._append_guarded(
                        event_queue,
                        event_queue_lock,
                        {"type": "ask_received", "request_id": request_id, "data": data},
                    )
                gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
                gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
                if gui_tasks is not None:
                    self._append_guarded(
                        gui_tasks,
                        gui_tasks_lock,
                        {"type": "ask", "request_id": request_id, "data": data},
                    )
                if answered.wait(timeout=self._ASK_TIMEOUT):
                    response_data = ask_responses.pop(request_id, None)
                    self._send_json(200, {"status": "ok", "response": response_data})
                else:
                    pending_asks.pop(request_id, None)
                    self._send_empty(504)
            elif self.path == "/api/ask/respond":
                request_id = data.get("request_id")
                response_data = data.get("response")
                pending_asks = _get_app_attr(app, "_pending_asks")
                ask_responses = _get_app_attr(app, "_ask_responses")
                if request_id and pending_asks and request_id in pending_asks:
                    if ask_responses is None:
                        ask_responses = {}
                        _set_app_attr(app, "_ask_responses", ask_responses)
                    # Store the answer before waking the /api/ask handler.
                    ask_responses[request_id] = response_data
                    # pop() tolerates the /api/ask timeout path removing the
                    # entry concurrently.
                    waiter = pending_asks.pop(request_id, None)
                    if waiter is not None:
                        waiter.set()
                    gui_tasks_lock = _get_app_attr(app, "_pending_gui_tasks_lock")
                    gui_tasks = _get_app_attr(app, "_pending_gui_tasks")
                    if gui_tasks is not None:
                        self._append_guarded(
                            gui_tasks,
                            gui_tasks_lock,
                            {"action": "clear_ask", "request_id": request_id},
                        )
                    self._send_json(200, {"status": "ok"})
                else:
                    self._send_empty(404)
            else:
                self._send_empty(404)
        except Exception as e:
            self._send_json(500, {"error": str(e)})

    def log_message(self, format: str, *args: Any) -> None:
        """Route the base handler's log lines to ``logging.info``."""
        logging.info("Hook API: " + format % args)
|
|
|
|
class HookServer:
    """Owns the hook HTTP server and the background thread that runs it."""

    def __init__(self, app: Any, port: int = 8999) -> None:
        """Remember the app and port; nothing is started yet.

        Args:
            app: Application object handed to the HTTP server.
            port: TCP port to listen on (bound on 127.0.0.1 by ``start``).
        """
        self.app = app
        self.port = port
        self.server = None
        self.thread = None

    def start(self) -> None:
        """Start serving in a daemon thread.

        Does nothing when the server thread is already alive, or when the
        app's ``test_hooks_enabled`` is falsy and its ``current_provider`` is
        not ``'gemini_cli'``. Before the server is created, the shared
        queues, dicts and locks the handlers use are created on the app if
        they do not exist yet.
        """
        already_running = self.thread is not None and self.thread.is_alive()
        if already_running:
            return
        app = self.app
        is_gemini_cli = _get_app_attr(app, 'current_provider', '') == 'gemini_cli'
        hooks_enabled = _get_app_attr(app, 'test_hooks_enabled', False)
        if not (hooks_enabled or is_gemini_cli):
            return
        # (attribute name, factory) pairs, created only when missing.
        shared_state = (
            ('_pending_gui_tasks', list),
            ('_pending_gui_tasks_lock', threading.Lock),
            ('_pending_asks', dict),
            ('_ask_responses', dict),
            ('_api_event_queue', list),
            ('_api_event_queue_lock', threading.Lock),
        )
        for attr_name, factory in shared_state:
            if not _has_app_attr(app, attr_name):
                _set_app_attr(app, attr_name, factory())
        self.server = HookServerInstance(('127.0.0.1', self.port), HookHandler, app)
        self.thread = threading.Thread(target=self.server.serve_forever, daemon=True)
        self.thread.start()
        logging.info(f"Hook server started on port {self.port}")

    def stop(self) -> None:
        """Shut the server down, close its socket and join the thread."""
        server = self.server
        if server:
            server.shutdown()
            server.server_close()
        worker = self.thread
        if worker:
            worker.join()
        logging.info("Hook server stopped")
|