Skip to content

gpt realtime模型调用

python
# example requires websocket-client library:
# pip install websocket-client

import os
import json
import websocket


url = "ws://www.anyapi.vip/v1/realtime?model=gpt-4o-realtime-preview"
headers = [
    "Authorization: Bearer sk-xxx" ,
    "OpenAI-Beta: realtime=v1"
]

def decode_unicode_string(s):
    """解码Unicode字符串,使中文可读"""
    try:
        # 先尝试直接解码
        if isinstance(s, bytes):
            return s.decode('utf-8')
        # 如果是已经乱码的字符串,先编码再解码
        return s.encode('latin1').decode('utf-8')
    except Exception as e:
        print(f"解码失败: {e}")
        return s

def on_error(ws, error):
    print("\n=== 错误信息 ===")
    print(f"{error}")
    
def on_close(ws, close_status_code, close_msg):
    print("\n=== 连接关闭 ===")
    print(f"状态码: {close_status_code}")
    print(f"关闭信息: {close_msg}")

def on_open(ws):
    print("\n=== 连接已建立 ===")
    
    event = {
        "type": "response.create",
        "response": {
            "modalities": ["text"],
            "instructions": "Please assist the user."
        }
    }
    print("\n=== 发送消息 ===")
    print(json.dumps(event, ensure_ascii=False, indent=2))
    ws.send(json.dumps(event))

def on_message(ws, message):
    print("\n=== 收到消息 ===")
    try:
        data = json.loads(message)
        # 如果存在错误信息,解码显示中文
        if 'error' in data and 'message' in data['error']:
            data['error']['message'] = decode_unicode_string(data['error']['message'])
        
        print(json.dumps(data, ensure_ascii=False, indent=2))
        
        # 如果是错误消息,额外显示一个简化版本
        if data.get('type') == 'error':
            print("\n=== 错误摘要 ===")
            print(f"错误类型: {data['error'].get('type', 'unknown')}")
            print(f"错误代码: {data['error'].get('code', 'unknown')}")
            error_message = decode_unicode_string(data['error'].get('message', 'unknown'))
            print(f"错误信息: {error_message}")
            
    except json.JSONDecodeError as e:
        print(f"解析消息失败: {message}")
        print(f"错误: {e}")

# 启用websocket日志,但只显示关键信息
websocket.enableTrace(False)

ws = websocket.WebSocketApp(
    url,
    header=headers,
    on_open=on_open,
    on_message=on_message,
    on_error=on_error,
    on_close=on_close
)

print("\n=== 开始连接 ===")
print(f"连接地址: {url}")
ws.run_forever()

AnyAPI — 专业的 AI 接口聚合服务