text_processing.py
5.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""
文本处理工具函数
用于清理LLM输出、解析JSON等
"""
import re
import json
from typing import Dict, Any, List
from json.decoder import JSONDecodeError
def clean_json_tags(text: str) -> str:
"""
清理文本中的JSON标签
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除```json 和 ```标签
text = re.sub(r'```json\s*', '', text)
text = re.sub(r'```\s*$', '', text)
text = re.sub(r'```', '', text)
return text.strip()
def clean_markdown_tags(text: str) -> str:
"""
清理文本中的Markdown标签
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除```markdown 和 ```标签
text = re.sub(r'```markdown\s*', '', text)
text = re.sub(r'```\s*$', '', text)
text = re.sub(r'```', '', text)
return text.strip()
def remove_reasoning_from_output(text: str) -> str:
"""
移除输出中的推理过程文本
Args:
text: 原始文本
Returns:
清理后的文本
"""
# 移除常见的推理标识
patterns = [
r'(?:reasoning|推理|思考|分析)[::]\s*.*?(?=\{|\[)', # 移除推理部分
r'(?:explanation|解释|说明)[::]\s*.*?(?=\{|\[)', # 移除解释部分
r'^.*?(?=\{|\[)', # 移除JSON前的所有文本
]
for pattern in patterns:
text = re.sub(pattern, '', text, flags=re.IGNORECASE | re.DOTALL)
return text.strip()
def extract_clean_response(text: str) -> Dict[str, Any]:
"""
提取并清理响应中的JSON内容
Args:
text: 原始响应文本
Returns:
解析后的JSON字典
"""
# 清理文本
cleaned_text = clean_json_tags(text)
cleaned_text = remove_reasoning_from_output(cleaned_text)
# 尝试直接解析
try:
return json.loads(cleaned_text)
except JSONDecodeError:
pass
# 尝试查找JSON对象
json_pattern = r'\{.*\}'
match = re.search(json_pattern, cleaned_text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except JSONDecodeError:
pass
# 尝试查找JSON数组
array_pattern = r'\[.*\]'
match = re.search(array_pattern, cleaned_text, re.DOTALL)
if match:
try:
return json.loads(match.group())
except JSONDecodeError:
pass
# 如果所有方法都失败,返回错误信息
print(f"无法解析JSON响应: {cleaned_text[:200]}...")
return {"error": "JSON解析失败", "raw_text": cleaned_text}
def update_state_with_search_results(search_results: List[Dict[str, Any]],
paragraph_index: int, state: Any) -> Any:
"""
将搜索结果更新到状态中
Args:
search_results: 搜索结果列表
paragraph_index: 段落索引
state: 状态对象
Returns:
更新后的状态对象
"""
if 0 <= paragraph_index < len(state.paragraphs):
# 获取最后一次搜索的查询(假设是当前查询)
current_query = ""
if search_results:
# 从搜索结果推断查询(这里需要改进以获取实际查询)
current_query = "搜索查询"
# 添加搜索结果到状态
state.paragraphs[paragraph_index].research.add_search_results(
current_query, search_results
)
return state
def validate_json_schema(data: Dict[str, Any], required_fields: List[str]) -> bool:
"""
验证JSON数据是否包含必需字段
Args:
data: 要验证的数据
required_fields: 必需字段列表
Returns:
验证是否通过
"""
return all(field in data for field in required_fields)
def truncate_content(content: str, max_length: int = 20000) -> str:
"""
截断内容到指定长度
Args:
content: 原始内容
max_length: 最大长度
Returns:
截断后的内容
"""
if len(content) <= max_length:
return content
# 尝试在单词边界截断
truncated = content[:max_length]
last_space = truncated.rfind(' ')
if last_space > max_length * 0.8: # 如果最后一个空格位置合理
return truncated[:last_space] + "..."
else:
return truncated + "..."
def format_search_results_for_prompt(search_results: List[Dict[str, Any]],
max_length: int = 20000) -> List[str]:
"""
格式化搜索结果用于提示词
Args:
search_results: 搜索结果列表
max_length: 每个结果的最大长度
Returns:
格式化后的内容列表
"""
formatted_results = []
for result in search_results:
content = result.get('content', '')
if content:
truncated_content = truncate_content(content, max_length)
formatted_results.append(truncated_content)
return formatted_results