04d723e420
Phase 2 of any_type_componentization_20260621. Promotes NormalizedResponse
+ OpenAICompatibleRequest from src/openai_compatible.py to typed
dataclasses. The 17 Any sites become 5 dataclasses:
NEW src/openai_schemas.py (138 lines):
- ToolCallFunction dataclass (name, arguments)
- ToolCall dataclass (id, function: ToolCallFunction, type='function')
- ChatMessage dataclass (role, content, tool_calls, tool_call_id, name)
- UsageStats dataclass (input_tokens, output_tokens, cache_read_*, cache_creation_*)
- NormalizedResponse dataclass (text, tool_calls: tuple, usage, raw_response: Any)
- OpenAICompatibleRequest dataclass (messages: list[ChatMessage], model, ...)
NEW tests/test_openai_schemas.py (19 tests, all pass):
- ToolCallFunction, ToolCall, ChatMessage round-trips
- UsageStats field access + frozen=True semantics
- NormalizedResponse.to_legacy_dict preserves shape
- raw_response stays Any (Pattern 3 preserved)
- tools field stays list[dict[str, Any]] for Phase 1 ToolSpec follow-up
MODIFIED src/openai_compatible.py:
- Removed inline NormalizedResponse + OpenAICompatibleRequest definitions
- Re-imported from src.openai_schemas
- _send_blocking: tool_calls -> tuple[ToolCall, ...]; usage_*_tokens -> UsageStats
- _send_streaming: same migration
- send_openai_compatible: messages_dicts = [m.to_dict() for m in request.messages]
- Exception handler: empty NormalizedResponse uses UsageStats
- All NormalizedResponse consumers still work (legacy dict shape preserved)
Verified:
uv run pytest tests/test_openai_schemas.py tests/test_mcp_tool_specs.py tests/test_audit_dataclass_coverage.py tests/test_type_aliases.py tests/test_mcp_client_beads.py tests/test_mcp_client_paths.py tests/test_arch_boundary_phase2.py --timeout=60
64 passed in 6.28s
182 lines
6.0 KiB
Python
182 lines
6.0 KiB
Python
"""OpenAI-compatible API client for the Manual Slop ai_client layer.
|
|
|
|
Provides `send_openai_compatible(client, request, *, capabilities)` which
|
|
calls any OpenAI-compatible chat completion endpoint and returns a
|
|
`NormalizedResponse` (re-exported from src.openai_schemas).
|
|
|
|
CONVENTION: 1-space indentation. NO COMMENTS.
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Callable, Optional
|
|
|
|
from openai import (
|
|
APIConnectionError,
|
|
APIStatusError,
|
|
AuthenticationError,
|
|
BadRequestError,
|
|
OpenAIError,
|
|
PermissionDeniedError,
|
|
RateLimitError,
|
|
)
|
|
|
|
from src.openai_schemas import (
|
|
ChatMessage,
|
|
NormalizedResponse,
|
|
OpenAICompatibleRequest,
|
|
ToolCall,
|
|
ToolCallFunction,
|
|
UsageStats,
|
|
)
|
|
from src.result_types import ErrorInfo, ErrorKind, Result
|
|
|
|
__all__ = [
|
|
"ChatMessage",
|
|
"NormalizedResponse",
|
|
"OpenAICompatibleRequest",
|
|
"ToolCall",
|
|
"ToolCallFunction",
|
|
"UsageStats",
|
|
]
|
|
|
|
|
|
def _to_typed_tool_call(tc: Any) -> ToolCall:
|
|
return ToolCall(
|
|
id=getattr(tc, "id", "") or "",
|
|
type=getattr(tc, "type", "function"),
|
|
function=ToolCallFunction(
|
|
name=getattr(tc.function, "name", "") or "",
|
|
arguments=getattr(tc.function, "arguments", "{}") or "{}",
|
|
),
|
|
)
|
|
|
|
|
|
def _to_dict_tool_call(tc: ToolCall) -> dict[str, Any]:
|
|
return tc.to_dict()
|
|
|
|
|
|
def _classify_openai_compatible_error(exc: Exception, source: str = "openai_compatible") -> ErrorInfo:
|
|
if isinstance(exc, RateLimitError):
|
|
return ErrorInfo(kind=ErrorKind.RATE_LIMIT, message=str(exc), source=source, original=exc)
|
|
if isinstance(exc, AuthenticationError) or isinstance(exc, PermissionDeniedError):
|
|
return ErrorInfo(kind=ErrorKind.AUTH, message=str(exc), source=source, original=exc)
|
|
if isinstance(exc, APIConnectionError):
|
|
return ErrorInfo(kind=ErrorKind.NETWORK, message=str(exc), source=source, original=exc)
|
|
if isinstance(exc, APIStatusError):
|
|
code = getattr(exc, "status_code", 0)
|
|
if code == 402:
|
|
return ErrorInfo(kind=ErrorKind.BALANCE, message=str(exc), source=source, original=exc)
|
|
if code == 429:
|
|
return ErrorInfo(kind=ErrorKind.RATE_LIMIT, message=str(exc), source=source, original=exc)
|
|
if code in (401, 403):
|
|
return ErrorInfo(kind=ErrorKind.AUTH, message=str(exc), source=source, original=exc)
|
|
if code in (500, 502, 503, 504):
|
|
return ErrorInfo(kind=ErrorKind.NETWORK, message=str(exc), source=source, original=exc)
|
|
if isinstance(exc, BadRequestError):
|
|
return ErrorInfo(kind=ErrorKind.QUOTA, message=str(exc), source=source, original=exc)
|
|
return ErrorInfo(kind=ErrorKind.UNKNOWN, message=str(exc), source=source, original=exc)
|
|
|
|
|
|
def send_openai_compatible(
|
|
client: Any,
|
|
request: OpenAICompatibleRequest,
|
|
*,
|
|
capabilities: Any,
|
|
) -> Result[NormalizedResponse]:
|
|
messages_dicts = [m.to_dict() for m in request.messages]
|
|
kwargs: dict[str, Any] = {
|
|
"model": request.model,
|
|
"messages": messages_dicts,
|
|
"temperature": request.temperature,
|
|
"top_p": request.top_p,
|
|
"max_tokens": request.max_tokens,
|
|
"stream": request.stream,
|
|
}
|
|
if request.tools is not None:
|
|
kwargs["tools"] = request.tools
|
|
kwargs["tool_choice"] = request.tool_choice
|
|
if request.extra_body:
|
|
kwargs["extra_body"] = request.extra_body
|
|
try:
|
|
if request.stream:
|
|
response = _send_streaming(client, kwargs, request.stream_callback)
|
|
else:
|
|
response = _send_blocking(client, kwargs)
|
|
return Result(data=response)
|
|
except OpenAIError as exc:
|
|
empty_resp = NormalizedResponse(
|
|
text="",
|
|
tool_calls=(),
|
|
usage=UsageStats(input_tokens=0, output_tokens=0),
|
|
raw_response=None,
|
|
)
|
|
return Result(data=empty_resp, errors=[_classify_openai_compatible_error(exc, source="openai_compatible")])
|
|
|
|
|
|
def _send_blocking(client: Any, kwargs: dict[str, Any]) -> NormalizedResponse:
|
|
resp = client.chat.completions.create(**kwargs)
|
|
msg = resp.choices[0].message
|
|
tool_calls_raw = msg.tool_calls or []
|
|
tool_calls: tuple[ToolCall, ...] = tuple(_to_typed_tool_call(tc) for tc in tool_calls_raw)
|
|
usage = getattr(resp, "usage", None)
|
|
return NormalizedResponse(
|
|
text=msg.content or "",
|
|
tool_calls=tool_calls,
|
|
usage=UsageStats(
|
|
input_tokens=int(getattr(usage, "prompt_tokens", 0) or 0),
|
|
output_tokens=int(getattr(usage, "completion_tokens", 0) or 0),
|
|
),
|
|
raw_response=resp,
|
|
)
|
|
|
|
|
|
def _send_streaming(client: Any, kwargs: dict[str, Any], callback: Optional[Callable[[str], None]]) -> NormalizedResponse:
|
|
kwargs_stream = dict(kwargs)
|
|
kwargs_stream["stream"] = True
|
|
kwargs_stream["stream_options"] = {"include_usage": True}
|
|
chunks_iter = client.chat.completions.create(**kwargs_stream)
|
|
text_parts: list[str] = []
|
|
tool_calls_acc: dict[int, dict[str, Any]] = {}
|
|
usage_input = 0
|
|
usage_output = 0
|
|
for chunk in chunks_iter:
|
|
for choice in getattr(chunk, "choices", []) or []:
|
|
delta = getattr(choice, "delta", None)
|
|
if delta is None:
|
|
continue
|
|
if delta.content:
|
|
text_parts.append(delta.content)
|
|
if callback:
|
|
callback(delta.content)
|
|
for tc in getattr(delta, "tool_calls", None) or []:
|
|
idx = getattr(tc, "index", 0)
|
|
if idx not in tool_calls_acc:
|
|
tool_calls_acc[idx] = {"id": None, "type": "function", "function": {"name": None, "arguments": ""}}
|
|
if getattr(tc, "id", None):
|
|
tool_calls_acc[idx]["id"] = tc.id
|
|
if getattr(tc, "function", None):
|
|
if tc.function.name:
|
|
tool_calls_acc[idx]["function"]["name"] = tc.function.name
|
|
if tc.function.arguments:
|
|
tool_calls_acc[idx]["function"]["arguments"] += tc.function.arguments
|
|
chunk_usage = getattr(chunk, "usage", None)
|
|
if chunk_usage is not None:
|
|
usage_input = int(getattr(chunk_usage, "prompt_tokens", 0) or 0)
|
|
usage_output = int(getattr(chunk_usage, "completion_tokens", 0) or 0)
|
|
tool_calls_typed: tuple[ToolCall, ...] = tuple(
|
|
ToolCall(
|
|
id=acc["id"] or "",
|
|
type=acc["type"],
|
|
function=ToolCallFunction(
|
|
name=acc["function"]["name"] or "",
|
|
arguments=acc["function"]["arguments"] or "{}",
|
|
),
|
|
)
|
|
for acc in (tool_calls_acc[k] for k in sorted(tool_calls_acc.keys()))
|
|
)
|
|
return NormalizedResponse(
|
|
text="".join(text_parts),
|
|
tool_calls=tool_calls_typed,
|
|
usage=UsageStats(input_tokens=usage_input, output_tokens=usage_output),
|
|
raw_response=None,
|
|
) |