mirror of
https://github.com/oobabooga/text-generation-webui.git
synced 2026-03-26 23:34:39 +01:00
469 lines
18 KiB
Python
469 lines
18 KiB
Python
|
|
import json
|
||
|
|
import time
|
||
|
|
|
||
|
|
from modules import shared
|
||
|
|
|
||
|
|
|
||
|
|
def convert_request(body: dict) -> dict:
|
||
|
|
"""Transform Anthropic Messages API body into the dict that chat_completions_common expects."""
|
||
|
|
messages = []
|
||
|
|
|
||
|
|
# System message
|
||
|
|
system = body.get('system')
|
||
|
|
if system:
|
||
|
|
if isinstance(system, list):
|
||
|
|
# List of content blocks like [{"type":"text","text":"..."}]
|
||
|
|
text_parts = [block.get('text', '') for block in system if isinstance(block, dict) and block.get('type') == 'text']
|
||
|
|
system_text = '\n'.join(text_parts)
|
||
|
|
else:
|
||
|
|
system_text = str(system)
|
||
|
|
if system_text:
|
||
|
|
messages.append({"role": "system", "content": system_text})
|
||
|
|
|
||
|
|
# Convert messages
|
||
|
|
for msg in body.get('messages', []):
|
||
|
|
role = msg.get('role')
|
||
|
|
content = msg.get('content')
|
||
|
|
|
||
|
|
if isinstance(content, str):
|
||
|
|
messages.append({"role": role, "content": content})
|
||
|
|
continue
|
||
|
|
|
||
|
|
if not isinstance(content, list):
|
||
|
|
messages.append({"role": role, "content": str(content) if content else ""})
|
||
|
|
continue
|
||
|
|
|
||
|
|
if role == 'assistant':
|
||
|
|
# Split into text content, tool_calls, and skip thinking blocks
|
||
|
|
text_parts = []
|
||
|
|
tool_calls = []
|
||
|
|
for block in content:
|
||
|
|
btype = block.get('type')
|
||
|
|
if btype == 'text':
|
||
|
|
text_parts.append(block.get('text', ''))
|
||
|
|
elif btype == 'tool_use':
|
||
|
|
tool_calls.append({
|
||
|
|
"id": block.get('id', ''),
|
||
|
|
"type": "function",
|
||
|
|
"function": {
|
||
|
|
"name": block.get('name', ''),
|
||
|
|
"arguments": json.dumps(block.get('input', {}))
|
||
|
|
}
|
||
|
|
})
|
||
|
|
elif btype == 'thinking':
|
||
|
|
pass # Strip thinking blocks
|
||
|
|
|
||
|
|
assistant_msg = {"role": "assistant", "content": '\n'.join(text_parts) if text_parts else ""}
|
||
|
|
if tool_calls:
|
||
|
|
assistant_msg["tool_calls"] = tool_calls
|
||
|
|
messages.append(assistant_msg)
|
||
|
|
|
||
|
|
elif role == 'user':
|
||
|
|
# Handle tool_result blocks and regular content
|
||
|
|
regular_parts = []
|
||
|
|
for block in content:
|
||
|
|
btype = block.get('type')
|
||
|
|
if btype == 'tool_result':
|
||
|
|
# Emit any accumulated regular content first
|
||
|
|
if regular_parts:
|
||
|
|
if len(regular_parts) == 1 and regular_parts[0].get('type') == 'text':
|
||
|
|
messages.append({"role": "user", "content": regular_parts[0]['text']})
|
||
|
|
else:
|
||
|
|
messages.append({"role": "user", "content": regular_parts})
|
||
|
|
regular_parts = []
|
||
|
|
# Convert tool_result to OpenAI tool message
|
||
|
|
tool_content = block.get('content', '')
|
||
|
|
if isinstance(tool_content, list):
|
||
|
|
tool_content = '\n'.join(
|
||
|
|
b.get('text', '') for b in tool_content
|
||
|
|
if isinstance(b, dict) and b.get('type') == 'text'
|
||
|
|
)
|
||
|
|
messages.append({
|
||
|
|
"role": "tool",
|
||
|
|
"tool_call_id": block.get('tool_use_id', ''),
|
||
|
|
"content": str(tool_content)
|
||
|
|
})
|
||
|
|
elif btype == 'text':
|
||
|
|
regular_parts.append({"type": "text", "text": block.get('text', '')})
|
||
|
|
elif btype == 'image':
|
||
|
|
source = block.get('source', {})
|
||
|
|
if source.get('type') == 'base64':
|
||
|
|
media_type = source.get('media_type', 'image/png')
|
||
|
|
data = source.get('data', '')
|
||
|
|
regular_parts.append({
|
||
|
|
"type": "image_url",
|
||
|
|
"image_url": {"url": f"data:{media_type};base64,{data}"}
|
||
|
|
})
|
||
|
|
elif btype == 'thinking':
|
||
|
|
pass # Strip thinking blocks
|
||
|
|
|
||
|
|
if regular_parts:
|
||
|
|
if len(regular_parts) == 1 and regular_parts[0].get('type') == 'text':
|
||
|
|
messages.append({"role": "user", "content": regular_parts[0]['text']})
|
||
|
|
else:
|
||
|
|
messages.append({"role": "user", "content": regular_parts})
|
||
|
|
else:
|
||
|
|
messages.append({"role": role, "content": str(content)})
|
||
|
|
|
||
|
|
# Start with all fields from the original body (includes GenerationOptions defaults)
|
||
|
|
result = dict(body)
|
||
|
|
|
||
|
|
# Remove Anthropic-specific fields that don't map directly
|
||
|
|
for key in ('system', 'stop_sequences', 'tools', 'tool_choice', 'thinking', 'metadata'):
|
||
|
|
result.pop(key, None)
|
||
|
|
|
||
|
|
# Set converted fields
|
||
|
|
result['messages'] = messages
|
||
|
|
result['max_tokens'] = body.get('max_tokens', 4096)
|
||
|
|
result['stream'] = body.get('stream', False)
|
||
|
|
result['mode'] = 'instruct'
|
||
|
|
|
||
|
|
# Ensure ChatCompletionRequestParams defaults are present
|
||
|
|
result.setdefault('continue_', False)
|
||
|
|
result.setdefault('instruction_template', None)
|
||
|
|
result.setdefault('instruction_template_str', None)
|
||
|
|
result.setdefault('character', None)
|
||
|
|
result.setdefault('bot_name', None)
|
||
|
|
result.setdefault('context', None)
|
||
|
|
result.setdefault('greeting', None)
|
||
|
|
result.setdefault('user_name', None)
|
||
|
|
result.setdefault('user_bio', None)
|
||
|
|
result.setdefault('chat_template_str', None)
|
||
|
|
result.setdefault('chat_instruct_command', 'Continue the chat dialogue below. Write a single reply for the character "<|character|>".\n\n<|prompt|>')
|
||
|
|
result.setdefault('frequency_penalty', None)
|
||
|
|
result.setdefault('presence_penalty', None)
|
||
|
|
result.setdefault('logit_bias', None)
|
||
|
|
result.setdefault('logprobs', None)
|
||
|
|
result.setdefault('top_logprobs', None)
|
||
|
|
result.setdefault('n', 1)
|
||
|
|
result.setdefault('model', None)
|
||
|
|
result.setdefault('functions', None)
|
||
|
|
result.setdefault('function_call', None)
|
||
|
|
result.setdefault('stream_options', None)
|
||
|
|
result.setdefault('user', None)
|
||
|
|
result.setdefault('stop', None)
|
||
|
|
result.setdefault('tool_choice', None)
|
||
|
|
|
||
|
|
# Always request usage in streaming so the usage-only chunk triggers
|
||
|
|
# the deferred message_delta/message_stop with accurate output_tokens
|
||
|
|
if body.get('stream', False):
|
||
|
|
result['stream_options'] = {'include_usage': True}
|
||
|
|
|
||
|
|
# Map stop_sequences -> stop
|
||
|
|
if body.get('stop_sequences'):
|
||
|
|
result['stop'] = body['stop_sequences']
|
||
|
|
|
||
|
|
# Tools
|
||
|
|
if body.get('tools'):
|
||
|
|
result['tools'] = [
|
||
|
|
{
|
||
|
|
"type": "function",
|
||
|
|
"function": {
|
||
|
|
"name": t.get('name', ''),
|
||
|
|
"description": t.get('description', ''),
|
||
|
|
"parameters": t.get('input_schema', {"type": "object", "properties": {}})
|
||
|
|
}
|
||
|
|
}
|
||
|
|
for t in body['tools']
|
||
|
|
]
|
||
|
|
|
||
|
|
# Tool choice
|
||
|
|
tc = body.get('tool_choice')
|
||
|
|
if tc and isinstance(tc, dict):
|
||
|
|
tc_type = tc.get('type')
|
||
|
|
if tc_type == 'auto':
|
||
|
|
result['tool_choice'] = 'auto'
|
||
|
|
elif tc_type == 'any':
|
||
|
|
result['tool_choice'] = 'required'
|
||
|
|
elif tc_type == 'tool':
|
||
|
|
result['tool_choice'] = {"type": "function", "function": {"name": tc.get('name', '')}}
|
||
|
|
elif tc_type == 'none':
|
||
|
|
result['tool_choice'] = 'none'
|
||
|
|
else:
|
||
|
|
result.setdefault('tool_choice', None)
|
||
|
|
|
||
|
|
# Thinking
|
||
|
|
thinking = body.get('thinking')
|
||
|
|
if thinking and isinstance(thinking, dict) and thinking.get('type') in ('enabled', 'adaptive'):
|
||
|
|
result['enable_thinking'] = True
|
||
|
|
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
_FINISH_REASON_MAP = {
|
||
|
|
"stop": "end_turn",
|
||
|
|
"length": "max_tokens",
|
||
|
|
"tool_calls": "tool_use",
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
def build_response(openai_resp: dict, model: str) -> dict:
|
||
|
|
"""Transform OpenAI chat completion response dict into Anthropic Messages format."""
|
||
|
|
resp_id = openai_resp.get('id', 'msg_unknown')
|
||
|
|
if resp_id.startswith('chatcmpl-'):
|
||
|
|
resp_id = 'msg_' + resp_id[9:]
|
||
|
|
|
||
|
|
choice = openai_resp.get('choices', [{}])[0]
|
||
|
|
message = choice.get('message', {})
|
||
|
|
|
||
|
|
content = []
|
||
|
|
|
||
|
|
# Reasoning/thinking content
|
||
|
|
reasoning = message.get('reasoning_content')
|
||
|
|
if reasoning:
|
||
|
|
content.append({"type": "thinking", "thinking": reasoning, "signature": ""})
|
||
|
|
|
||
|
|
# Text content
|
||
|
|
text = message.get('content')
|
||
|
|
if text:
|
||
|
|
content.append({"type": "text", "text": text})
|
||
|
|
|
||
|
|
# Tool calls
|
||
|
|
tool_calls = message.get('tool_calls')
|
||
|
|
if tool_calls:
|
||
|
|
for tc in tool_calls:
|
||
|
|
func = tc.get('function', {})
|
||
|
|
try:
|
||
|
|
input_data = json.loads(func.get('arguments', '{}'))
|
||
|
|
except (json.JSONDecodeError, TypeError):
|
||
|
|
input_data = {}
|
||
|
|
content.append({
|
||
|
|
"type": "tool_use",
|
||
|
|
"id": tc.get('id', ''),
|
||
|
|
"name": func.get('name', ''),
|
||
|
|
"input": input_data
|
||
|
|
})
|
||
|
|
|
||
|
|
finish_reason = choice.get('finish_reason', 'stop')
|
||
|
|
stop_reason = _FINISH_REASON_MAP.get(finish_reason, 'end_turn')
|
||
|
|
|
||
|
|
usage = openai_resp.get('usage', {})
|
||
|
|
|
||
|
|
return {
|
||
|
|
"id": resp_id,
|
||
|
|
"type": "message",
|
||
|
|
"role": "assistant",
|
||
|
|
"content": content,
|
||
|
|
"model": model,
|
||
|
|
"stop_reason": stop_reason,
|
||
|
|
"stop_sequence": None,
|
||
|
|
"usage": {
|
||
|
|
"input_tokens": usage.get('prompt_tokens', 0),
|
||
|
|
"output_tokens": usage.get('completion_tokens', 0),
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
class StreamConverter:
|
||
|
|
"""Stateful converter: processes one OpenAI chunk at a time, yields Anthropic SSE events.
|
||
|
|
|
||
|
|
When include_usage is enabled in the OpenAI request, the final chunk with
|
||
|
|
finish_reason has usage=None, followed by a separate usage-only chunk
|
||
|
|
(choices=[], usage={...}). We defer emitting message_delta and message_stop
|
||
|
|
until we receive that usage chunk so output_tokens is accurate.
|
||
|
|
"""
|
||
|
|
|
||
|
|
def __init__(self, model: str):
|
||
|
|
self.model = model
|
||
|
|
self.msg_id = "msg_%d" % int(time.time() * 1000000000)
|
||
|
|
self.block_index = 0
|
||
|
|
self.in_thinking = False
|
||
|
|
self.in_text = False
|
||
|
|
self.input_tokens = 0
|
||
|
|
self.output_tokens = 0
|
||
|
|
self.tool_calls_accum = {}
|
||
|
|
self.stop_reason = "end_turn"
|
||
|
|
self._pending_finish = False # True after we've seen finish_reason
|
||
|
|
|
||
|
|
def process_chunk(self, chunk: dict) -> list[dict]:
|
||
|
|
"""Process a single OpenAI streaming chunk; return list of Anthropic SSE event dicts."""
|
||
|
|
events = []
|
||
|
|
choices = chunk.get('choices', [])
|
||
|
|
usage = chunk.get('usage')
|
||
|
|
|
||
|
|
if usage:
|
||
|
|
self.input_tokens = usage.get('prompt_tokens', self.input_tokens)
|
||
|
|
self.output_tokens = usage.get('completion_tokens', self.output_tokens)
|
||
|
|
|
||
|
|
# Usage-only chunk (choices=[]) arrives after the finish chunk
|
||
|
|
if not choices:
|
||
|
|
if self._pending_finish:
|
||
|
|
events.extend(self.finish())
|
||
|
|
return events
|
||
|
|
|
||
|
|
choice = choices[0]
|
||
|
|
delta = choice.get('delta', {})
|
||
|
|
finish_reason = choice.get('finish_reason')
|
||
|
|
|
||
|
|
# First chunk with role
|
||
|
|
if 'role' in delta:
|
||
|
|
events.append({
|
||
|
|
"event": "message_start",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "message_start",
|
||
|
|
"message": {
|
||
|
|
"id": self.msg_id,
|
||
|
|
"type": "message",
|
||
|
|
"role": "assistant",
|
||
|
|
"content": [],
|
||
|
|
"model": self.model,
|
||
|
|
"stop_reason": None,
|
||
|
|
"stop_sequence": None,
|
||
|
|
"usage": {"input_tokens": self.input_tokens, "output_tokens": 0}
|
||
|
|
}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
events.append({"event": "ping", "data": json.dumps({"type": "ping"})})
|
||
|
|
return events
|
||
|
|
|
||
|
|
# Reasoning content
|
||
|
|
reasoning_content = delta.get('reasoning_content')
|
||
|
|
if reasoning_content:
|
||
|
|
if not self.in_thinking:
|
||
|
|
self.in_thinking = True
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_start",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_start",
|
||
|
|
"index": self.block_index,
|
||
|
|
"content_block": {"type": "thinking", "thinking": ""}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_delta",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_delta",
|
||
|
|
"index": self.block_index,
|
||
|
|
"delta": {"type": "thinking_delta", "thinking": reasoning_content}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
return events
|
||
|
|
|
||
|
|
# Text content
|
||
|
|
text_content = delta.get('content')
|
||
|
|
if text_content:
|
||
|
|
if self.in_thinking:
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_stop",
|
||
|
|
"data": json.dumps({"type": "content_block_stop", "index": self.block_index})
|
||
|
|
})
|
||
|
|
self.in_thinking = False
|
||
|
|
self.block_index += 1
|
||
|
|
|
||
|
|
if not self.in_text:
|
||
|
|
self.in_text = True
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_start",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_start",
|
||
|
|
"index": self.block_index,
|
||
|
|
"content_block": {"type": "text", "text": ""}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_delta",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_delta",
|
||
|
|
"index": self.block_index,
|
||
|
|
"delta": {"type": "text_delta", "text": text_content}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
return events
|
||
|
|
|
||
|
|
# Tool calls in delta
|
||
|
|
chunk_tool_calls = delta.get('tool_calls')
|
||
|
|
if chunk_tool_calls:
|
||
|
|
for tc in chunk_tool_calls:
|
||
|
|
tc_id = tc.get('id', '')
|
||
|
|
tc_idx = tc.get('index', 0)
|
||
|
|
func = tc.get('function', {})
|
||
|
|
if tc_id:
|
||
|
|
self.tool_calls_accum[tc_idx] = {
|
||
|
|
"id": tc_id,
|
||
|
|
"name": func.get('name', ''),
|
||
|
|
"arguments": func.get('arguments', '')
|
||
|
|
}
|
||
|
|
elif tc_idx in self.tool_calls_accum:
|
||
|
|
self.tool_calls_accum[tc_idx]["arguments"] += func.get('arguments', '')
|
||
|
|
|
||
|
|
# Final chunk — close open content blocks, defer message_delta/stop for usage
|
||
|
|
if finish_reason is not None:
|
||
|
|
self.stop_reason = _FINISH_REASON_MAP.get(finish_reason, 'end_turn')
|
||
|
|
|
||
|
|
if self.in_thinking:
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_stop",
|
||
|
|
"data": json.dumps({"type": "content_block_stop", "index": self.block_index})
|
||
|
|
})
|
||
|
|
self.in_thinking = False
|
||
|
|
self.block_index += 1
|
||
|
|
|
||
|
|
if self.in_text:
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_stop",
|
||
|
|
"data": json.dumps({"type": "content_block_stop", "index": self.block_index})
|
||
|
|
})
|
||
|
|
self.in_text = False
|
||
|
|
self.block_index += 1
|
||
|
|
|
||
|
|
for tc_idx in sorted(self.tool_calls_accum.keys()):
|
||
|
|
tc = self.tool_calls_accum[tc_idx]
|
||
|
|
arguments_str = tc["arguments"] or "{}"
|
||
|
|
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_start",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_start",
|
||
|
|
"index": self.block_index,
|
||
|
|
"content_block": {
|
||
|
|
"type": "tool_use",
|
||
|
|
"id": tc["id"],
|
||
|
|
"name": tc["name"],
|
||
|
|
"input": {}
|
||
|
|
}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
# Emit the full input as a single input_json_delta so SDK
|
||
|
|
# clients that reconstruct from deltas get the correct data
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_delta",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "content_block_delta",
|
||
|
|
"index": self.block_index,
|
||
|
|
"delta": {
|
||
|
|
"type": "input_json_delta",
|
||
|
|
"partial_json": arguments_str
|
||
|
|
}
|
||
|
|
})
|
||
|
|
})
|
||
|
|
events.append({
|
||
|
|
"event": "content_block_stop",
|
||
|
|
"data": json.dumps({"type": "content_block_stop", "index": self.block_index})
|
||
|
|
})
|
||
|
|
self.block_index += 1
|
||
|
|
|
||
|
|
# Defer message_delta/stop — usage chunk may follow
|
||
|
|
self._pending_finish = True
|
||
|
|
|
||
|
|
return events
|
||
|
|
|
||
|
|
def finish(self) -> list[dict]:
|
||
|
|
"""Emit deferred message_delta and message_stop. Safe to call multiple times."""
|
||
|
|
if not self._pending_finish:
|
||
|
|
return []
|
||
|
|
self._pending_finish = False
|
||
|
|
return [
|
||
|
|
{
|
||
|
|
"event": "message_delta",
|
||
|
|
"data": json.dumps({
|
||
|
|
"type": "message_delta",
|
||
|
|
"delta": {"stop_reason": self.stop_reason, "stop_sequence": None},
|
||
|
|
"usage": {"input_tokens": self.input_tokens, "output_tokens": self.output_tokens}
|
||
|
|
})
|
||
|
|
},
|
||
|
|
{
|
||
|
|
"event": "message_stop",
|
||
|
|
"data": json.dumps({"type": "message_stop"})
|
||
|
|
}
|
||
|
|
]
|