export_excel.py
14.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
"""导出小红书搜索结果和评论到 Excel。"""
from __future__ import annotations
import logging
import time
from datetime import datetime
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill
from .errors import NoFeedDetailError, PageNotAccessibleError, XHSError
from .feed_detail import get_feed_detail
from .search import search_feeds
from .types import Comment, Feed, FilterOption
logger = logging.getLogger(__name__)
# Sheet 名称
SHEET_FEEDS = "文章列表"
SHEET_COMMENTS = "评论列表"
# 评论查询状态
STATUS_PENDING = "" # 未查询
STATUS_SUCCESS = "成功"
STATUS_FAILED = "失败"
# 标题行样式
HEADER_FONT = Font(bold=True, color="FFFFFF")
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_ALIGNMENT = Alignment(horizontal="center", vertical="center", wrap_text=True)
class ExportExcel:
"""导出小红书数据到 Excel。"""
def __init__(self, file_path: str, keyword: str) -> None:
self.file_path = Path(file_path)
self.keyword = keyword
self.wb = None
self.feeds_sheet = None
self.comments_sheet = None
# 列定义(从 1 开始)
self.feeds_headers = [
"序号", "标题", "作者", "类型", "点赞数", "评论数", "收藏数",
"评论查询状态", "文章id", "xsec_token", "链接"
]
self.comments_headers = [
"文章id", "评论内容", "评论时间", "点赞数", "用户", "IP属地", "回复数"
]
def create(self) -> None:
"""创建新的 Excel 文件并初始化表头。"""
from openpyxl import Workbook
self.wb = Workbook()
self.wb.remove(self.wb.active) # 删除默认 sheet
# 创建文章列表 sheet
self.feeds_sheet = self.wb.create_sheet(SHEET_FEEDS)
self._write_feeds_headers()
# 创建评论列表 sheet
self.comments_sheet = self.wb.create_sheet(SHEET_COMMENTS)
self._write_comments_headers()
self.wb.save(self.file_path)
logger.info("创建 Excel 文件: %s", self.file_path)
def load(self) -> bool:
"""加载已存在的 Excel 文件。"""
if not self.file_path.exists():
return False
self.wb = load_workbook(self.file_path)
self.feeds_sheet = self.wb[SHEET_FEEDS]
self.comments_sheet = self.wb[SHEET_COMMENTS]
logger.info("加载已存在的 Excel 文件: %s", self.file_path)
return True
def _write_feeds_headers(self) -> None:
"""写入文章列表表头。"""
for col, header in enumerate(self.feeds_headers, 1):
cell = self.feeds_sheet.cell(row=1, column=col, value=header)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL
cell.alignment = HEADER_ALIGNMENT
self.feeds_sheet.freeze_panes = "A2"
# 设置列宽 (A=序号, B=标题, C=作者, D=类型, E=点赞数, F=评论数, G=收藏数, H=状态, I=文章id, J=xsec_token, K=链接)
col_widths = {
"A": 6, # 序号
"B": 40, # 标题
"C": 12, # 作者
"D": 6, # 类型
"E": 8, # 点赞数
"F": 8, # 评论数
"G": 8, # 收藏数
"H": 12, # 评论查询状态
"I": 24, # 文章id
"J": 60, # xsec_token
"K": 50, # 链接
}
for col_letter, width in col_widths.items():
self.feeds_sheet.column_dimensions[col_letter].width = width
# 设置行高
self.feeds_sheet.row_dimensions[1].height = 25
def _write_comments_headers(self) -> None:
"""写入评论列表表头。"""
for col, header in enumerate(self.comments_headers, 1):
cell = self.comments_sheet.cell(row=1, column=col, value=header)
cell.font = HEADER_FONT
cell.fill = HEADER_FILL
cell.alignment = HEADER_ALIGNMENT
self.comments_sheet.freeze_panes = "A2"
# 设置列宽 (A=文章id, B=评论内容, C=评论时间, D=点赞数, E=用户, F=IP属地, G=回复数)
col_widths = {
"A": 24, # 文章id
"B": 60, # 评论内容
"C": 18, # 评论时间
"D": 8, # 点赞数
"E": 12, # 用户
"F": 12, # IP属地
"G": 8, # 回复数
}
for col_letter, width in col_widths.items():
self.comments_sheet.column_dimensions[col_letter].width = width
# 设置行高
self.comments_sheet.row_dimensions[1].height = 25
def _get_feeds_next_row(self) -> int:
"""获取文章列表下一个空行号。"""
return self.feeds_sheet.max_row + 1
def _get_comments_next_row(self) -> int:
"""获取评论列表下一个空行号。"""
return self.comments_sheet.max_row + 1
def append_feed(self, feed: Feed, row: int | None = None) -> int:
"""追加文章数据到 Excel。
Returns:
实际写入的行号
"""
if row is None:
row = self._get_feeds_next_row()
interact = feed.note_card.interact_info
user = feed.note_card.user
title = feed.note_card.display_title
data = [
row - 1, # 序号(从 1 开始,但Excel行从2开始)
title,
user.nickname or user.nick_name,
"视频" if feed.note_card.type == "video" else "图文",
interact.liked_count,
interact.comment_count,
interact.collected_count,
STATUS_PENDING, # 评论查询状态
feed.id,
feed.xsec_token,
f"https://www.xiaohongshu.com/explore/{feed.id}?xsec_token={feed.xsec_token}&xsec_source=pc_feed",
]
for col, value in enumerate(data, 1):
cell = self.feeds_sheet.cell(row=row, column=col, value=value)
if col == 2: # 标题列
cell.alignment = Alignment(wrap_text=True)
elif col == 11: # 链接列
cell.alignment = Alignment(wrap_text=True)
# 设置数据行行高
self.feeds_sheet.row_dimensions[row].height = 18
self.wb.save(self.file_path)
return row
def append_comment(self, feed_id: str, comment: Comment) -> int:
"""追加评论数据到 Excel。
Args:
feed_id: 文章的 feed_id,用于跳转链接
Returns:
实际写入的行号
"""
row = self._get_comments_next_row()
# 格式化评论时间
comment_time = ""
if comment.create_time:
try:
dt = datetime.fromtimestamp(comment.create_time)
comment_time = dt.strftime("%Y-%m-%d %H:%M:%S")
except (ValueError, OSError):
comment_time = str(comment.create_time)
data = [
feed_id,
comment.content,
comment_time,
comment.like_count,
comment.user_info.nickname or comment.user_info.nick_name,
comment.ip_location,
comment.sub_comment_count,
]
for col, value in enumerate(data, 1):
cell = self.comments_sheet.cell(row=row, column=col, value=value)
if col == 2: # 评论内容列
cell.alignment = Alignment(wrap_text=True)
elif col == 1: # feed_id 列,添加超链接跳转到文章 sheet
# 查找 feed_id 对应的行号
target_row = self._find_feed_row(feed_id)
if target_row:
cell.hyperlink = f"#'{SHEET_FEEDS}'!A{target_row}"
cell.font = Font(color="0563C1", underline="single")
# 设置数据行行高
self.comments_sheet.row_dimensions[row].height = 18
self.wb.save(self.file_path)
return row
def _find_feed_row(self, feed_id: str) -> int | None:
"""根据 feed_id 查找对应的行号。"""
for row in range(2, self.feeds_sheet.max_row + 1):
cell_feed_id = self.feeds_sheet.cell(row=row, column=9).value # feed_id 列
if cell_feed_id == feed_id:
return row
return None
def update_feed_status(self, row: int, status: str) -> None:
"""更新文章的评论查询状态。"""
status_col = 8 # 评论查询状态列
self.feeds_sheet.cell(row=row, column=status_col, value=status)
# 设置状态颜色
cell = self.feeds_sheet.cell(row=row, column=status_col)
if status == STATUS_SUCCESS:
cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
cell.font = Font(color="006100")
elif status == STATUS_FAILED:
cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
cell.font = Font(color="9C0006")
self.wb.save(self.file_path)
def get_pending_feeds(self) -> list[dict]:
"""获取所有待查询评论的文章。"""
pending = []
for row in range(2, self.feeds_sheet.max_row + 1):
status = self.feeds_sheet.cell(row=row, column=8).value
if status != STATUS_SUCCESS:
pending.append({
"row": row,
"feed_id": self.feeds_sheet.cell(row=row, column=9).value,
"xsec_token": self.feeds_sheet.cell(row=row, column=10).value,
"title": self.feeds_sheet.cell(row=row, column=2).value,
})
return pending
def get_saved_feeds_count(self) -> int:
"""获取已保存的文章数量。"""
return max(0, self.feeds_sheet.max_row - 1)
def save(self) -> None:
"""保存 Excel 文件。"""
self.wb.save(self.file_path)
def search_and_export(
page,
keyword: str,
output_file: str,
filter_option: FilterOption | None = None,
limit: int = 0,
) -> None:
"""搜索文章并导出到 Excel。
Args:
page: CDP 页面对象
keyword: 搜索关键词
output_file: 输出 Excel 文件路径
filter_option: 筛选选项
limit: 限制搜索文章数量,0 表示不限制
"""
export = ExportExcel(output_file, keyword)
if export.load():
logger.info("文件已存在,将追加新数据")
else:
export.create()
# 搜索文章
logger.info("开始搜索关键词: %s", keyword)
feeds = search_feeds(page, keyword, filter_option)
logger.info("搜索到 %d 篇文章", len(feeds))
# 限制数量
if limit > 0 and len(feeds) > limit:
feeds = feeds[:limit]
logger.info("限制为前 %d 篇", limit)
start_row = export._get_feeds_next_row() if export.load() else 2
# 追加到 Excel
for i, feed in enumerate(feeds):
row = start_row + i
export.append_feed(feed, row)
logger.info("已保存文章 [%d/%d]: %s", i + 1, len(feeds), feed.note_card.display_title[:30])
logger.info("文章搜索完成,已保存到: %s", output_file)
def poll_comments(
page,
excel_file: str,
feed_index: int | None = None,
delay: float = 1.0,
) -> dict:
"""轮询查询评论并保存到 Excel。
Args:
page: CDP 页面对象
excel_file: Excel 文件路径
feed_index: 指定文章序号(从 1 开始),None 表示查询所有未完成的
delay: 请求间隔(秒)
Returns:
统计信息 {"success": count, "failed": count}
"""
export = ExportExcel(excel_file, "")
if not export.load():
raise FileNotFoundError(f"Excel 文件不存在: {excel_file}")
pending_feeds = export.get_pending_feeds()
if feed_index is not None:
# 只查询指定文章
feed_index_0 = feed_index - 1 # 转为 0-based
pending_feeds = [
f for f in pending_feeds
if f["row"] - 2 == feed_index_0 # row 2 = index 0
]
if not pending_feeds:
# 检查是否已完成
row = feed_index + 1
status = export.feeds_sheet.cell(row=row, column=8).value
if status == STATUS_SUCCESS:
logger.info("文章 %d 的评论已查询完成", feed_index)
return {"success": 0, "failed": 0, "skipped": 1}
raise ValueError(f"文章序号 {feed_index} 不存在")
if not pending_feeds:
logger.info("所有文章的评论都已查询完成")
return {"success": 0, "failed": 0, "skipped": 0}
logger.info("开始查询 %d 篇文章的评论", len(pending_feeds))
stats = {"success": 0, "failed": 0}
for i, feed_info in enumerate(pending_feeds):
row = feed_info["row"]
feed_id = feed_info["feed_id"]
xsec_token = feed_info["xsec_token"]
title = feed_info["title"]
logger.info("[%d/%d] 查询文章评论: %s (row=%d)",
i + 1, len(pending_feeds), str(title)[:30], row)
try:
# 获取文章详情(含评论)
detail = get_feed_detail(page, feed_id, xsec_token, load_all_comments=True)
# 保存评论
for comment in detail.comments.list_:
export.append_comment(feed_id=feed_id, comment=comment)
_append_sub_comments(export, feed_id, comment.sub_comments)
# 更新状态为成功
export.update_feed_status(row, STATUS_SUCCESS)
stats["success"] += 1
logger.info("文章评论查询成功: %s", str(title)[:30])
except PageNotAccessibleError as e:
export.update_feed_status(row, STATUS_FAILED)
stats["failed"] += 1
logger.warning("文章无法访问 [%s]: %s", title, e)
except NoFeedDetailError as e:
export.update_feed_status(row, STATUS_FAILED)
stats["failed"] += 1
logger.warning("获取详情失败 [%s]: %s", title, e)
except XHSError as e:
export.update_feed_status(row, STATUS_FAILED)
stats["failed"] += 1
logger.warning("查询失败 [%s]: %s", title, e)
except Exception as e:
export.update_feed_status(row, STATUS_FAILED)
stats["failed"] += 1
logger.error("未知错误 [%s]: %s", title, e)
# 请求间隔
if i < len(pending_feeds) - 1:
time.sleep(delay)
logger.info("评论查询完成: 成功=%d, 失败=%d", stats["success"], stats["failed"])
return stats
def _append_sub_comments(
export: ExportExcel,
feed_id: str,
sub_comments: list[Comment],
) -> None:
"""递归追加子评论。"""
for comment in sub_comments:
export.append_comment(feed_id, comment)
if comment.sub_comments:
_append_sub_comments(export, feed_id, comment.sub_comments)