test: Add thinking trace parsing tests

This commit is contained in:
2026-03-13 21:53:17 -04:00
parent c5a406eff8
commit ea7b3ae3ae
3 changed files with 166 additions and 3 deletions

View File

@@ -132,7 +132,18 @@ def parse_history_entries(history_strings: list[str], roles: list[str]) -> list[
return entries
@dataclass
class ThinkingSegment:
    """A piece of extracted thinking text and the marker that delimited it.

    Instances round-trip through plain dicts via ``to_dict`` / ``from_dict``.
    """

    content: str  # text of the segment
    marker: str  # 'thinking', 'thought', or 'Thinking:'

    def to_dict(self) -> dict[str, Any]:
        """Return the segment as a dict with ``"content"`` and ``"marker"`` keys."""
        return {"content": self.content, "marker": self.marker}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "ThinkingSegment":
        """Build a segment from a mapping shaped like the output of ``to_dict``.

        Raises:
            KeyError: if ``"content"`` or ``"marker"`` is missing from *data*.
        """
        return cls(content=data["content"], marker=data["marker"])
@dataclass
class Ticket:
id: str
@@ -239,8 +250,6 @@ class Track:
)
@dataclass
@dataclass
@dataclass
class WorkerContext:
ticket_id: str

53
src/thinking_parser.py Normal file
View File

@@ -0,0 +1,53 @@
import re
from typing import List, Tuple
from src.models import ThinkingSegment
def parse_thinking_trace(text: str) -> Tuple[List[ThinkingSegment], str]:
    """Separate thinking segments from the remaining response text.

    Two kinds of thinking blocks are recognised, matched case-insensitively:

    * ``<thinking>...</thinking>`` and ``<thought>...</thought>`` tag pairs;
    * blocks introduced by ``Thinking:`` at the start of the text or of a
      line, running until a blank line, a line starting with ``Response:``
      or ``Answer:``, or the end of the text.

    Args:
        text: Raw text that may contain thinking blocks.

    Returns:
        A ``(segments, response_content)`` tuple. All tag segments come first
        (in order of appearance), followed by all ``Thinking:`` segments.
        ``response_content`` is *text* with every matched block removed and
        surrounding whitespace stripped.
    """
    tag_re = re.compile(r'<(thinking|thought)>(.*?)</\1>', re.DOTALL | re.IGNORECASE)
    prefix_re = re.compile(
        r'(?:^|\n)Thinking:\s*(.*?)(?:\n\n|\nResponse:|\nAnswer:|$)',
        re.DOTALL | re.IGNORECASE,
    )

    # Pass 1: tagged blocks. The marker is the lower-cased tag name; the tag
    # and its content are deleted from the text outright.
    collected: List[ThinkingSegment] = [
        ThinkingSegment(content=hit.group(2).strip(), marker=hit.group(1).lower())
        for hit in tag_re.finditer(text)
    ]
    without_tags = tag_re.sub("", text)

    # Pass 2: "Thinking:" blocks, searched only in what is left after pass 1.
    # Blocks whose content is empty after stripping produce no segment, but
    # are still removed from the text below.
    for hit in prefix_re.finditer(without_tags):
        body = hit.group(1).strip()
        if body:
            collected.append(ThinkingSegment(content=body, marker="Thinking:"))

    # The whole match -- including a terminating "Response:" / "Answer:"
    # label -- is replaced by a blank line so neighbouring text stays apart.
    response = prefix_re.sub("\n\n", without_tags)
    return collected, response.strip()