get_today_news.py
10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
BroadTopicExtraction module - news fetching and collection.
Integrates news API calls with database storage.
"""
import sys
import asyncio
import httpx
import json
from datetime import datetime, date
from pathlib import Path
from typing import List, Dict, Optional
from loguru import logger
# Add the project root to sys.path so the sibling package resolves.
project_root = Path(__file__).parent.parent
sys.path.append(str(project_root))
try:
    from BroadTopicExtraction.database_manager import DatabaseManager
except ImportError as e:
    raise ImportError(f"导入模块失败: {e}")
# Base URL of the NewsNow aggregation API
BASE_URL = "https://newsnow.busiyi.world"
# Mapping from API source id to its Chinese display name
SOURCE_NAMES = {
    "weibo": "微博热搜",
    "zhihu": "知乎热榜",
    "bilibili-hot-search": "B站热搜",
    "toutiao": "今日头条",
    "douyin": "抖音热榜",
    "github-trending-today": "GitHub趋势",
    "coolapk": "酷安热榜",
    "tieba": "百度贴吧",
    "wallstreetcn": "华尔街见闻",
    "thepaper": "澎湃新闻",
    "cls-hot": "财联社",
    "xueqiu": "雪球热榜"
}
class NewsCollector:
    """News collector - integrates API calls with database storage.

    Wraps the NewsNow HTTP API (see ``BASE_URL``) and a ``DatabaseManager``
    so callers can fetch today's trending items and persist them in one step.
    Usable as either a sync or async context manager; both close the database.
    """

    def __init__(self):
        """Create the database manager and record the supported source ids."""
        self.db_manager = DatabaseManager()
        self.supported_sources = list(SOURCE_NAMES.keys())

    def close(self):
        """Release the underlying database connection."""
        if self.db_manager:
            self.db_manager.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.close()

    # ==================== News API calls ====================
    async def fetch_news(self, source: str) -> dict:
        """Fetch the latest items from a single news source.

        Args:
            source: Source id understood by the NewsNow API (e.g. "weibo").

        Returns:
            A dict with ``source``, ``status`` ("success" / "timeout" /
            "http_error" / "error"), an ISO ``timestamp``, plus ``data``
            (parsed JSON) on success or ``error`` (message) on failure.
            Never raises: every failure is reported through ``status``.
        """
        url = f"{BASE_URL}/api/s?id={source}&latest"
        # Browser-like headers: the endpoint may reject bare HTTP clients.
        headers = {
            "Accept": "application/json, text/plain, */*",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "User-Agent": (
                "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                "AppleWebKit/537.36 (KHTML, like Gecko) "
                "Chrome/124.0.0.0 Safari/537.36"
            ),
            "Referer": BASE_URL,
            "Connection": "keep-alive",
        }
        try:
            async with httpx.AsyncClient(timeout=30.0, follow_redirects=True) as client:
                response = await client.get(url, headers=headers)
                response.raise_for_status()
                # Parse the JSON response body
                data = response.json()
                return {
                    "source": source,
                    "status": "success",
                    "data": data,
                    "timestamp": datetime.now().isoformat()
                }
        except httpx.TimeoutException:
            return {
                "source": source,
                "status": "timeout",
                "error": f"请求超时: {source}({url})",
                "timestamp": datetime.now().isoformat()
            }
        except httpx.HTTPStatusError as e:
            return {
                "source": source,
                "status": "http_error",
                "error": f"HTTP错误: {source}({url}) - {e.response.status_code}",
                "timestamp": datetime.now().isoformat()
            }
        except Exception as e:
            return {
                "source": source,
                "status": "error",
                "error": f"未知错误: {source}({url}) - {str(e)}",
                "timestamp": datetime.now().isoformat()
            }

    async def get_popular_news(self, sources: Optional[List[str]] = None) -> List[dict]:
        """Fetch all requested sources sequentially, logging per-source results.

        Args:
            sources: Source ids to query; None means every supported source.
                (Annotation fixed: the previous ``List[str] = None`` default
                was an implicit Optional.)

        Returns:
            One ``fetch_news`` result dict per source, in request order.
        """
        if sources is None:
            sources = list(SOURCE_NAMES.keys())
        logger.info(f"正在获取 {len(sources)} 个新闻源的最新内容...")
        logger.info("=" * 80)
        results = []
        for index, source in enumerate(sources):
            # Throttle between requests (0.5s). Pausing *before* each request
            # after the first preserves pacing while dropping the original's
            # useless trailing sleep after the final source.
            if index:
                await asyncio.sleep(0.5)
            source_name = SOURCE_NAMES.get(source, source)
            logger.info(f"正在获取 {source_name} 的新闻...")
            result = await self.fetch_news(source)
            results.append(result)
            if result["status"] == "success":
                data = result["data"]
                if 'items' in data and isinstance(data['items'], list):
                    count = len(data['items'])
                    logger.info(f"✓ {source_name}: 获取成功,共 {count} 条新闻")
                else:
                    logger.info(f"✓ {source_name}: 获取成功")
            else:
                logger.error(f"✗ {source_name}: {result.get('error', '获取失败')}")
        return results

    # ==================== Data processing and storage ====================
    async def collect_and_save_news(self, sources: Optional[List[str]] = None) -> Dict:
        """Collect today's trending news and persist them.

        Args:
            sources: Source ids to collect from; None means all supported.

        Returns:
            On success, the ``_process_news_results`` dict (with
            ``saved_count`` added when anything was written). On failure, a
            dict with ``success=False``, ``error``, an empty ``news_list``
            and ``total_news=0``.
        """
        collection_summary_message = ""
        collection_summary_message += "\n开始收集每日热点新闻...\n"
        collection_summary_message += f"时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n"
        # Default to every supported news source
        if sources is None:
            sources = list(SOURCE_NAMES.keys())
        collection_summary_message += f"将从 {len(sources)} 个新闻源收集数据:\n"
        for source in sources:
            source_name = SOURCE_NAMES.get(source, source)
            collection_summary_message += f" - {source_name}\n"
        logger.info(collection_summary_message)
        try:
            # Fetch raw results, then flatten into a ranked list
            results = await self.get_popular_news(sources)
            processed_data = self._process_news_results(results)
            # Save to the database (overwrite mode) only when items arrived
            if processed_data['news_list']:
                saved_count = self.db_manager.save_daily_news(
                    processed_data['news_list'],
                    date.today()
                )
                processed_data['saved_count'] = saved_count
            self._print_collection_summary(processed_data)
            return processed_data
        except Exception as e:
            logger.exception(f"收集新闻失败: {e}")
            return {
                'success': False,
                'error': str(e),
                'news_list': [],
                'total_news': 0
            }

    def _process_news_results(self, results: List[Dict]) -> Dict:
        """Flatten raw fetch results into one ranked news list plus statistics."""
        news_list = []
        successful_sources = 0
        total_news = 0
        for result in results:
            source = result['source']
            if result['status'] != 'success':
                continue
            successful_sources += 1
            data = result['data']
            if 'items' in data and isinstance(data['items'], list):
                total_news += len(data['items'])
                # Rank is the 1-based position within the source's feed
                for rank, item in enumerate(data['items'], 1):
                    processed_news = self._process_news_item(item, source, rank)
                    if processed_news:
                        news_list.append(processed_news)
        return {
            'success': True,
            'news_list': news_list,
            'successful_sources': successful_sources,
            'total_sources': len(results),
            'total_news': total_news,
            'collection_time': datetime.now().isoformat()
        }

    def _process_news_item(self, item: Dict, source: str, rank: int) -> Optional[Dict]:
        """Normalize one raw item (dict or plain string) into a storable record.

        Returns:
            A dict with ``id``, ``title``, ``url``, ``source`` and ``rank``,
            or None when the item cannot be processed (the error is logged).
        """
        try:
            if isinstance(item, dict):
                title = item.get('title', '无标题').strip()
                url = item.get('url', '')
                # Stable id: prefer the source's own id, else fall back to rank
                news_id = f"{source}_{item.get('id', f'rank_{rank}')}"
            else:
                # String items: keep at most the first 100 characters.
                # (The original's len() check was redundant; the slice alone
                # yields the identical result.)
                title = str(item)[:100]
                url = ''
                news_id = f"{source}_rank_{rank}"
            return {
                'id': news_id,
                'title': title,
                'url': url,
                'source': source,
                'rank': rank
            }
        except Exception as e:
            logger.exception(f"处理新闻项失败: {e}")
            return None

    def _print_collection_summary(self, data: Dict):
        """Log a short summary of the collection run."""
        collection_summary_message = ""
        collection_summary_message += f"\n总新闻源: {data['total_sources']}\n"
        collection_summary_message += f"成功源数: {data['successful_sources']}\n"
        collection_summary_message += f"总新闻数: {data['total_news']}\n"
        if 'saved_count' in data:
            collection_summary_message += f"已保存数: {data['saved_count']}\n"
        logger.info(collection_summary_message)

    def get_today_news(self) -> List[Dict]:
        """Return today's stored news rows, or [] on any database error."""
        try:
            return self.db_manager.get_daily_news(date.today())
        except Exception as e:
            logger.exception(f"获取今日新闻失败: {e}")
            return []
async def main():
    """Smoke-test the news collector against two sources."""
    logger.info("测试新闻收集器...")
    # Limit the run to two sources for a quick manual test
    test_sources = ["weibo", "zhihu"]
    async with NewsCollector() as collector:
        outcome = await collector.collect_and_save_news(sources=test_sources)
        if not outcome['success']:
            logger.error(f"收集失败: {outcome.get('error', '未知错误')}")
        else:
            logger.info(f"收集成功!共获取 {outcome['total_news']} 条新闻")
if __name__ == "__main__":
    # Run the smoke test when this file is executed directly.
    asyncio.run(main())