protocol.py
8.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# AIfeng/2025-07-11 13:36:00
"""
豆包语音识别WebSocket协议处理模块
实现二进制协议的编解码、消息类型定义和数据包处理
"""
import gzip
import json
from typing import Dict, Any, Tuple, Optional
# 协议版本和头部大小
PROTOCOL_VERSION = 0b0001
DEFAULT_HEADER_SIZE = 0b0001
# 消息类型定义
class MessageType:
FULL_CLIENT_REQUEST = 0b0001
AUDIO_ONLY_REQUEST = 0b0010
FULL_SERVER_RESPONSE = 0b1001
SERVER_ACK = 0b1011
SERVER_ERROR_RESPONSE = 0b1111
# 消息类型特定标志
class MessageFlags:
NO_SEQUENCE = 0b0000
POS_SEQUENCE = 0b0001
NEG_SEQUENCE = 0b0010
NEG_WITH_SEQUENCE = 0b0011
# 序列化方法
class SerializationMethod:
NO_SERIALIZATION = 0b0000
JSON = 0b0001
# 压缩方法
class CompressionType:
NO_COMPRESSION = 0b0000
GZIP = 0b0001
class DoubaoProtocol:
"""豆包ASR WebSocket协议处理器"""
@staticmethod
def generate_header(
message_type: int = MessageType.FULL_CLIENT_REQUEST,
message_type_specific_flags: int = MessageFlags.NO_SEQUENCE,
serial_method: int = SerializationMethod.JSON,
compression_type: int = CompressionType.GZIP,
reserved_data: int = 0x00
) -> bytearray:
"""
生成协议头部
Args:
message_type: 消息类型
message_type_specific_flags: 消息类型特定标志
serial_method: 序列化方法
compression_type: 压缩类型
reserved_data: 保留字段
Returns:
bytearray: 4字节协议头部
"""
header = bytearray()
header_size = 1
header.append((PROTOCOL_VERSION << 4) | header_size)
header.append((message_type << 4) | message_type_specific_flags)
header.append((serial_method << 4) | compression_type)
header.append(reserved_data)
return header
@staticmethod
def generate_sequence_payload(sequence: int) -> bytearray:
"""
生成序列号载荷
Args:
sequence: 序列号
Returns:
bytearray: 4字节序列号数据
"""
payload = bytearray()
payload.extend(sequence.to_bytes(4, 'big', signed=True))
return payload
@staticmethod
def parse_response(response_data: bytes) -> Dict[str, Any]:
"""
解析服务器响应数据
Args:
response_data: 服务器响应的二进制数据
Returns:
Dict: 解析后的响应数据
"""
if len(response_data) < 4:
raise ValueError("响应数据长度不足")
# 解析头部
protocol_version = response_data[0] >> 4
header_size = response_data[0] & 0x0f
message_type = response_data[1] >> 4
message_type_specific_flags = response_data[1] & 0x0f
serialization_method = response_data[2] >> 4
message_compression = response_data[2] & 0x0f
reserved = response_data[3]
# 解析扩展头部和载荷
header_extensions = response_data[4:header_size * 4]
payload = response_data[header_size * 4:]
result = {
'protocol_version': protocol_version,
'header_size': header_size,
'message_type': message_type,
'message_type_specific_flags': message_type_specific_flags,
'serialization_method': serialization_method,
'message_compression': message_compression,
'is_last_package': False,
'payload_msg': None,
'payload_size': 0
}
# 处理序列号
if message_type_specific_flags & 0x01:
if len(payload) >= 4:
seq = int.from_bytes(payload[:4], "big", signed=True)
result['payload_sequence'] = seq
payload = payload[4:]
# 检查是否为最后一包
if message_type_specific_flags & 0x02:
result['is_last_package'] = True
# 根据消息类型解析载荷
payload_msg = None
payload_size = 0
if message_type == MessageType.FULL_SERVER_RESPONSE:
if len(payload) >= 4:
payload_size = int.from_bytes(payload[:4], "big", signed=True)
payload_msg = payload[4:]
elif message_type == MessageType.SERVER_ACK:
if len(payload) >= 4:
seq = int.from_bytes(payload[:4], "big", signed=True)
result['seq'] = seq
if len(payload) >= 8:
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
elif message_type == MessageType.SERVER_ERROR_RESPONSE:
if len(payload) >= 8:
code = int.from_bytes(payload[:4], "big", signed=False)
result['code'] = code
payload_size = int.from_bytes(payload[4:8], "big", signed=False)
payload_msg = payload[8:]
# 解压缩和反序列化载荷
if payload_msg is not None:
if message_compression == CompressionType.GZIP:
try:
payload_msg = gzip.decompress(payload_msg)
except Exception as e:
result['decompress_error'] = str(e)
return result
if serialization_method == SerializationMethod.JSON:
try:
payload_msg = json.loads(payload_msg.decode('utf-8'))
except Exception as e:
result['json_parse_error'] = str(e)
return result
elif serialization_method != SerializationMethod.NO_SERIALIZATION:
payload_msg = payload_msg.decode('utf-8')
result['payload_msg'] = payload_msg
result['payload_size'] = payload_size
return result
@staticmethod
def build_full_request(
request_params: Dict[str, Any],
sequence: int = 1,
compression: bool = True
) -> bytearray:
"""
构建完整客户端请求
Args:
request_params: 请求参数字典
sequence: 序列号
compression: 是否启用压缩
Returns:
bytearray: 完整的请求数据包
"""
# 序列化请求参数
payload_bytes = json.dumps(request_params).encode('utf-8')
# 压缩载荷
compression_type = CompressionType.GZIP if compression else CompressionType.NO_COMPRESSION
if compression:
payload_bytes = gzip.compress(payload_bytes)
# 生成头部
header = DoubaoProtocol.generate_header(
message_type=MessageType.FULL_CLIENT_REQUEST,
message_type_specific_flags=MessageFlags.POS_SEQUENCE,
compression_type=compression_type
)
# 构建完整请求
request = bytearray(header)
request.extend(DoubaoProtocol.generate_sequence_payload(sequence))
request.extend(len(payload_bytes).to_bytes(4, 'big'))
request.extend(payload_bytes)
return request
@staticmethod
def build_audio_request(
audio_data: bytes,
sequence: int,
is_last: bool = False,
compression: bool = True
) -> bytearray:
"""
构建音频数据请求
Args:
audio_data: 音频数据
sequence: 序列号
is_last: 是否为最后一包
compression: 是否启用压缩
Returns:
bytearray: 音频请求数据包
"""
# 压缩音频数据
compression_type = CompressionType.GZIP if compression else CompressionType.NO_COMPRESSION
payload_bytes = gzip.compress(audio_data) if compression else audio_data
# 确定消息标志
if is_last:
flags = MessageFlags.NEG_WITH_SEQUENCE
sequence = -abs(sequence)
else:
flags = MessageFlags.POS_SEQUENCE
# 生成头部
header = DoubaoProtocol.generate_header(
message_type=MessageType.AUDIO_ONLY_REQUEST,
message_type_specific_flags=flags,
compression_type=compression_type
)
# 构建音频请求
request = bytearray(header)
request.extend(DoubaoProtocol.generate_sequence_payload(sequence))
request.extend(len(payload_bytes).to_bytes(4, 'big'))
request.extend(payload_bytes)
return request