Private
Public Access
0
0
Files
manual_slop/src/openai_compatible.py
T
ed e8b774d664 refactor(openai_compatible,orchestrator_pm): convert dict[str, Any] to typed (Phase 7 partial)
Phase 7: Eliminate Any + dict[str, Any] from internal signatures (FR6) - PARTIAL
Before: 11 dict[str, Any] param sites
After:  7 (4 converted; 7 remain as legitimate boundary params)
Delta:  -4 sites (cumulative)

Specific changes:
- src/openai_compatible.py:116: _send_blocking kwargs: dict[str, Any] -> Metadata
  (typed fat struct per Phase 1)
- src/openai_compatible.py:133: _send_streaming kwargs: dict[str, Any] -> Metadata
- src/orchestrator_pm.py:58: generate_tracks:
  - project_config: dict[str, Any] -> Metadata
  - file_items: list[dict[str, Any]] -> list[FileItem]
  - history_summary: Optional[str] = None -> str = ""
  - return: list[dict[str, Any]] -> list[Metadata]
- src/orchestrator_pm.py imports: FileItem (from src.models),
  Metadata (from src.type_aliases); removed unused 'Optional' from typing

Verification:
- audit_weak_types --strict: OK (107 <= 112 baseline)
- py_check_syntax: OK on all changed files
- 20 tests pass (test_openai_compatible: 6, test_orchestration_logic +
  test_orchestrator_pm + test_orchestrator_pm_history: 14)

REMAINING ~7 dict[str, Any] sites (all BOUNDARY inputs from wire format):
- src/mcp_client.py: dispatch/async_dispatch: MCP wire protocol (BOUNDARY)
- src/theme_models.py: from_dict: TOML wire format (BOUNDARY)
- src/log_registry.py: from_dict: session JSON wire (BOUNDARY)
- src/session_logger.py: log_comms: comms JSON wire (BOUNDARY)
- src/type_aliases.py: Metadata.from_dict: boundary entry (BOUNDARY)
- src/hot_reloader.py: restore_state: snapshot deserialization (BOUNDARY-ish)

Per spec.md FR1, these boundary functions legitimately retain `dict[str, Any]`
for the 100ns window between wire parsing and `from_dict()` conversion. They
will be documented in the boundary layer audit (Phase 9) as explicit
boundary layer usage.

REMAINING ~60 Any param sites (large scope; deferred):
- src/api_hooks.py: 10
- src/app_controller.py: 9
- src/ai_client.py: 8
- src/command_palette.py: 4
- src/hot_reloader.py: 4
- src/imgui_scopes.py: 4
- src/api_hooks_helpers.py: 3
- src/events.py: 3
- src/gui_2.py: 3
- src/openai_compatible.py: 3
- src/api_hook_client.py: 2
- src/commands.py: 1
- src/log_registry.py: 1
- src/mcp_client.py: 1
- src/models.py: 1
- src/performance_monitor.py: 1
- src/project_manager.py: 1
- src/type_aliases.py: 1
2026-06-26 05:18:59 -04:00

182 lines
6.0 KiB
Python

"""OpenAI-compatible API client for the Manual Slop ai_client layer.
Provides `send_openai_compatible(client, request, *, capabilities)` which
calls any OpenAI-compatible chat completion endpoint and returns a
`NormalizedResponse` (re-exported from src.openai_schemas).
CONVENTION: 1-space indentation. NO COMMENTS.
"""
from __future__ import annotations
from typing import Any, Callable, Optional
from openai import (
APIConnectionError,
APIStatusError,
AuthenticationError,
BadRequestError,
OpenAIError,
PermissionDeniedError,
RateLimitError,
)
from src.openai_schemas import (
ChatMessage,
NormalizedResponse,
OpenAICompatibleRequest,
ToolCall,
ToolCallFunction,
UsageStats,
)
from src.result_types import ErrorInfo, ErrorKind, Result
__all__ = [
"ChatMessage",
"NormalizedResponse",
"OpenAICompatibleRequest",
"ToolCall",
"ToolCallFunction",
"UsageStats",
]
def _to_typed_tool_call(tc: Any) -> ToolCall:
return ToolCall(
id=getattr(tc, "id", "") or "",
type=getattr(tc, "type", "function"),
function=ToolCallFunction(
name=getattr(tc.function, "name", "") or "",
arguments=getattr(tc.function, "arguments", "{}") or "{}",
),
)
def _to_dict_tool_call(tc: ToolCall) -> dict[str, Any]:
return tc.to_dict()
def _classify_openai_compatible_error(exc: Exception, source: str = "openai_compatible") -> ErrorInfo:
if isinstance(exc, RateLimitError):
return ErrorInfo(kind=ErrorKind.RATE_LIMIT, message=str(exc), source=source, original=exc)
if isinstance(exc, AuthenticationError) or isinstance(exc, PermissionDeniedError):
return ErrorInfo(kind=ErrorKind.AUTH, message=str(exc), source=source, original=exc)
if isinstance(exc, APIConnectionError):
return ErrorInfo(kind=ErrorKind.NETWORK, message=str(exc), source=source, original=exc)
if isinstance(exc, APIStatusError):
code = getattr(exc, "status_code", 0)
if code == 402:
return ErrorInfo(kind=ErrorKind.BALANCE, message=str(exc), source=source, original=exc)
if code == 429:
return ErrorInfo(kind=ErrorKind.RATE_LIMIT, message=str(exc), source=source, original=exc)
if code in (401, 403):
return ErrorInfo(kind=ErrorKind.AUTH, message=str(exc), source=source, original=exc)
if code in (500, 502, 503, 504):
return ErrorInfo(kind=ErrorKind.NETWORK, message=str(exc), source=source, original=exc)
if isinstance(exc, BadRequestError):
return ErrorInfo(kind=ErrorKind.QUOTA, message=str(exc), source=source, original=exc)
return ErrorInfo(kind=ErrorKind.UNKNOWN, message=str(exc), source=source, original=exc)
def send_openai_compatible(
client: Any,
request: OpenAICompatibleRequest,
*,
capabilities: Any,
) -> Result[NormalizedResponse]:
messages_dicts = [m.to_dict() for m in request.messages]
kwargs: dict[str, Any] = {
"model": request.model,
"messages": messages_dicts,
"temperature": request.temperature,
"top_p": request.top_p,
"max_tokens": request.max_tokens,
"stream": request.stream,
}
if request.tools is not None:
kwargs["tools"] = request.tools
kwargs["tool_choice"] = request.tool_choice
if request.extra_body:
kwargs["extra_body"] = request.extra_body
try:
if request.stream:
response = _send_streaming(client, kwargs, request.stream_callback)
else:
response = _send_blocking(client, kwargs)
return Result(data=response)
except OpenAIError as exc:
empty_resp = NormalizedResponse(
text="",
tool_calls=(),
usage=UsageStats(input_tokens=0, output_tokens=0),
raw_response=None,
)
return Result(data=empty_resp, errors=[_classify_openai_compatible_error(exc, source="openai_compatible")])
def _send_blocking(client: Any, kwargs: Metadata) -> NormalizedResponse:
resp = client.chat.completions.create(**kwargs)
msg = resp.choices[0].message
tool_calls_raw = msg.tool_calls or []
tool_calls: tuple[ToolCall, ...] = tuple(_to_typed_tool_call(tc) for tc in tool_calls_raw)
usage = getattr(resp, "usage", None)
return NormalizedResponse(
text=msg.content or "",
tool_calls=tool_calls,
usage=UsageStats(
input_tokens=int(getattr(usage, "prompt_tokens", 0) or 0),
output_tokens=int(getattr(usage, "completion_tokens", 0) or 0),
),
raw_response=resp,
)
def _send_streaming(client: Any, kwargs: Metadata, callback: Optional[Callable[[str], None]]) -> NormalizedResponse:
kwargs_stream = dict(kwargs)
kwargs_stream["stream"] = True
kwargs_stream["stream_options"] = {"include_usage": True}
chunks_iter = client.chat.completions.create(**kwargs_stream)
text_parts: list[str] = []
tool_calls_acc: dict[int, dict[str, Any]] = {}
usage_input = 0
usage_output = 0
for chunk in chunks_iter:
for choice in getattr(chunk, "choices", []) or []:
delta = getattr(choice, "delta", None)
if delta is None:
continue
if delta.content:
text_parts.append(delta.content)
if callback:
callback(delta.content)
for tc in getattr(delta, "tool_calls", None) or []:
idx = getattr(tc, "index", 0)
if idx not in tool_calls_acc:
tool_calls_acc[idx] = {"id": None, "type": "function", "function": {"name": None, "arguments": ""}}
if getattr(tc, "id", None):
tool_calls_acc[idx]["id"] = tc.id
if getattr(tc, "function", None):
if tc.function.name:
tool_calls_acc[idx]["function"]["name"] = tc.function.name
if tc.function.arguments:
tool_calls_acc[idx]["function"]["arguments"] += tc.function.arguments
chunk_usage = getattr(chunk, "usage", None)
if chunk_usage is not None:
usage_input = int(getattr(chunk_usage, "prompt_tokens", 0) or 0)
usage_output = int(getattr(chunk_usage, "completion_tokens", 0) or 0)
tool_calls_typed: tuple[ToolCall, ...] = tuple(
ToolCall(
id=acc["id"] or "",
type=acc["type"],
function=ToolCallFunction(
name=acc["function"]["name"] or "",
arguments=acc["function"]["arguments"] or "{}",
),
)
for acc in (tool_calls_acc[k] for k in sorted(tool_calls_acc.keys()))
)
return NormalizedResponse(
text="".join(text_parts),
tool_calls=tool_calls_typed,
usage=UsageStats(input_tokens=usage_input, output_tokens=usage_output),
raw_response=None,
)