text_processing.py
8.04 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
"""
文本处理工具函数
用于清理LLM输出、解析JSON等
"""
import re
import json
from typing import Dict, Any, List
from json.decoder import JSONDecodeError
def clean_json_tags(text: str) -> str:
"""
清理文本中的JSON标签
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除```json 和 ```标签
text = re.sub(r'```json\s*', '', text)
text = re.sub(r'```\s*$', '', text)
text = re.sub(r'```', '', text)
return text.strip()
def clean_markdown_tags(text: str) -> str:
"""
清理文本中的Markdown标签
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除```markdown 和 ```标签
text = re.sub(r'```markdown\s*', '', text)
text = re.sub(r'```\s*$', '', text)
text = re.sub(r'```', '', text)
return text.strip()
def remove_reasoning_from_output(text: str) -> str:
"""
移除输出中的推理过程文本
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 查找JSON开始位置
json_start = -1
# 尝试找到第一个 { 或 [
for i, char in enumerate(text):
if char in '{[':
json_start = i
break
if json_start != -1:
# 从JSON开始位置截取
return text[json_start:].strip()
# 如果没有找到JSON标记,尝试其他方法
# 移除常见的推理标识
patterns = [
r'(?:reasoning|推理|思考|分析)[::]\s*.*?(?=\{|\[)', # 移除推理部分
r'(?:explanation|解释|说明)[::]\s*.*?(?=\{|\[)', # 移除解释部分
r'^.*?(?=\{|\[)', # 移除JSON前的所有文本
]
for pattern in patterns:
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
return text.strip()
def extract_clean_response(text: str) -> Dict[str, Any]:
"""
提取并清理响应中的JSON内容
Args:
text: 原始响应文本
Returns:
解析后的JSON字典
"""
# 清理文本
cleaned_text = clean_json_tags(text)
cleaned_text = remove_reasoning_from_output(cleaned_text)
# 尝试直接解析
try:
return json.loads(cleaned_text)
except JSONDecodeError:
pass
# 尝试修复不完整的JSON
fixed_text = fix_incomplete_json(cleaned_text)
if fixed_text:
try:
return json.loads(fixed_text)
except JSONDecodeError:
pass
# 尝试查找JSON对象
json_pattern = r'\{.*\}'
match = re.search(json_pattern, cleaned_text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except JSONDecodeError:
pass
# 尝试查找JSON数组
array_pattern = r'\[.*\]'
match = re.search(array_pattern, cleaned_text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except JSONDecodeError:
pass
# 如果所有方法都失败,返回错误信息
print(f"无法解析JSON响应: {cleaned_text[:200]}...")
return {"error": "JSON解析失败", "raw_text": cleaned_text}
def fix_incomplete_json(text: str) -> str:
"""
修复不完整的JSON响应
Args:
text: 原始文本
Returns:
修复后的JSON文本,如果无法修复则返回空字符串
"""
# 移除多余的逗号和空白
text = re.sub(r',\s*}', '}', text)
text = re.sub(r',\s*]', ']', text)
# 检查是否已经是有效的JSON
try:
json.loads(text)
return text
except JSONDecodeError:
pass
# 检查是否缺少开头的数组符号
if text.strip().startswith('{') and not text.strip().startswith('['):
# 如果以对象开始,尝试包装成数组
if text.count('{') > 1:
# 多个对象,包装成数组
text = '[' + text + ']'
else:
# 单个对象,包装成数组
text = '[' + text + ']'
# 检查是否缺少结尾的数组符号
if text.strip().endswith('}') and not text.strip().endswith(']'):
# 如果以对象结束,尝试包装成数组
if text.count('}') > 1:
# 多个对象,包装成数组
text = '[' + text + ']'
else:
# 单个对象,包装成数组
text = '[' + text + ']'
# 检查括号是否匹配
open_braces = text.count('{')
close_braces = text.count('}')
open_brackets = text.count('[')
close_brackets = text.count(']')
# 修复不匹配的括号
if open_braces > close_braces:
text += '}' * (open_braces - close_braces)
if open_brackets > close_brackets:
text += ']' * (open_brackets - close_brackets)
# 验证修复后的JSON是否有效
try:
json.loads(text)
return text
except JSONDecodeError:
# 如果仍然无效,尝试更激进的修复
return fix_aggressive_json(text)
def fix_aggressive_json(text: str) -> str:
"""
更激进的JSON修复方法
Args:
text: 原始文本
Returns:
修复后的JSON文本
"""
# 查找所有可能的JSON对象
objects = re.findall(r'\{[^{}]*\}', text)
if len(objects) >= 2:
# 如果有多个对象,包装成数组
return '[' + ','.join(objects) + ']'
elif len(objects) == 1:
# 如果只有一个对象,包装成数组
return '[' + objects[0] + ']'
else:
# 如果没有找到对象,返回空数组
return '[]'
def update_state_with_search_results(search_results: List[Dict[str, Any]],
paragraph_index: int, state: Any) -> Any:
"""
将搜索结果更新到状态中
Args:
search_results: 搜索结果列表
paragraph_index: 段落索引
state: 状态对象
Returns:
更新后的状态对象
"""
if 0 <= paragraph_index < len(state.paragraphs):
# 获取最后一次搜索的查询(假设是当前查询)
current_query = ""
if search_results:
# 从搜索结果推断查询(这里需要改进以获取实际查询)
current_query = "搜索查询"
# 添加搜索结果到状态
state.paragraphs[paragraph_index].research.add_search_results(
current_query, search_results
)
return state
def validate_json_schema(data: Dict[str, Any], required_fields: List[str]) -> bool:
"""
验证JSON数据是否包含必需字段
Args:
data: 要验证的数据
required_fields: 必需字段列表
Returns:
验证是否通过
"""
return all(field in data for field in required_fields)
def truncate_content(content: str, max_length: int = 20000) -> str:
"""
截断内容到指定长度
Args:
content: 原始内容
max_length: 最大长度
Returns:
截断后的内容
"""
if len(content) <= max_length:
return content
# 尝试在单词边界截断
truncated = content[:max_length]
last_space = truncated.rfind(' ')
if last_space > max_length * 0.8: # 如果最后一个空格位置合理
return truncated[:last_space] + "..."
else:
return truncated + "..."
def format_search_results_for_prompt(search_results: List[Dict[str, Any]],
max_length: int = 20000) -> List[str]:
"""
格式化搜索结果用于提示词
Args:
search_results: 搜索结果列表
max_length: 每个结果的最大长度
Returns:
格式化后的内容列表
"""
formatted_results = []
for result in search_results:
content = result.get('content', '')
if content:
truncated_content = truncate_content(content, max_length)
formatted_results.append(truncated_content)
return formatted_results