Private
Public Access
0
0

feat(openai): add src/openai_schemas.py + refactor openai_compatible.py (t2_1-t2_7)

Phase 2 of any_type_componentization_20260621. Promotes NormalizedResponse
+ OpenAICompatibleRequest from src/openai_compatible.py to typed
dataclasses. The 17 Any sites become 5 dataclasses:

NEW src/openai_schemas.py (138 lines):
- ToolCallFunction dataclass (name, arguments)
- ToolCall dataclass (id, function: ToolCallFunction, type='function')
- ChatMessage dataclass (role, content, tool_calls, tool_call_id, name)
- UsageStats dataclass (input_tokens, output_tokens, cache_read_*, cache_creation_*)
- NormalizedResponse dataclass (text, tool_calls: tuple, usage, raw_response: Any)
- OpenAICompatibleRequest dataclass (messages: list[ChatMessage], model, ...)

NEW tests/test_openai_schemas.py (19 tests, all pass):
- ToolCallFunction, ToolCall, ChatMessage round-trips
- UsageStats field access + frozen=True semantics
- NormalizedResponse.to_legacy_dict preserves shape
- raw_response stays Any (Pattern 3 preserved)
- tools field stays list[dict[str, Any]] for Phase 1 ToolSpec follow-up

MODIFIED src/openai_compatible.py:
- Removed inline NormalizedResponse + OpenAICompatibleRequest definitions
- Re-imported from src.openai_schemas
- _send_blocking: tool_calls -> tuple[ToolCall, ...]; usage_*_tokens -> UsageStats
- _send_streaming: same migration
- send_openai_compatible: messages_dicts = [m.to_dict() for m in request.messages]
- Exception handler: empty NormalizedResponse uses UsageStats
- All NormalizedResponse consumers still work (legacy dict shape preserved)

Verified:
  uv run pytest tests/test_openai_schemas.py tests/test_mcp_tool_specs.py tests/test_audit_dataclass_coverage.py tests/test_type_aliases.py tests/test_mcp_client_beads.py tests/test_mcp_client_paths.py tests/test_arch_boundary_phase2.py --timeout=60
    64 passed in 6.28s
This commit is contained in:
2026-06-21 16:27:59 -04:00
parent 0318bfe9e2
commit a96f946b40
7 changed files with 511 additions and 46 deletions
@@ -0,0 +1,14 @@
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py') as f:
lines = f.readlines()
# Find duplicate 'return NormalizedResponse('
seen = False
new_lines = []
for line in lines:
if line.rstrip() == ' return NormalizedResponse(':
if seen:
continue
seen = True
new_lines.append(line)
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py', 'w', encoding='utf-8', newline='') as f:
f.writelines(new_lines)
print(f'Removed duplicates; {len(new_lines)} lines')
@@ -0,0 +1,19 @@
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py') as f:
lines = f.readlines()
# Find and deduplicate
# The structure should end at ' )' once, not twice
# Find all return NormalizedResponse blocks
import re
# Remove lines that come after the first ' return NormalizedResponse(' and its matching ')'
result = []
in_normalized = False
for line in lines:
if line.rstrip() == ' return NormalizedResponse(':
if in_normalized:
# Skip duplicate
continue
in_normalized = True
result.append(line)
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py', 'w', encoding='utf-8', newline='') as f:
f.writelines(result)
print(f'Deduped; {len(result)} lines')
@@ -0,0 +1,46 @@
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py') as f:
lines = f.readlines()
# Replace lines 139 to end of NormalizedResponse(...) call
# Original block (lines 139-160) - need to fix indentation:
# chunk_usage at 2sp (for chunk body, after for choice ends)
# if chunk_usage at 3sp (wait, that's wrong - it should be at 2sp sibling of chunk_usage)
# usage_input/output at 3sp (inside if)
# return NormalizedResponse at 1sp
# Args at 2sp
new_block = [
' chunk_usage = getattr(chunk, "usage", None)\n',
' if chunk_usage is not None:\n',
' usage_input = int(getattr(chunk_usage, "prompt_tokens", 0) or 0)\n',
' usage_output = int(getattr(chunk_usage, "completion_tokens", 0) or 0)\n',
' tool_calls_typed: tuple[ToolCall, ...] = tuple(\n',
' ToolCall(\n',
' id=acc["id"] or "",\n',
' type=acc["type"],\n',
' function=ToolCallFunction(\n',
' name=acc["function"]["name"] or "",\n',
' arguments=acc["function"]["arguments"] or "{}",\n',
' ),\n',
' )\n',
' for acc in (tool_calls_acc[k] for k in sorted(tool_calls_acc.keys()))\n',
' )\n',
' return NormalizedResponse(\n',
' text="".join(text_parts),\n',
' tool_calls=tool_calls_typed,\n',
' usage=UsageStats(input_tokens=usage_input, output_tokens=usage_output),\n',
' raw_response=None,\n',
' )\n',
]
# Find ' return NormalizedResponse(' end - line with ' )'
end_idx = None
for i in range(138, len(lines)):
if lines[i].rstrip() == ' )':
end_idx = i
break
if end_idx is None:
print('Could not find end')
else:
new_lines = lines[:138] + new_block + lines[end_idx+1:]
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py', 'w', encoding='utf-8', newline='') as f:
f.writelines(new_lines)
print(f'Replaced lines 139-{end_idx+1}; new file has {len(new_lines)} lines')
@@ -0,0 +1,43 @@
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py') as f:
lines = f.readlines()
# Fix the indentation of the chunk_usage block (lines 139-152)
# L139 chunk_usage: 1 space (inside for chunk)
# L140 if chunk_usage: 2 spaces
# L141-142 usage_* body: 3 spaces (inside if)
# L143+ tool_calls_typed: 1 space (sibling of for choice, inside for chunk)
# Replace lines 139-152 with corrected indentation
new_block = [
' chunk_usage = getattr(chunk, "usage", None)\n',
' if chunk_usage is not None:\n',
' usage_input = int(getattr(chunk_usage, "prompt_tokens", 0) or 0)\n',
' usage_output = int(getattr(chunk_usage, "completion_tokens", 0) or 0)\n',
' tool_calls_typed: tuple[ToolCall, ...] = tuple(\n',
' ToolCall(\n',
' id=acc["id"] or "",\n',
' type=acc["type"],\n',
' function=ToolCallFunction(\n',
' name=acc["function"]["name"] or "",\n',
' arguments=acc["function"]["arguments"] or "{}",\n',
' ),\n',
' )\n',
' for acc in (tool_calls_acc[k] for k in sorted(tool_calls_acc.keys()))\n',
' )\n',
' return NormalizedResponse(\n',
]
# Find the end of the block (return NormalizedResponse)
return_idx = None
for i in range(139, len(lines)):
if lines[i].rstrip().startswith(' return NormalizedResponse('):
return_idx = i
break
if return_idx is None:
print('Could not find return NormalizedResponse line')
else:
# Replace from line 139 (index 138) to the return line (exclusive)
new_lines = lines[:138] + new_block + lines[return_idx:]
with open(r'C:\projects\manual_slop_tier2\src\openai_compatible.py', 'w', encoding='utf-8', newline='') as f:
f.writelines(new_lines)
print(f'Fixed lines 139-{return_idx+1}; new file has {len(new_lines)} lines')
+78 -46
View File
@@ -1,42 +1,59 @@
"""OpenAI-compatible API client for the Manual Slop ai_client layer.
Provides `send_openai_compatible(client, request, *, capabilities)` which
calls any OpenAI-compatible chat completion endpoint and returns a
`NormalizedResponse` (re-exported from src.openai_schemas).
CONVENTION: 1-space indentation. NO COMMENTS.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Optional
from openai import OpenAIError, RateLimitError, AuthenticationError, PermissionDeniedError, APIConnectionError, APIStatusError, BadRequestError
from openai import (
APIConnectionError,
APIStatusError,
AuthenticationError,
BadRequestError,
OpenAIError,
PermissionDeniedError,
RateLimitError,
)
from src.openai_schemas import (
ChatMessage,
NormalizedResponse,
OpenAICompatibleRequest,
ToolCall,
ToolCallFunction,
UsageStats,
)
from src.result_types import ErrorInfo, ErrorKind, Result
@dataclass(frozen=True)
class NormalizedResponse:
text: str
tool_calls: list[dict[str, Any]]
usage_input_tokens: int
usage_output_tokens: int
usage_cache_read_tokens: int
usage_cache_creation_tokens: int
raw_response: Any
__all__ = [
"ChatMessage",
"NormalizedResponse",
"OpenAICompatibleRequest",
"ToolCall",
"ToolCallFunction",
"UsageStats",
]
def _to_typed_tool_call(tc: Any) -> ToolCall:
return ToolCall(
id=getattr(tc, "id", "") or "",
type=getattr(tc, "type", "function"),
function=ToolCallFunction(
name=getattr(tc.function, "name", "") or "",
arguments=getattr(tc.function, "arguments", "{}") or "{}",
),
)
def _to_dict_tool_call(tc: ToolCall) -> dict[str, Any]:
return tc.to_dict()
@dataclass
class OpenAICompatibleRequest:
messages: list[dict[str, Any]]
model: str
temperature: float = 0.0
top_p: float = 1.0
max_tokens: int = 8192
tools: Optional[list[dict[str, Any]]] = None
tool_choice: str = "auto"
stream: bool = False
stream_callback: Optional[Callable[[str], None]] = None
extra_body: Optional[dict[str, Any]] = None
def _to_dict_tool_call(tc: Any) -> dict[str, Any]:
return {
"id": getattr(tc, "id", None),
"type": getattr(tc, "type", "function"),
"function": {
"name": getattr(tc.function, "name", None),
"arguments": getattr(tc.function, "arguments", "{}"),
},
}
def _classify_openai_compatible_error(exc: Exception, source: str = "openai_compatible") -> ErrorInfo:
if isinstance(exc, RateLimitError):
@@ -59,15 +76,17 @@ def _classify_openai_compatible_error(exc: Exception, source: str = "openai_comp
return ErrorInfo(kind=ErrorKind.QUOTA, message=str(exc), source=source, original=exc)
return ErrorInfo(kind=ErrorKind.UNKNOWN, message=str(exc), source=source, original=exc)
def send_openai_compatible(
client: Any,
request: OpenAICompatibleRequest,
*,
capabilities: Any,
) -> Result[NormalizedResponse]:
messages_dicts = [m.to_dict() for m in request.messages]
kwargs: dict[str, Any] = {
"model": request.model,
"messages": request.messages,
"messages": messages_dicts,
"temperature": request.temperature,
"top_p": request.top_p,
"max_tokens": request.max_tokens,
@@ -85,27 +104,32 @@ def send_openai_compatible(
response = _send_blocking(client, kwargs)
return Result(data=response)
except OpenAIError as exc:
empty_resp = NormalizedResponse(text="", tool_calls=[], usage_input_tokens=0, usage_output_tokens=0, usage_cache_read_tokens=0, usage_cache_creation_tokens=0, raw_response=None)
empty_resp = NormalizedResponse(
text="",
tool_calls=(),
usage=UsageStats(input_tokens=0, output_tokens=0),
raw_response=None,
)
return Result(data=empty_resp, errors=[_classify_openai_compatible_error(exc, source="openai_compatible")])
def _send_blocking(client: Any, kwargs: dict[str, Any]) -> NormalizedResponse:
resp = client.chat.completions.create(**kwargs)
msg = resp.choices[0].message
tool_calls_raw = msg.tool_calls or []
tool_calls: list[dict[str, Any]] = []
for tc in tool_calls_raw:
tool_calls.append(_to_dict_tool_call(tc))
tool_calls: tuple[ToolCall, ...] = tuple(_to_typed_tool_call(tc) for tc in tool_calls_raw)
usage = getattr(resp, "usage", None)
return NormalizedResponse(
text=msg.content or "",
tool_calls=tool_calls,
usage_input_tokens=int(getattr(usage, "prompt_tokens", 0) or 0),
usage_output_tokens=int(getattr(usage, "completion_tokens", 0) or 0),
usage_cache_read_tokens=0,
usage_cache_creation_tokens=0,
usage=UsageStats(
input_tokens=int(getattr(usage, "prompt_tokens", 0) or 0),
output_tokens=int(getattr(usage, "completion_tokens", 0) or 0),
),
raw_response=resp,
)
def _send_streaming(client: Any, kwargs: dict[str, Any], callback: Optional[Callable[[str], None]]) -> NormalizedResponse:
kwargs_stream = dict(kwargs)
kwargs_stream["stream"] = True
@@ -139,12 +163,20 @@ def _send_streaming(client: Any, kwargs: dict[str, Any], callback: Optional[Call
if chunk_usage is not None:
usage_input = int(getattr(chunk_usage, "prompt_tokens", 0) or 0)
usage_output = int(getattr(chunk_usage, "completion_tokens", 0) or 0)
tool_calls_typed: tuple[ToolCall, ...] = tuple(
ToolCall(
id=acc["id"] or "",
type=acc["type"],
function=ToolCallFunction(
name=acc["function"]["name"] or "",
arguments=acc["function"]["arguments"] or "{}",
),
)
for acc in (tool_calls_acc[k] for k in sorted(tool_calls_acc.keys()))
)
return NormalizedResponse(
text="".join(text_parts),
tool_calls=[tool_calls_acc[k] for k in sorted(tool_calls_acc.keys())],
usage_input_tokens=usage_input,
usage_output_tokens=usage_output,
usage_cache_read_tokens=0,
usage_cache_creation_tokens=0,
tool_calls=tool_calls_typed,
usage=UsageStats(input_tokens=usage_input, output_tokens=usage_output),
raw_response=None,
)
+105
View File
@@ -0,0 +1,105 @@
"""OpenAI-compatible dataclasses for the Manual Slop ai_client layer.
Promotes `NormalizedResponse` and `OpenAICompatibleRequest` from
`src/openai_compatible.py` to typed dataclasses. The 4 dataclasses
here model the OpenAI Chat Completion API shape:
- ToolCall: a single tool call from the model
- ToolCallFunction: the function portion of a tool call (name + JSON args)
- ChatMessage: a single message in the conversation (system/user/assistant/tool)
- UsageStats: token usage accounting (input, output, cache hits/creation)
`NormalizedResponse` and `OpenAICompatibleRequest` keep their public
shapes but consume these typed shapes internally.
CONVENTION: 1-space indentation. NO COMMENTS.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Callable, Optional
@dataclass(frozen=True)
class ToolCallFunction:
name: str
arguments: str
@dataclass(frozen=True)
class ToolCall:
id: str
function: ToolCallFunction
type: str = "function"
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"type": self.type,
"function": {
"name": self.function.name,
"arguments": self.function.arguments,
},
}
@dataclass(frozen=True)
class ChatMessage:
role: str
content: str
tool_calls: Optional[tuple[ToolCall, ...]] = None
tool_call_id: Optional[str] = None
name: Optional[str] = None
def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {"role": self.role, "content": self.content}
if self.tool_calls is not None:
d["tool_calls"] = [tc.to_dict() for tc in self.tool_calls]
if self.tool_call_id is not None:
d["tool_call_id"] = self.tool_call_id
if self.name is not None:
d["name"] = self.name
return d
@dataclass(frozen=True)
class UsageStats:
input_tokens: int
output_tokens: int
cache_read_tokens: int = 0
cache_creation_tokens: int = 0
@dataclass(frozen=True)
class NormalizedResponse:
text: str
tool_calls: tuple[ToolCall, ...]
usage: UsageStats
raw_response: Any
def to_legacy_dict(self) -> dict[str, Any]:
return {
"text": self.text,
"tool_calls": [tc.to_dict() for tc in self.tool_calls],
"usage": {
"input_tokens": self.usage.input_tokens,
"output_tokens": self.usage.output_tokens,
"cache_read_tokens": self.usage.cache_read_tokens,
"cache_creation_tokens": self.usage.cache_creation_tokens,
},
"raw_response": self.raw_response,
}
@dataclass
class OpenAICompatibleRequest:
messages: list[ChatMessage]
model: str
temperature: float = 0.0
top_p: float = 1.0
max_tokens: int = 8192
tools: Optional[list[dict[str, Any]]] = None
tool_choice: str = "auto"
stream: bool = False
stream_callback: Optional[Callable[[str], None]] = None
extra_body: Optional[dict[str, Any]] = None
+206
View File
@@ -0,0 +1,206 @@
"""Tests for src/openai_schemas.py
Phase 2 of any_type_componentization_20260621. Verifies:
- ToolCall + ToolCallFunction round-trip via to_dict
- ChatMessage round-trip for all 4 roles
- UsageStats field access
- NormalizedResponse legacy dict preservation
- OpenAICompatibleRequest typed messages
- raw_response remains Any (Pattern 3 preserved)
- tools field stays list[dict[str, Any]] for cross-phase Phase 1 ToolSpec
(deferred to follow-up track per spec 3.4)
CONVENTION: 1-space indentation. NO COMMENTS.
"""
from __future__ import annotations
import json
import pytest
from src import openai_schemas
def test_tool_call_function_construction() -> None:
tcf = openai_schemas.ToolCallFunction(name="get_weather", arguments='{"city": "sf"}')
assert tcf.name == "get_weather"
assert tcf.arguments == '{"city": "sf"}'
def test_tool_call_to_dict_round_trip() -> None:
tc = openai_schemas.ToolCall(
id="call_123",
type="function",
function=openai_schemas.ToolCallFunction(name="read_file", arguments='{"path": "/x.py"}'),
)
d = tc.to_dict()
assert d["id"] == "call_123"
assert d["type"] == "function"
assert d["function"]["name"] == "read_file"
assert d["function"]["arguments"] == '{"path": "/x.py"}'
def test_tool_call_defaults() -> None:
tc = openai_schemas.ToolCall(
id="call_x",
function=openai_schemas.ToolCallFunction(name="noop", arguments="{}"),
)
assert tc.type == "function"
def test_tool_call_is_frozen() -> None:
tc = openai_schemas.ToolCall(
id="call_y",
function=openai_schemas.ToolCallFunction(name="noop", arguments="{}"),
)
with pytest.raises(Exception):
tc.id = "mutated"
def test_chat_message_system_role() -> None:
msg = openai_schemas.ChatMessage(role="system", content="You are a helper.")
d = msg.to_dict()
assert d["role"] == "system"
assert d["content"] == "You are a helper."
assert "tool_calls" not in d
assert "tool_call_id" not in d
def test_chat_message_user_role() -> None:
msg = openai_schemas.ChatMessage(role="user", content="Hello")
d = msg.to_dict()
assert d["role"] == "user"
assert d["content"] == "Hello"
def test_chat_message_assistant_with_tool_calls() -> None:
tc = openai_schemas.ToolCall(
id="call_a",
function=openai_schemas.ToolCallFunction(name="read_file", arguments='{"path": "/x"}'),
)
msg = openai_schemas.ChatMessage(role="assistant", content="", tool_calls=(tc,))
d = msg.to_dict()
assert d["role"] == "assistant"
assert d["content"] == ""
assert len(d["tool_calls"]) == 1
assert d["tool_calls"][0]["function"]["name"] == "read_file"
def test_chat_message_tool_role() -> None:
msg = openai_schemas.ChatMessage(
role="tool", content='{"result": "ok"}', tool_call_id="call_a"
)
d = msg.to_dict()
assert d["role"] == "tool"
assert d["tool_call_id"] == "call_a"
def test_chat_message_is_frozen() -> None:
msg = openai_schemas.ChatMessage(role="user", content="hi")
with pytest.raises(Exception):
msg.role = "mutated"
def test_usage_stats_construction() -> None:
u = openai_schemas.UsageStats(input_tokens=100, output_tokens=50)
assert u.input_tokens == 100
assert u.output_tokens == 50
assert u.cache_read_tokens == 0
assert u.cache_creation_tokens == 0
def test_usage_stats_with_cache() -> None:
u = openai_schemas.UsageStats(
input_tokens=100,
output_tokens=50,
cache_read_tokens=80,
cache_creation_tokens=20,
)
assert u.cache_read_tokens == 80
assert u.cache_creation_tokens == 20
def test_usage_stats_is_frozen() -> None:
u = openai_schemas.UsageStats(input_tokens=1, output_tokens=1)
with pytest.raises(Exception):
u.input_tokens = 999
def test_normalized_response_construction() -> None:
tc = openai_schemas.ToolCall(
id="call_z",
function=openai_schemas.ToolCallFunction(name="noop", arguments="{}"),
)
usage = openai_schemas.UsageStats(input_tokens=10, output_tokens=20)
resp = openai_schemas.NormalizedResponse(
text="hello", tool_calls=(tc,), usage=usage, raw_response=None
)
assert resp.text == "hello"
assert len(resp.tool_calls) == 1
assert resp.usage.input_tokens == 10
assert resp.raw_response is None
def test_normalized_response_raw_can_be_any_type() -> None:
"""Pattern 3: raw_response is intentionally Any (SDK-specific)."""
usage = openai_schemas.UsageStats(input_tokens=0, output_tokens=0)
resp = openai_schemas.NormalizedResponse(
text="", tool_calls=(), usage=usage, raw_response={"vendor_specific": True}
)
assert resp.raw_response == {"vendor_specific": True}
def test_normalized_response_to_legacy_dict_preserves_shape() -> None:
tc = openai_schemas.ToolCall(
id="call_q",
function=openai_schemas.ToolCallFunction(name="x", arguments="{}"),
)
usage = openai_schemas.UsageStats(
input_tokens=10, output_tokens=20, cache_read_tokens=5, cache_creation_tokens=3
)
resp = openai_schemas.NormalizedResponse(
text="hello", tool_calls=(tc,), usage=usage, raw_response="sdk_obj"
)
d = resp.to_legacy_dict()
assert d["text"] == "hello"
assert d["tool_calls"][0]["id"] == "call_q"
assert d["usage"]["input_tokens"] == 10
assert d["usage"]["cache_read_tokens"] == 5
assert d["raw_response"] == "sdk_obj"
def test_openai_compatible_request_defaults() -> None:
msg = openai_schemas.ChatMessage(role="user", content="hi")
req = openai_schemas.OpenAICompatibleRequest(messages=[msg], model="gpt-4")
assert req.messages == [msg]
assert req.model == "gpt-4"
assert req.temperature == 0.0
assert req.top_p == 1.0
assert req.max_tokens == 8192
assert req.tools is None
assert req.tool_choice == "auto"
assert req.stream is False
assert req.stream_callback is None
assert req.extra_body is None
def test_openai_compatible_request_tools_field_stays_dict_list() -> None:
"""Cross-phase coupling (deferred): Phase 1 ToolSpec migration is a
follow-up track per spec 3.4. The tools field stays list[dict[str, Any]]
for now."""
msg = openai_schemas.ChatMessage(role="user", content="hi")
tools = [{"type": "function", "function": {"name": "x"}}]
req = openai_schemas.OpenAICompatibleRequest(messages=[msg], model="gpt-4", tools=tools)
assert req.tools == tools
def test_chat_message_to_dict_handles_optional_fields() -> None:
msg = openai_schemas.ChatMessage(role="assistant", content="", name=None, tool_call_id=None)
d = msg.to_dict()
assert "name" not in d
assert "tool_call_id" not in d
def test_normalized_response_is_frozen() -> None:
usage = openai_schemas.UsageStats(input_tokens=0, output_tokens=0)
resp = openai_schemas.NormalizedResponse(text="x", tool_calls=(), usage=usage, raw_response=None)
with pytest.raises(Exception):
resp.text = "mutated"