顾海波

[Requirement] Add search/export skill

... ... @@ -38,9 +38,10 @@ metadata:
1. **Authentication** ("登录 / 检查登录 / 切换账号") → run the `xhs-auth` skill.
2. **Content publishing** ("发布 / 发帖 / 上传图文 / 上传视频") → run the `xhs-publish` skill.
3. **Data export** ("搜索并导出 / 导出评论 / 导出Excel / 批量采集评论") → run the `xhs-export` skill.
4. **Search & discovery** ("搜索笔记 / 查看详情 / 浏览首页 / 查看用户") → run the `xhs-explore` skill.
5. **Social interaction** ("评论 / 回复 / 点赞 / 收藏") → run the `xhs-interact` skill.
6. **Composite operations** ("竞品分析 / 热点追踪 / 批量互动 / 一键创作") → run the `xhs-content-ops` skill.
## Global constraints
... ... @@ -100,6 +101,51 @@ metadata:
Combines multiple steps into operations workflows: competitor analysis, trend tracking, content creation, and interaction management.
### xhs-export — Data export
Exports Xiaohongshu search results and comments to an Excel file, with resume support.
**When to use**: triggered when the user asks to "搜索并导出" (search and export), "导出评论" (export comments), or "批量采集评论" (batch-collect comments).
**Commands**
| Command | Purpose |
|------|------|
| `cli.py export-search` | Search feeds and export them to Excel |
| `cli.py export-comments` | Poll comments and export them to Excel |
**Step 1: search feeds and export**
```bash
python scripts/cli.py export-search \
  --keyword "keyword" \
  --output /abs/path/result.xlsx \
  [--sort-by 综合|最新|最多点赞|最多评论|最多收藏] \
  [--note-type 不限|视频|图文] \
  [--publish-time 不限|一天内|一周内|半年内]
```
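On success, the command prints a JSON result to stdout. The field names below are taken from `cmd_export_search`; the exact serialization depends on the `_output` helper:

```json
{
  "success": true,
  "keyword": "keyword",
  "output": "/abs/path/result.xlsx",
  "message": "Search finished; feeds saved to Excel"
}
```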
**Step 2: poll comments**
```bash
# Query comments for all unfinished feeds
python scripts/cli.py export-comments --input /abs/path/result.xlsx
# Query one specific feed (index starts at 1)
python scripts/cli.py export-comments --input /abs/path/result.xlsx --feed-index 1
# Set the request interval (default: 1 second)
python scripts/cli.py export-comments --input /abs/path/result.xlsx --delay 2.0
```
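This step also reports a JSON result; the `stats` counters mirror the dict returned by `poll_comments` in `cmd_export_comments`:

```json
{
  "success": true,
  "input": "/abs/path/result.xlsx",
  "stats": {"success": 8, "failed": 1},
  "message": "Comment polling finished: success=8, failed=1"
}
```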
**Excel output structure**
- **Sheet1 (文章列表, the feed list)**: index, title, author, type, like count, comment count, collect count, comment-query status, feed_id, xsec_token, link
  - Comment-query status: empty = not yet queried, 成功 (success) = green, 失败 (failed) = red
- **Sheet2 (评论列表, the comment list)**: feed_id, comment content, comment time, like count, user, IP location, reply count
**Resume support**: if step 2 fails or is interrupted partway, rerun the same step-2 command; feeds already marked successful are skipped automatically, and only unfinished or failed ones are queried again.
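The resume behavior boils down to filtering on the status column: every row whose status is not 成功 is queried again. A minimal plain-Python sketch of that filter (a simplified model of `get_pending_feeds`, which reads the same values from Excel cells via openpyxl):

```python
# Simplified model of the resume filter: rows whose status is not
# "成功" (success) -- i.e. failed ("失败") or never queried ("") --
# are returned for another attempt.
STATUS_SUCCESS = "成功"

def pending_rows(rows: list[dict]) -> list[dict]:
    """Return the rows that still need a comment query."""
    return [r for r in rows if r.get("status") != STATUS_SUCCESS]

rows = [
    {"row": 2, "title": "a", "status": "成功"},
    {"row": 3, "title": "b", "status": "失败"},
    {"row": 4, "title": "c", "status": ""},
]
print([r["row"] for r in pending_rows(rows)])  # → [3, 4]
```

Because failed rows stay pending, rerunning step 2 retries them alongside rows that were never reached.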
## Quick start
```bash
... ...
... ... @@ -8,6 +8,7 @@ requires-python = ">=3.11"
dependencies = [
"requests>=2.28.0",
"websockets>=12.0",
"openpyxl>=3.1.0",
]
[project.optional-dependencies]
... ...
... ... @@ -669,6 +669,60 @@ def cmd_next_step(args: argparse.Namespace) -> None:
browser.close()
def cmd_export_search(args: argparse.Namespace) -> None:
    """Search feeds and export them to Excel."""
    from xhs.export_excel import search_and_export
    from xhs.types import FilterOption

    filter_opt = FilterOption(
        sort_by=args.sort_by or "",
        note_type=args.note_type or "",
        publish_time=args.publish_time or "",
        search_scope=args.search_scope or "",
        location=args.location or "",
    )
    browser, page = _connect(args)
    try:
        search_and_export(
            page,
            args.keyword,
            args.output,
            filter_opt,
            limit=args.limit,
        )
        _output({
            "success": True,
            "keyword": args.keyword,
            "output": args.output,
            "message": "Search finished; feeds saved to Excel",
        })
    finally:
        browser.close()


def cmd_export_comments(args: argparse.Namespace) -> None:
    """Poll comments and export them to Excel."""
    from xhs.export_excel import poll_comments

    browser, page = _connect(args)
    try:
        stats = poll_comments(
            page,
            args.input,
            feed_index=args.feed_index if args.feed_index > 0 else None,
            delay=args.delay,
        )
        _output({
            "success": True,
            "input": args.input,
            "stats": stats,
            "message": f"Comment polling finished: success={stats['success']}, failed={stats['failed']}",
        })
    finally:
        browser.close()
def cmd_publish_video(args: argparse.Namespace) -> None:
"""发布视频内容。"""
from xhs.publish_video import publish_video_content
... ... @@ -832,6 +886,25 @@ def build_parser() -> argparse.ArgumentParser:
sub.add_argument("--visibility")
sub.set_defaults(func=cmd_publish_video)
# export-search
sub = subparsers.add_parser("export-search", help="搜索文章并导出到 Excel")
sub.add_argument("--keyword", required=True, help="搜索关键词")
sub.add_argument("--output", required=True, help="输出 Excel 文件路径")
sub.add_argument("--sort-by", help="排序: 综合、最新、最多点赞、最多评论、最多收藏")
sub.add_argument("--note-type", help="类型: 不限、视频、图文")
sub.add_argument("--publish-time", help="时间: 不限、一天内、一周内、半年内")
sub.add_argument("--search-scope", help="范围: 不限、已看过、未看过、已关注")
sub.add_argument("--location", help="位置: 不限、同城、附近")
sub.add_argument("--limit", type=int, default=0, help="限制搜索文章数量,0表示不限制 (default: 0)")
sub.set_defaults(func=cmd_export_search)
# export-comments
sub = subparsers.add_parser("export-comments", help="轮询查询评论并导出到 Excel")
sub.add_argument("--input", required=True, help="输入 Excel 文件路径")
sub.add_argument("--feed-index", type=int, default=0, help="指定文章序号(从1开始),0或不指定表示查询所有未完成的")
sub.add_argument("--delay", type=float, default=1.0, help="请求间隔秒数 (default: 1.0)")
sub.set_defaults(func=cmd_export_comments)
# fill-publish
sub = subparsers.add_parser("fill-publish", help="填写图文表单(不发布)")
sub.add_argument("--title-file", required=True)
... ...
"""导出小红书搜索结果和评论到 Excel。"""
from __future__ import annotations
import logging
import time
from datetime import datetime
from pathlib import Path
from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill
from .errors import NoFeedDetailError, PageNotAccessibleError, XHSError
from .feed_detail import get_feed_detail
from .search import search_feeds
from .types import Comment, Feed, FilterOption
logger = logging.getLogger(__name__)
# Sheet names (kept in Chinese: they are the actual sheet titles in the workbook)
SHEET_FEEDS = "文章列表"
SHEET_COMMENTS = "评论列表"

# Comment-query status values written into the status column
STATUS_PENDING = ""      # not yet queried
STATUS_SUCCESS = "成功"
STATUS_FAILED = "失败"

# Header-row styles
HEADER_FONT = Font(bold=True, color="FFFFFF")
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_ALIGNMENT = Alignment(horizontal="center", vertical="center", wrap_text=True)
class ExportExcel:
    """Write Xiaohongshu data to an Excel workbook."""

    def __init__(self, file_path: str, keyword: str) -> None:
        self.file_path = Path(file_path)
        self.keyword = keyword
        self.wb = None
        self.feeds_sheet = None
        self.comments_sheet = None
        # Column definitions (1-based); header strings are the Chinese
        # labels written into the spreadsheet
        self.feeds_headers = [
            "序号", "标题", "作者", "类型", "点赞数", "评论数", "收藏数",
            "评论查询状态", "文章id", "xsec_token", "链接"
        ]
        self.comments_headers = [
            "文章id", "评论内容", "评论时间", "点赞数", "用户", "IP属地", "回复数"
        ]
    def create(self) -> None:
        """Create a new Excel file and write the header rows."""
        from openpyxl import Workbook

        self.wb = Workbook()
        self.wb.remove(self.wb.active)  # drop the default sheet
        # Feeds sheet
        self.feeds_sheet = self.wb.create_sheet(SHEET_FEEDS)
        self._write_feeds_headers()
        # Comments sheet
        self.comments_sheet = self.wb.create_sheet(SHEET_COMMENTS)
        self._write_comments_headers()
        self.wb.save(self.file_path)
        logger.info("Created Excel file: %s", self.file_path)

    def load(self) -> bool:
        """Load an existing Excel file; return False if it does not exist."""
        if not self.file_path.exists():
            return False
        self.wb = load_workbook(self.file_path)
        self.feeds_sheet = self.wb[SHEET_FEEDS]
        self.comments_sheet = self.wb[SHEET_COMMENTS]
        logger.info("Loaded existing Excel file: %s", self.file_path)
        return True
    def _write_feeds_headers(self) -> None:
        """Write the feeds-sheet header row."""
        for col, header in enumerate(self.feeds_headers, 1):
            cell = self.feeds_sheet.cell(row=1, column=col, value=header)
            cell.font = HEADER_FONT
            cell.fill = HEADER_FILL
            cell.alignment = HEADER_ALIGNMENT
        self.feeds_sheet.freeze_panes = "A2"
        # Column widths (A=index, B=title, C=author, D=type, E=likes,
        # F=comments, G=collects, H=status, I=feed id, J=xsec_token, K=link)
        col_widths = {
            "A": 6,   # index
            "B": 40,  # title
            "C": 12,  # author
            "D": 6,   # type
            "E": 8,   # like count
            "F": 8,   # comment count
            "G": 8,   # collect count
            "H": 12,  # comment-query status
            "I": 24,  # feed id
            "J": 60,  # xsec_token
            "K": 50,  # link
        }
        for col_letter, width in col_widths.items():
            self.feeds_sheet.column_dimensions[col_letter].width = width
        # Header-row height
        self.feeds_sheet.row_dimensions[1].height = 25

    def _write_comments_headers(self) -> None:
        """Write the comments-sheet header row."""
        for col, header in enumerate(self.comments_headers, 1):
            cell = self.comments_sheet.cell(row=1, column=col, value=header)
            cell.font = HEADER_FONT
            cell.fill = HEADER_FILL
            cell.alignment = HEADER_ALIGNMENT
        self.comments_sheet.freeze_panes = "A2"
        # Column widths (A=feed id, B=content, C=time, D=likes, E=user,
        # F=IP location, G=replies)
        col_widths = {
            "A": 24,  # feed id
            "B": 60,  # comment content
            "C": 18,  # comment time
            "D": 8,   # like count
            "E": 12,  # user
            "F": 12,  # IP location
            "G": 8,   # reply count
        }
        for col_letter, width in col_widths.items():
            self.comments_sheet.column_dimensions[col_letter].width = width
        # Header-row height
        self.comments_sheet.row_dimensions[1].height = 25
    def _get_feeds_next_row(self) -> int:
        """Return the next empty row number in the feeds sheet."""
        return self.feeds_sheet.max_row + 1

    def _get_comments_next_row(self) -> int:
        """Return the next empty row number in the comments sheet."""
        return self.comments_sheet.max_row + 1
    def append_feed(self, feed: Feed, row: int | None = None) -> int:
        """Append one feed to the feeds sheet.

        Returns:
            The row number actually written.
        """
        if row is None:
            row = self._get_feeds_next_row()
        interact = feed.note_card.interact_info
        user = feed.note_card.user
        title = feed.note_card.display_title
        data = [
            row - 1,  # 1-based index (data rows start at Excel row 2)
            title,
            user.nickname or user.nick_name,
            "视频" if feed.note_card.type == "video" else "图文",
            interact.liked_count,
            interact.comment_count,
            interact.collected_count,
            STATUS_PENDING,  # comment-query status
            feed.id,
            feed.xsec_token,
            f"https://www.xiaohongshu.com/explore/{feed.id}?xsec_token={feed.xsec_token}&xsec_source=pc_feed",
        ]
        for col, value in enumerate(data, 1):
            cell = self.feeds_sheet.cell(row=row, column=col, value=value)
            if col in (2, 11):  # title and link columns wrap
                cell.alignment = Alignment(wrap_text=True)
        # Data-row height
        self.feeds_sheet.row_dimensions[row].height = 18
        self.wb.save(self.file_path)
        return row
    def append_comment(self, feed_id: str, comment: Comment) -> int:
        """Append one comment to the comments sheet.

        Args:
            feed_id: feed_id of the parent feed, used for the back-link
        Returns:
            The row number actually written.
        """
        row = self._get_comments_next_row()
        # Format the comment timestamp
        comment_time = ""
        if comment.create_time:
            try:
                dt = datetime.fromtimestamp(comment.create_time)
                comment_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, OSError):
                comment_time = str(comment.create_time)
        data = [
            feed_id,
            comment.content,
            comment_time,
            comment.like_count,
            comment.user_info.nickname or comment.user_info.nick_name,
            comment.ip_location,
            comment.sub_comment_count,
        ]
        for col, value in enumerate(data, 1):
            cell = self.comments_sheet.cell(row=row, column=col, value=value)
            if col == 2:  # comment-content column wraps
                cell.alignment = Alignment(wrap_text=True)
            elif col == 1:  # feed_id column links back to the feeds sheet
                target_row = self._find_feed_row(feed_id)
                if target_row:
                    cell.hyperlink = f"#'{SHEET_FEEDS}'!A{target_row}"
                    cell.font = Font(color="0563C1", underline="single")
        # Data-row height
        self.comments_sheet.row_dimensions[row].height = 18
        self.wb.save(self.file_path)
        return row

    def _find_feed_row(self, feed_id: str) -> int | None:
        """Return the feeds-sheet row number for a feed_id, or None."""
        for row in range(2, self.feeds_sheet.max_row + 1):
            cell_feed_id = self.feeds_sheet.cell(row=row, column=9).value  # feed_id column
            if cell_feed_id == feed_id:
                return row
        return None
    def update_feed_status(self, row: int, status: str) -> None:
        """Update a feed's comment-query status and color the cell."""
        status_col = 8  # comment-query status column
        cell = self.feeds_sheet.cell(row=row, column=status_col, value=status)
        if status == STATUS_SUCCESS:
            cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
            cell.font = Font(color="006100")
        elif status == STATUS_FAILED:
            cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
            cell.font = Font(color="9C0006")
        self.wb.save(self.file_path)

    def get_pending_feeds(self) -> list[dict]:
        """Return every feed whose comments are not yet queried successfully."""
        pending = []
        for row in range(2, self.feeds_sheet.max_row + 1):
            status = self.feeds_sheet.cell(row=row, column=8).value
            if status != STATUS_SUCCESS:
                pending.append({
                    "row": row,
                    "feed_id": self.feeds_sheet.cell(row=row, column=9).value,
                    "xsec_token": self.feeds_sheet.cell(row=row, column=10).value,
                    "title": self.feeds_sheet.cell(row=row, column=2).value,
                })
        return pending

    def get_saved_feeds_count(self) -> int:
        """Return the number of feeds already saved."""
        return max(0, self.feeds_sheet.max_row - 1)

    def save(self) -> None:
        """Save the workbook to disk."""
        self.wb.save(self.file_path)
def search_and_export(
    page,
    keyword: str,
    output_file: str,
    filter_option: FilterOption | None = None,
    limit: int = 0,
) -> None:
    """Search feeds and export them to Excel.

    Args:
        page: CDP page object
        keyword: search keyword
        output_file: output Excel file path
        filter_option: search filters
        limit: maximum number of feeds to export; 0 means unlimited
    """
    export = ExportExcel(output_file, keyword)
    if export.load():
        logger.info("File already exists; new rows will be appended")
    else:
        export.create()
    # Search
    logger.info("Searching for keyword: %s", keyword)
    feeds = search_feeds(page, keyword, filter_option)
    logger.info("Found %d feeds", len(feeds))
    # Apply the limit
    if limit > 0 and len(feeds) > limit:
        feeds = feeds[:limit]
        logger.info("Limited to the first %d feeds", limit)
    # The workbook was loaded or freshly created above, so the next empty
    # row can be read directly without reloading the file
    start_row = export._get_feeds_next_row()
    # Append rows
    for i, feed in enumerate(feeds):
        row = start_row + i
        export.append_feed(feed, row)
        logger.info("Saved feed [%d/%d]: %s", i + 1, len(feeds), feed.note_card.display_title[:30])
    logger.info("Search export finished; saved to: %s", output_file)
def poll_comments(
    page,
    excel_file: str,
    feed_index: int | None = None,
    delay: float = 1.0,
) -> dict:
    """Poll comments for each feed and save them to Excel.

    Args:
        page: CDP page object
        excel_file: Excel file path
        feed_index: 1-based feed index; None means query all unfinished feeds
        delay: delay between requests, in seconds
    Returns:
        Statistics such as {"success": count, "failed": count}
    """
    export = ExportExcel(excel_file, "")
    if not export.load():
        raise FileNotFoundError(f"Excel file does not exist: {excel_file}")
    pending_feeds = export.get_pending_feeds()
    if feed_index is not None:
        # Query only the specified feed
        feed_index_0 = feed_index - 1  # convert to 0-based
        pending_feeds = [
            f for f in pending_feeds
            if f["row"] - 2 == feed_index_0  # row 2 = index 0
        ]
        if not pending_feeds:
            # The feed may already be finished
            row = feed_index + 1
            status = export.feeds_sheet.cell(row=row, column=8).value
            if status == STATUS_SUCCESS:
                logger.info("Comments for feed %d are already queried", feed_index)
                return {"success": 0, "failed": 0, "skipped": 1}
            raise ValueError(f"Feed index {feed_index} does not exist")
    if not pending_feeds:
        logger.info("Comments for all feeds are already queried")
        return {"success": 0, "failed": 0, "skipped": 0}
    logger.info("Querying comments for %d feeds", len(pending_feeds))
    stats = {"success": 0, "failed": 0}
    for i, feed_info in enumerate(pending_feeds):
        row = feed_info["row"]
        feed_id = feed_info["feed_id"]
        xsec_token = feed_info["xsec_token"]
        title = feed_info["title"]
        logger.info("[%d/%d] Querying comments: %s (row=%d)",
                    i + 1, len(pending_feeds), str(title)[:30], row)
        try:
            # Fetch the feed detail (including comments)
            detail = get_feed_detail(page, feed_id, xsec_token, load_all_comments=True)
            # Save the comments
            for comment in detail.comments.list_:
                export.append_comment(feed_id=feed_id, comment=comment)
                _append_sub_comments(export, feed_id, comment.sub_comments)
            # Mark the feed as successful
            export.update_feed_status(row, STATUS_SUCCESS)
            stats["success"] += 1
            logger.info("Comments queried successfully: %s", str(title)[:30])
        except PageNotAccessibleError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("Feed not accessible [%s]: %s", title, e)
        except NoFeedDetailError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("Failed to fetch detail [%s]: %s", title, e)
        except XHSError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("Query failed [%s]: %s", title, e)
        except Exception as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.error("Unexpected error [%s]: %s", title, e)
        # Delay between requests
        if i < len(pending_feeds) - 1:
            time.sleep(delay)
    logger.info("Comment polling finished: success=%d, failed=%d", stats["success"], stats["failed"])
    return stats
def _append_sub_comments(
    export: ExportExcel,
    feed_id: str,
    sub_comments: list[Comment],
) -> None:
    """Recursively append sub-comments (and their nested replies)."""
    for comment in sub_comments:
        export.append_comment(feed_id, comment)
        if comment.sub_comments:
            _append_sub_comments(export, feed_id, comment.sub_comments)
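The recursion above flattens an arbitrarily nested reply tree into flat Excel rows, depth-first. A minimal self-contained illustration of the same traversal, using a stand-in `Node` type instead of the real `Comment`:

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    """Stand-in for Comment: content plus nested replies."""
    content: str
    sub_comments: list["Node"] = field(default_factory=list)

def flatten(nodes: list["Node"]) -> list[str]:
    """Depth-first flatten, mirroring _append_sub_comments."""
    out: list[str] = []
    for n in nodes:
        out.append(n.content)          # the "append_comment" step
        if n.sub_comments:
            out.extend(flatten(n.sub_comments))  # recurse into replies
    return out

tree = [Node("a", [Node("a1", [Node("a1i")]), Node("a2")]), Node("b")]
print(flatten(tree))  # → ['a', 'a1', 'a1i', 'a2', 'b']
```

Each reply lands in the row immediately after its parent, so the flat comment sheet preserves thread order.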
... ...
... ... @@ -31,6 +31,7 @@ from .human import (
)
from .selectors import (
ACCESS_ERROR_WRAPPER,
COMMENTS_CONTAINER,
END_CONTAINER,
NO_COMMENTS_TEXT,
PARENT_COMMENT,
... ... @@ -236,6 +237,11 @@ def _load_all_comments(page: Page, config: CommentLoadConfig) -> None:
_scroll_to_comments_area(page)
sleep_random(*HUMAN_DELAY)
    # Check that the comments container exists (taken-down or
    # rule-breaking posts may have no comments area at all)
    if page.get_elements_count(COMMENTS_CONTAINER) == 0:
        logger.info("Comments container not found; the post may have been taken down, skipping load")
        return
    # Check for the no-comments case
    if _check_no_comments(page):
        logger.info("Empty comments area detected, skipping load")
... ...
... ... @@ -94,6 +94,15 @@ wheels = [
]
[[package]]
name = "et-xmlfile"
version = "2.0.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/d3/38/af70d7ab1ae9d4da450eeec1fa3918940a5fafb9055e934af8d6eb0c2313/et_xmlfile-2.0.0.tar.gz", hash = "sha256:dab3f4764309081ce75662649be815c4c9081e88f0837825f90fd28317d4da54", size = 17234, upload-time = "2024-10-25T17:25:40.039Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c1/8b/5fe2cc11fee489817272089c4203e679c63b570a5aaeb18d852ae3cbba6a/et_xmlfile-2.0.0-py3-none-any.whl", hash = "sha256:7a91720bc756843502c3b7504c77b8fe44217c85c537d85037f0f536151b2caa", size = 18059, upload-time = "2024-10-25T17:25:39.051Z" },
]
[[package]]
name = "idna"
version = "3.11"
source = { registry = "https://pypi.org/simple" }
... ... @@ -112,6 +121,18 @@ wheels = [
]
[[package]]
name = "openpyxl"
version = "3.1.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "et-xmlfile" },
]
sdist = { url = "https://files.pythonhosted.org/packages/3d/f9/88d94a75de065ea32619465d2f77b29a0469500e99012523b91cc4141cd1/openpyxl-3.1.5.tar.gz", hash = "sha256:cf0e3cf56142039133628b5acffe8ef0c12bc902d2aadd3e0fe5878dc08d1050", size = 186464, upload-time = "2024-06-28T14:03:44.161Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" },
]
[[package]]
name = "packaging"
version = "26.0"
source = { registry = "https://pypi.org/simple" }
... ... @@ -267,6 +288,7 @@ name = "xiaohongshu-skills"
version = "0.1.0"
source = { virtual = "." }
dependencies = [
{ name = "openpyxl" },
{ name = "requests" },
{ name = "websockets" },
]
... ... @@ -284,6 +306,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "openpyxl", specifier = ">=3.1.0" },
{ name = "pytest", marker = "extra == 'dev'", specifier = ">=8.0" },
{ name = "requests", specifier = ">=2.28.0" },
{ name = "ruff", marker = "extra == 'dev'", specifier = ">=0.9.0" },
... ...