zy
Committed by GitHub

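feat: rewrite the Xiaohongshu Skills as a complete CDP-based Python implementation (#1)

## Main changes

### Core module rewrite
- Create the scripts/xhs/ package with 18 specialized modules (3,728 lines of code)
- Implement it in full from the xiaohongshu-mcp Go source
- Talk to Chrome directly over the CDP WebSocket, replacing the third-party library dependency

### Module list
- cdp.py: Browser/Page/Element classes, complete CDP protocol implementation
- stealth.py: anti-detection JS injection + Chrome launch flags
- login.py: login check and QR-code login (the QR code is saved to a temporary file for the agent to display)
- publish.py: complete image-and-text publishing flow
- publish_video.py: complete video publishing flow
- search.py: search and content filtering
- feed_detail.py: note details and comment loading
- comment.py: comments and replies
- like_favorite.py: likes and favorites
- user_profile.py: user profile page
- cookies.py: cookie persistence
- types.py: complete dataclass-based type system
- errors.py: custom exception hierarchy
- human.py: human-behavior simulation (delays, scrolling)
- selectors.py: CSS selector constants
- urls.py: URL builder functions

### Unified CLI
- scripts/cli.py: 13 subcommands, fully compatible with the xiaohongshu-mcp MCP tools
- check-login: check login status
- login: get the login QR code
- switch-account/delete-cookies: switch accounts
- publish-content: publish an image-and-text note
- publish-with-video: publish a video
- list-feeds: list feeds
- search-feeds: search feeds
- get-feed-detail: note details
- user-profile: user profile page
- post-comment: post a comment
- like-feed: like a note
- favorite-feed: favorite a note

### Supporting scripts rewritten
- chrome_launcher.py: Chrome process management (cross-platform)
- account_manager.py: per-account Profile isolation
- image_downloader.py: image/video download (SHA256 cache)
- title_utils.py: UTF-16 title length calculation
- run_lock.py: single-instance lock
- publish_pipeline.py: publishing pipeline orchestration CLI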
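
The list above describes title_utils.py as computing title length in UTF-16. That file is not shown here, so the following is only a minimal sketch of what such a calculation could look like, assuming "length" means the number of UTF-16 code units. The function name `utf16_length` is hypothetical.

```python
def utf16_length(text: str) -> int:
    """Return the number of UTF-16 code units needed to encode ``text``."""
    # Every UTF-16 code unit is 2 bytes. "utf-16-le" is used so that no
    # byte-order mark is prepended to the encoded bytes.
    return len(text.encode("utf-16-le")) // 2


print(utf16_length("abc"))     # ASCII: one code unit per character
print(utf16_length("小红书"))   # BMP characters: one code unit each
print(utf16_length("😀"))      # outside the BMP: a surrogate pair
```

Characters outside the Basic Multilingual Plane (most emoji) count as two units under this measure, which is why it can differ from Python's `len()`.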

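### Documentation and configuration
- SKILL.md: unified skill entry point (routes to 5 sub-skills)
- skills/xhs-auth/SKILL.md: authentication management skill
- skills/xhs-publish/SKILL.md: content publishing skill (image-and-text + video)
- skills/xhs-explore/SKILL.md: content discovery and analysis skill
- skills/xhs-interact/SKILL.md: social interaction skill (comment/like/favorite)
- skills/xhs-content-ops/SKILL.md: composite content-operations workflow skill
- CLAUDE.md: project development guide
- PROMPT.md: Ralph Loop driver file
- pyproject.toml: uv project configuration (uv.lock)
- README.md: complete project documentation

### Tech stack
- Python 3.11+ with uv for package management
- requests + websockets: CDP WebSocket communication
- Code style: ruff lint + format

## Correspondence
All 13 subcommands map one-to-one to the xiaohongshu-mcp MCP tools
Can be called directly from the OpenClaw agent framework

## Groundwork
- Create the scripts/xhs/ package structure
- Implement the CDP WebSocket protocol
- Complete type system and error handling
- CLI subcommand system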

Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>


... ... @@ -205,3 +205,15 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/
# Project specific
tmp/
*.txt
!requirements.txt
config/accounts.json
title.txt
content.txt
comment.txt
# Ralph Loop state
.claude/.ralph-loop.local.md
... ...
# xiaohongshu-skills
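Xiaohongshu automation Claude Code Skills, built on a Python CDP browser-automation engine.
Provides Xiaohongshu operations to the OpenClaw ecosystem and also supports the Claude Code skills format.
## Project structure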
```
xiaohongshu-skills/
├── scripts/ # Python CDP automation engine
│ ├── xhs/ # Core XHS automation package
│ │ ├── __init__.py
│ │ ├── cdp.py # CDP WebSocket client (Browser, Page, Element)
│ │ ├── stealth.py # Anti-detection JS injection + Chrome launch flags
│ │ ├── cookies.py # Cookie file persistence
│ │ ├── types.py # Data types (dataclass)
│ │ ├── errors.py # Exception hierarchy
│ │ ├── selectors.py # CSS selector constants
│ │ ├── urls.py # URL constants and builder functions
│ │ ├── human.py # Human-behavior simulation (delays, scrolling)
│ │ ├── login.py # Login check, QR-code login
│ │ ├── feeds.py # Home feed list
│ │ ├── search.py # Search + filters
│ │ ├── feed_detail.py # Note details + comment loading
│ │ ├── user_profile.py # User profile page
│ │ ├── comment.py # Comments, replies
│ │ ├── like_favorite.py # Likes, favorites
│ │ ├── publish.py # Image-and-text publishing
│ │ └── publish_video.py # Video publishing
│ ├── cli.py # Unified CLI entry point (13 subcommands)
│ ├── chrome_launcher.py # Chrome process management
│ ├── account_manager.py # Multi-account management
│ ├── image_downloader.py # Media download (SHA256 cache)
│ ├── title_utils.py # UTF-16 title length calculation
│ ├── run_lock.py # Single-instance lock
│ └── publish_pipeline.py # Publishing orchestrator
├── skills/ # Claude Code Skills definitions
│ ├── xhs-auth/SKILL.md # Authentication management
│ ├── xhs-publish/SKILL.md # Content publishing (image-and-text + video)
│ ├── xhs-explore/SKILL.md # Content discovery and analysis
│ ├── xhs-interact/SKILL.md # Social interaction (comment/like/favorite)
│ └── xhs-content-ops/SKILL.md # Composite content-operations workflow
├── pyproject.toml # uv project configuration
├── SKILL.md # Unified entry point (routes to sub-skills)
├── CLAUDE.md # This file
├── PROMPT.md # Ralph Loop driver file
└── README.md
```
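The tree above lists image_downloader.py as a media downloader with a SHA256 cache. As a rough illustration of how a URL-keyed cache path could be derived, here is a minimal sketch. The helper name `cache_path_for` and the `.jpg` fallback extension are assumptions made for this example, not details confirmed by the source.

```python
import hashlib
import os
from urllib.parse import urlparse


def cache_path_for(url: str, cache_dir: str) -> str:
    """Map a media URL to a deterministic file path inside ``cache_dir``."""
    # Hash the full URL so the same URL always maps to the same file name.
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()
    # Reuse the extension from the URL path; fall back to ".jpg" (assumption).
    ext = os.path.splitext(urlparse(url).path)[1].lower() or ".jpg"
    return os.path.join(cache_dir, digest + ext)


path = cache_path_for("https://example.com/a/pic.PNG?x=1", "/tmp/xhs-cache")
print(path)
# A caller could skip the download when os.path.exists(path) is already true.
```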
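## Tech stack
- **Python**: >=3.11
- **Package management**: uv
- **Dependencies**: requests + websockets (direct CDP WebSocket communication)
- **Browser**: Chrome (controlled through the CDP remote debugging protocol)
- **Code style**: ruff (lint + format)
- **Data extraction**: `window.__INITIAL_STATE__` (consistent with the Go source)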
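
To make the data-extraction bullet concrete, the sketch below builds the JSON message that a CDP client could send over the WebSocket to read `window.__INITIAL_STATE__`. `Runtime.evaluate` and its `expression` / `returnByValue` parameters are part of the Chrome DevTools Protocol. The helper name `build_evaluate_message` is hypothetical, and how `xhs/cdp.py` actually frames its messages is not shown in this document.

```python
import json
from itertools import count

# CDP requests carry a client-chosen integer id that the response echoes back.
_message_ids = count(1)


def build_evaluate_message(expression: str) -> str:
    """Serialize a Runtime.evaluate request for the given JS expression."""
    return json.dumps(
        {
            "id": next(_message_ids),
            "method": "Runtime.evaluate",
            # returnByValue asks Chrome to send the result as JSON
            # rather than as a remote object handle.
            "params": {"expression": expression, "returnByValue": True},
        }
    )


print(build_evaluate_message("window.__INITIAL_STATE__"))
```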
## Development commands
```bash
uv sync # Install dependencies
uv run ruff check . # Lint
uv run ruff format . # Format code
uv run pytest # Run tests
```
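## Architecture
### Two-layer structure
1. **scripts/: Python CDP engine**
- Rewritten from scratch based on the xiaohongshu-mcp Go source
- `xhs/` package: modular core automation library
- `cli.py`: unified CLI entry point, 13 subcommands matching the MCP tools
- Structured JSON output that is easy for an agent to parse
- Multi-account support, isolated by separate Chrome Profiles
- Anti-detection protection (stealth flags + JS injection)
2. **skills/: Claude Code Skills definitions**
- SKILL.md format, telling Claude how to call scripts/
- Covers input triage, constraints, workflows, and failure handling
### Invocation
```bash
# Unified CLI entry point
python scripts/cli.py check-login
python scripts/cli.py search-feeds --keyword "KEYWORD"
python scripts/cli.py publish --title-file t.txt --content-file c.txt --images pic.jpg
# Publishing pipeline (includes image download and login check)
python scripts/publish_pipeline.py --title-file t.txt --content-file c.txt --images URL1
```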
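## Code conventions
### Python style
- Follow PEP 8, enforced with ruff
- Full type hints (PEP 484), using the `str | None` union syntax (PEP 604)
- Public functions and classes must have docstrings
- Maximum line length: 100 characters
- Use `from __future__ import annotations` to enable postponed evaluation of annotations
### Naming
- File names: snake_case
- Class names: PascalCase
- Functions/variables: snake_case
- Constants: UPPER_SNAKE_CASE
### Error handling
- Custom exception classes inherit from the `XHSError` base class (`xhs/errors.py`)
- CLI commands use structured exit codes: 0=success, 1=not logged in, 2=error
- All user-visible error messages are written in Chinese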
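
The error-handling rules above can be sketched as a small exception hierarchy plus a mapping to exit codes. Only `XHSError` and the three exit codes come from this document. `NotLoggedInError` and `exit_code_for` are hypothetical names used for illustration, and the real `xhs/errors.py` may be organized differently.

```python
class XHSError(Exception):
    """Base class for errors raised by the automation package."""


class NotLoggedInError(XHSError):
    """Hypothetical subclass: the browser session has no valid login."""


EXIT_OK = 0
EXIT_NOT_LOGGED_IN = 1
EXIT_ERROR = 2


def exit_code_for(error: Exception) -> int:
    """Translate a raised exception into the CLI's structured exit code."""
    if isinstance(error, NotLoggedInError):
        return EXIT_NOT_LOGGED_IN
    # Every other failure, XHSError or not, is reported as a generic error.
    return EXIT_ERROR


try:
    raise NotLoggedInError("未登录")
except XHSError as exc:
    print(exit_code_for(exc))
```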
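### Safety constraints
- Publishing operations must go through a user-confirmation step
- File paths must be absolute
- Do not inline sensitive content in command-line arguments (pass it through files)
- Chrome Profile directories isolate each account's cookies
## References
- **xiaohongshu-mcp Go source**: /Users/zy/src/zy/xiaohongshu-mcp/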
## MCP tool mapping
The 13 subcommands of scripts/cli.py correspond to the xiaohongshu-mcp MCP tools:
| CLI subcommand | MCP tool | Category |
|--|--|--|
| `check-login` | check_login_status | Auth |
| `login` | get_login_qrcode | Auth |
| `delete-cookies` | delete_cookies | Auth |
| `list-feeds` | list_feeds | Browse |
| `search-feeds` | search_feeds | Browse |
| `get-feed-detail` | get_feed_detail | Browse |
| `user-profile` | user_profile | Browse |
| `post-comment` | post_comment_to_feed | Interact |
| `reply-comment` | reply_comment_in_feed | Interact |
| `like-feed` | like_feed | Interact |
| `favorite-feed` | favorite_feed | Interact |
| `publish` | publish_content | Publish |
| `publish-video` | publish_with_video | Publish |
... ...
# Xiaohongshu Skills development task
## Goal
Rewrite the Python CDP engine from scratch, based on the xiaohongshu-mcp Go source, to build a complete set of Xiaohongshu automation Skills for the OpenClaw ecosystem.
## Reference material
- **xiaohongshu-mcp Go source**: `/Users/zy/src/zy/xiaohongshu-mcp/` (10k stars, 13 MCP tools)
- **xiaohongshu-mcp data structures**: `/Users/zy/src/zy/xiaohongshu-mcp/xiaohongshu/types.go`
- **xiaohongshu-mcp tool definitions**: `/Users/zy/src/zy/xiaohongshu-mcp/mcp_server.go`
## Architecture
### Module structure
```
scripts/
├── xhs/ # Core XHS automation package
│ ├── cdp.py # CDP WebSocket client
│ ├── stealth.py # Anti-detection JS injection + Chrome launch flags
│ ├── cookies.py # Cookie file persistence
│ ├── types.py # Data types (dataclass)
│ ├── errors.py # Exception hierarchy
│ ├── selectors.py # CSS selector constants
│ ├── urls.py # URL constants
│ ├── human.py # Human-behavior simulation
│ ├── login.py # Login
│ ├── feeds.py # Home feed
│ ├── search.py # Search + filters
│ ├── feed_detail.py # Note details + comment loading
│ ├── user_profile.py # User profile page
│ ├── comment.py # Comments, replies
│ ├── like_favorite.py # Likes, favorites
│ ├── publish.py # Image-and-text publishing
│ └── publish_video.py # Video publishing
├── cli.py # Unified CLI entry point (13 subcommands)
├── chrome_launcher.py # Chrome process management
├── account_manager.py # Multi-account management
├── image_downloader.py # Media download (SHA256 cache)
├── title_utils.py # UTF-16 title length calculation
├── run_lock.py # Single-instance lock
└── publish_pipeline.py # Publishing orchestrator
```
### CLI interface (matching the 13 MCP tools in the Go project)
```bash
python scripts/cli.py check-login
python scripts/cli.py login
python scripts/cli.py delete-cookies
python scripts/cli.py list-feeds
python scripts/cli.py search-feeds --keyword "KEYWORD" [--sort-by --note-type ...]
python scripts/cli.py get-feed-detail --feed-id ID --xsec-token TOKEN [--load-all-comments]
python scripts/cli.py user-profile --user-id ID --xsec-token TOKEN
python scripts/cli.py post-comment --feed-id ID --xsec-token TOKEN --content "TEXT"
python scripts/cli.py reply-comment --feed-id ID --xsec-token TOKEN --content "TEXT" [--comment-id | --user-id]
python scripts/cli.py like-feed --feed-id ID --xsec-token TOKEN [--unlike]
python scripts/cli.py favorite-feed --feed-id ID --xsec-token TOKEN [--unfavorite]
python scripts/cli.py publish --title-file T --content-file C --images P1 P2 [--tags --schedule-at --visibility]
python scripts/cli.py publish-video --title-file T --content-file C --video P [--tags --schedule-at]
```
Global options: `--host`, `--port`, `--account`
Output: JSON (`ensure_ascii=False`)
Exit codes: 0=success, 1=not logged in, 2=error
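## Code requirements
- Python code must pass `ruff check` and `ruff format`
- Full type hints (PEP 484), using `str | None` rather than `Optional[str]`
- Public functions and classes must have docstrings
- Maximum line length: 100 characters
- Use `from __future__ import annotations` to enable postponed evaluation of annotations
- All exception classes inherit from `XHSError`
- The CLI uses argparse; exit codes: 0=success, 1=not logged in, 2=error
- JSON output uses `ensure_ascii=False` to keep Chinese text readable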
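
The last requirement is easiest to see side by side. The snippet below uses only the standard-library `json` module. The payload is a made-up example shaped like the CLI's success messages.

```python
import json

payload = {"success": True, "message": "评论发送成功"}

# Default behaviour: non-ASCII characters are escaped as \uXXXX sequences.
escaped = json.dumps(payload)

# With ensure_ascii=False the Chinese text is written out as-is.
readable = json.dumps(payload, ensure_ascii=False)

print(escaped)
print(readable)
```

Both strings decode to the same object. The difference only affects how readable the raw output is for a person or an agent looking at stdout.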
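## Completion marker
Output the completion marker once all of the following conditions are met:
1. All 17 modules of the `xhs/` package have been created
2. All 13 `cli.py` subcommands have been implemented
3. The 5 supporting scripts have been rewritten
4. The 5 `skills/*/SKILL.md` files have been updated
5. The root `SKILL.md`, `CLAUDE.md`, and `README.md` have been updated
6. `uv run ruff check .` reports no errors
7. `uv run ruff format --check .` reports no differences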
<promise>ALL SKILLS COMPLETE</promise>
... ...
# xiaohongshu-skills
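Xiaohongshu automation Claude Code Skills, built on a Python CDP browser-automation engine.
Provides Xiaohongshu operations to the OpenClaw ecosystem and is also compatible with the Claude Code Skills format.
## Feature overview
| Skill | Description | Core commands |
|------|------|----------|
| **xhs-auth** | Authentication management | `check-login`, `login`, `delete-cookies` |
| **xhs-publish** | Content publishing | `publish`, `publish-video` |
| **xhs-explore** | Content discovery | `list-feeds`, `search-feeds`, `get-feed-detail`, `user-profile` |
| **xhs-interact** | Social interaction | `post-comment`, `reply-comment`, `like-feed`, `favorite-feed` |
| **xhs-content-ops** | Composite operations | Competitor analysis, trend tracking, content creation, engagement management |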
## Installation
```bash
# Clone the project
git clone https://github.com/autoclaw-cc/xiaohongshu-skills.git
cd xiaohongshu-skills
# Install dependencies (requires uv)
uv sync
```
### Prerequisites
- Python >= 3.11
- [uv](https://docs.astral.sh/uv/) package manager
- Google Chrome browser
## Quick start
### 1. Start Chrome
```bash
# Windowed mode (recommended for the first login)
python scripts/chrome_launcher.py
# Headless mode
python scripts/chrome_launcher.py --headless
```
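For readers curious what the launcher passes to Chrome, the sketch below assembles a comparable command line. The three flags shown (`--remote-debugging-port`, `--headless=new`, `--user-data-dir`) are the ones chrome_launcher.py uses in this commit. The function name `build_chrome_args` is hypothetical, and the stealth flags are omitted because the contents of `STEALTH_ARGS` are not shown here.

```python
def build_chrome_args(
    chrome_bin: str,
    port: int = 9222,
    headless: bool = False,
    user_data_dir: str = "",
) -> list[str]:
    """Assemble a Chrome command line with remote debugging enabled."""
    args = [chrome_bin, f"--remote-debugging-port={port}"]
    if headless:
        # Chrome's newer headless implementation.
        args.append("--headless=new")
    if user_data_dir:
        # A separate profile directory keeps each account's cookies apart.
        args.append(f"--user-data-dir={user_data_dir}")
    return args


print(build_chrome_args("/usr/bin/google-chrome", headless=True))
```

The resulting list can be handed to `subprocess.Popen` unchanged, since each flag is already a separate argument.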
### 2. Log in to Xiaohongshu
```bash
# Check login status
python scripts/cli.py check-login
# Log in (scan the QR code)
python scripts/cli.py login
```
### 3. Search notes
```bash
python scripts/cli.py search-feeds --keyword "KEYWORD"
# With filters (the values are the Chinese option names the CLI accepts)
python scripts/cli.py search-feeds \
  --keyword "KEYWORD" --sort-by 最新 --note-type 图文
```
### 4. View note details
```bash
python scripts/cli.py get-feed-detail \
  --feed-id FEED_ID --xsec-token XSEC_TOKEN
```
### 5. Publish content
```bash
# Publish an image-and-text note
python scripts/cli.py publish \
  --title-file title.txt \
  --content-file content.txt \
  --images "/abs/path/pic1.jpg" "/abs/path/pic2.jpg"
# Publish a video
python scripts/cli.py publish-video \
  --title-file title.txt \
  --content-file content.txt \
  --video "/abs/path/video.mp4"
```
### 6. Social interaction
```bash
# Post a comment
python scripts/cli.py post-comment \
  --feed-id FEED_ID \
  --xsec-token XSEC_TOKEN \
  --content "COMMENT_TEXT"
# Like
python scripts/cli.py like-feed \
  --feed-id FEED_ID --xsec-token XSEC_TOKEN
# Favorite
python scripts/cli.py favorite-feed \
  --feed-id FEED_ID --xsec-token XSEC_TOKEN
```
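## CLI command reference
All commands are invoked through the single `scripts/cli.py` entry point and print JSON.
Global options:
- `--host HOST`: Chrome debugging host (default 127.0.0.1)
- `--port PORT`: Chrome debugging port (default 9222)
- `--account NAME`: account to use
| Subcommand | Description |
|--------|------|
| `check-login` | Check login status |
| `login` | Get the login QR code and wait for the scan |
| `delete-cookies` | Clear cookies |
| `list-feeds` | Get the recommended home feed |
| `search-feeds` | Search notes by keyword |
| `get-feed-detail` | Get note details and comments |
| `user-profile` | Get a user's profile page |
| `post-comment` | Post a comment on a note |
| `reply-comment` | Reply to a specific comment |
| `like-feed` | Like / unlike |
| `favorite-feed` | Favorite / unfavorite |
| `publish` | Publish image-and-text content |
| `publish-video` | Publish video content |
Exit codes: 0=success, 1=not logged in, 2=error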
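
On the calling side, an agent or wrapper script can combine the exit code with the JSON on stdout. The following is a minimal sketch of that interpretation step only. The names `CliResult` and `interpret` are hypothetical, and it assumes stdout holds a single JSON object.

```python
import json
from dataclasses import dataclass, field

# Exit codes documented for scripts/cli.py; anything else is treated as an error.
_STATUS_BY_CODE = {0: "ok", 1: "not_logged_in", 2: "error"}


@dataclass
class CliResult:
    status: str
    payload: dict = field(default_factory=dict)


def interpret(returncode: int, stdout: str) -> CliResult:
    """Combine a process exit code and its stdout into one result object."""
    status = _STATUS_BY_CODE.get(returncode, "error")
    try:
        payload = json.loads(stdout)
    except json.JSONDecodeError:
        payload = {}
    if not isinstance(payload, dict):
        payload = {}
    return CliResult(status=status, payload=payload)


print(interpret(1, '{"logged_in": false}'))
```

In practice the two inputs would come from something like `subprocess.run([...], capture_output=True, text=True)`.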
## Project structure
```
xiaohongshu-skills/
├── scripts/ # Python CDP automation engine
│ ├── xhs/ # Core automation package (modular)
│ │ ├── cdp.py # CDP WebSocket client
│ │ ├── stealth.py # Anti-detection protection
│ │ ├── cookies.py # Cookie persistence
│ │ ├── types.py # Data types
│ │ ├── errors.py # Exception hierarchy
│ │ ├── selectors.py # CSS selectors
│ │ ├── urls.py # URL constants
│ │ ├── human.py # Human-behavior simulation
│ │ ├── login.py # Login
│ │ ├── feeds.py # Home feed
│ │ ├── search.py # Search
│ │ ├── feed_detail.py # Note details
│ │ ├── user_profile.py # User profile page
│ │ ├── comment.py # Comments
│ │ ├── like_favorite.py # Likes/favorites
│ │ ├── publish.py # Image-and-text publishing
│ │ └── publish_video.py # Video publishing
│ ├── cli.py # Unified CLI (13 subcommands)
│ ├── chrome_launcher.py # Chrome process management
│ ├── account_manager.py # Multi-account management
│ ├── image_downloader.py # Media download
│ ├── title_utils.py # Title length calculation
│ ├── run_lock.py # Single-instance lock
│ └── publish_pipeline.py # Publishing orchestrator
├── skills/ # Claude Code Skills definitions
│ ├── xhs-auth/SKILL.md # Authentication management
│ ├── xhs-publish/SKILL.md # Content publishing
│ ├── xhs-explore/SKILL.md # Content discovery
│ ├── xhs-interact/SKILL.md # Social interaction
│ └── xhs-content-ops/SKILL.md # Composite operations
├── SKILL.md # Unified entry point
├── CLAUDE.md # Project development guide
├── pyproject.toml # uv project configuration
└── README.md
```
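## Technical architecture
### Two-layer structure
1. **scripts/: Python CDP engine**
- Rewritten from scratch based on the xiaohongshu-mcp Go source
- Controls the browser directly through the Chrome DevTools Protocol (CDP)
- Extracts data from `window.__INITIAL_STATE__`
- Built-in anti-detection protection (stealth flags + JS injection)
- Structured JSON output
2. **skills/: Claude Code Skills definitions**
- SKILL.md format, telling the AI agent how to call scripts/
- Covers input triage, constraints, workflows, and failure handling
## Development
```bash
uv sync # Install dependencies
uv run ruff check . # Lint
uv run ruff format . # Format code
uv run pytest # Run tests
```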
... ...
---
name: xiaohongshu-skills
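description: |
  A collection of Xiaohongshu automation skills. Supports authentication, content publishing, search and discovery, social interaction, and composite operations.
  Triggered when the user asks to operate Xiaohongshu (publish, search, comment, log in, analyze, like, favorite).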
---
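# Xiaohongshu automation Skills
You are the "Xiaohongshu automation assistant". Route each request to the matching sub-skill based on the user's intent.
## Input triage
Determine the user's intent in priority order and route to the matching sub-skill:
1. **Authentication** ("log in / check login / switch account") → run the `xhs-auth` skill.
2. **Content publishing** ("publish / post / upload images and text / upload video") → run the `xhs-publish` skill.
3. **Search and discovery** ("search notes / view details / browse the home feed / view a user") → run the `xhs-explore` skill.
4. **Social interaction** ("comment / reply / like / favorite") → run the `xhs-interact` skill.
5. **Composite operations** ("competitor analysis / trend tracking / batch interaction / one-click creation") → run the `xhs-content-ops` skill.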
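
The priority order above is meant to be applied by the agent reading this file, not by code. Purely as an illustration of "first match wins" routing, it could be sketched as below. The English keyword lists are simplified stand-ins for the trigger phrases, and the function name `route` is hypothetical.

```python
# Ordered from highest to lowest priority; the first matching entry wins.
ROUTES = [
    ("xhs-auth", ("log in", "login", "switch account")),
    ("xhs-publish", ("publish", "upload")),
    ("xhs-explore", ("search", "details", "browse")),
    ("xhs-interact", ("comment", "reply", "like", "favorite")),
    ("xhs-content-ops", ("competitor", "trend", "batch")),
]


def route(request: str) -> str:
    """Return the sub-skill for a request, or "" when nothing matches."""
    text = request.lower()
    for skill, keywords in ROUTES:
        if any(keyword in text for keyword in keywords):
            return skill
    return ""


print(route("Search notes about coffee"))
print(route("Log in, then publish my draft"))
```

Because the list is ordered, a request that mentions both logging in and publishing resolves to the authentication skill first.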
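## Global constraints
- Confirm the login status (via `check-login`) before any operation.
- Publishing and commenting must be confirmed by the user before they are executed.
- File paths must be absolute.
- CLI output is JSON; present it to the user in a structured form.
- Do not operate too frequently; keep reasonable intervals between actions.
## Sub-skill overview
### xhs-auth: authentication management
Manages the Xiaohongshu login state and switching between accounts.
| Command | Function |
|------|------|
| `cli.py check-login` | Check login status |
| `cli.py login` | Get the login QR code and wait for the scan |
| `cli.py delete-cookies` | Clear cookies (log out / switch account) |
### xhs-publish: content publishing
Publishes image-and-text or video content to Xiaohongshu.
| Command | Function |
|------|------|
| `cli.py publish` | Image-and-text publishing (local images or URLs) |
| `cli.py publish-video` | Video publishing |
| `publish_pipeline.py` | Publishing pipeline (includes image download and login check) |
### xhs-explore: content discovery
Searches notes, views details, and fetches user profiles.
| Command | Function |
|------|------|
| `cli.py list-feeds` | Get the recommended home feed |
| `cli.py search-feeds` | Search notes by keyword |
| `cli.py get-feed-detail` | Get a note's full content and comments |
| `cli.py user-profile` | Get a user's profile page |
### xhs-interact: social interaction
Posts comments and replies, likes, and favorites.
| Command | Function |
|------|------|
| `cli.py post-comment` | Post a comment on a note |
| `cli.py reply-comment` | Reply to a specific comment |
| `cli.py like-feed` | Like / unlike |
| `cli.py favorite-feed` | Favorite / unfavorite |
### xhs-content-ops: composite operations
Combines multiple steps into operations workflows: competitor analysis, trend tracking, content creation, engagement management.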
## Quick start
```bash
# 1. Start Chrome
python scripts/chrome_launcher.py
# 2. Check login status
python scripts/cli.py check-login
# 3. Log in (if needed)
python scripts/cli.py login
# 4. Search notes
python scripts/cli.py search-feeds --keyword "KEYWORD"
# 5. View note details
python scripts/cli.py get-feed-detail \
  --feed-id FEED_ID --xsec-token XSEC_TOKEN
# 6. Publish an image-and-text note
python scripts/cli.py publish \
  --title-file title.txt \
  --content-file content.txt \
  --images "/abs/path/pic1.jpg"
# 7. Post a comment
python scripts/cli.py post-comment \
  --feed-id FEED_ID \
  --xsec-token XSEC_TOKEN \
  --content "COMMENT_TEXT"
# 8. Like
python scripts/cli.py like-feed \
  --feed-id FEED_ID --xsec-token XSEC_TOKEN
```
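## Failure handling
- **Not logged in**: prompt the user to go through the login flow (xhs-auth).
- **Chrome not running**: start the browser with `chrome_launcher.py`.
- **Operation timed out**: check the network connection and increase the wait time as appropriate.
- **Rate limited**: lower the operation frequency and lengthen the intervals.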
... ...
[project]
name = "xiaohongshu-skills"
version = "0.1.0"
description = "Xiaohongshu automation Skills, based on CDP browser automation"
readme = "README.md"
license = { text = "MIT" }
requires-python = ">=3.11"
dependencies = [
"requests>=2.28.0",
"websockets>=12.0",
]
[project.optional-dependencies]
dev = [
"ruff>=0.9.0",
"pytest>=8.0",
]
[tool.ruff]
target-version = "py311"
line-length = 100
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # pyflakes
"I", # isort
"N", # pep8-naming
"UP", # pyupgrade
"B", # flake8-bugbear
"SIM", # flake8-simplify
"RUF", # ruff-specific rules
]
ignore = [
"E402", # module-level imports not at top (needed for sys.path manipulation)
"RUF001", # ambiguous unicode characters (Chinese punctuation is intentional)
"RUF002", # ambiguous unicode in docstrings (Chinese punctuation is intentional)
"RUF003", # ambiguous unicode in comments (Chinese punctuation is intentional)
]
[tool.ruff.lint.per-file-ignores]
[tool.ruff.lint.isort]
known-first-party = ["xiaohongshu_skills"]
[tool.pytest.ini_options]
testpaths = ["tests"]
... ...
"""Multi-account management with an isolated configuration per account."""

from __future__ import annotations

import json
import logging
import os
from pathlib import Path

logger = logging.getLogger(__name__)

# Location of the account configuration file
_CONFIG_DIR = Path.home() / ".xhs"
_ACCOUNTS_FILE = _CONFIG_DIR / "accounts.json"


def _load_config() -> dict:
    """Load the account configuration."""
    if not _ACCOUNTS_FILE.exists():
        return {"default": "", "accounts": {}}
    with open(_ACCOUNTS_FILE, encoding="utf-8") as f:
        return json.load(f)


def _save_config(config: dict) -> None:
    """Save the account configuration."""
    _CONFIG_DIR.mkdir(parents=True, exist_ok=True)
    with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
        json.dump(config, f, ensure_ascii=False, indent=2)


def list_accounts() -> list[dict]:
    """List all accounts."""
    config = _load_config()
    default = config.get("default", "")
    accounts = config.get("accounts", {})
    result = []
    for name, info in accounts.items():
        result.append(
            {
                "name": name,
                "description": info.get("description", ""),
                "is_default": name == default,
                "profile_dir": _get_profile_dir(name),
            }
        )
    return result


def add_account(name: str, description: str = "") -> None:
    """Add an account."""
    config = _load_config()
    accounts = config.setdefault("accounts", {})
    if name in accounts:
        raise ValueError(f"账号 '{name}' 已存在")
    accounts[name] = {"description": description}
    # The first account added becomes the default
    if not config.get("default"):
        config["default"] = name
    _save_config(config)
    # Create the Profile directory
    profile_dir = _get_profile_dir(name)
    os.makedirs(profile_dir, exist_ok=True)
    logger.info("添加账号: %s", name)


def remove_account(name: str) -> None:
    """Remove an account."""
    config = _load_config()
    accounts = config.get("accounts", {})
    if name not in accounts:
        raise ValueError(f"账号 '{name}' 不存在")
    del accounts[name]
    # If the removed account was the default, fall back to the first
    # remaining account, or to no default when none are left
    if config.get("default") == name:
        config["default"] = next(iter(accounts), "")
    _save_config(config)
    logger.info("删除账号: %s", name)


def set_default_account(name: str) -> None:
    """Set the default account."""
    config = _load_config()
    accounts = config.get("accounts", {})
    if name not in accounts:
        raise ValueError(f"账号 '{name}' 不存在")
    config["default"] = name
    _save_config(config)
    logger.info("默认账号设置为: %s", name)


def get_default_account() -> str:
    """Return the name of the default account."""
    config = _load_config()
    return config.get("default", "")


def _get_profile_dir(account: str) -> str:
    """Return the Chrome Profile directory for an account."""
    return str(_CONFIG_DIR / "accounts" / account / "chrome-profile")
... ...
"""Chrome process management (cross-platform).

Corresponds to the process-management part of the Go browser/browser.go.
"""

from __future__ import annotations

import logging
import os
import platform
import shutil
import signal
import subprocess
import time

from xhs.stealth import STEALTH_ARGS

logger = logging.getLogger(__name__)

# Default remote debugging port
DEFAULT_PORT = 9222

# Default Chrome locations per platform
_CHROME_PATHS: dict[str, list[str]] = {
    "Darwin": [
        "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
        "/Applications/Chromium.app/Contents/MacOS/Chromium",
    ],
    "Linux": [
        "/usr/bin/google-chrome",
        "/usr/bin/google-chrome-stable",
        "/usr/bin/chromium",
        "/usr/bin/chromium-browser",
        "/snap/bin/chromium",
    ],
    "Windows": [
        r"C:\Program Files\Google\Chrome\Application\chrome.exe",
        r"C:\Program Files (x86)\Google\Chrome\Application\chrome.exe",
    ],
}


def find_chrome() -> str | None:
    """Locate the Chrome executable."""
    # The environment variable takes precedence
    env_path = os.getenv("CHROME_BIN")
    if env_path and os.path.isfile(env_path):
        return env_path
    # Look it up on PATH (which/where)
    chrome = shutil.which("google-chrome") or shutil.which("chromium")
    if chrome:
        return chrome
    # Fall back to the platform's default locations
    system = platform.system()
    for path in _CHROME_PATHS.get(system, []):
        if os.path.isfile(path):
            return path
    return None


def launch_chrome(
    port: int = DEFAULT_PORT,
    headless: bool = False,
    user_data_dir: str | None = None,
    chrome_bin: str | None = None,
) -> subprocess.Popen:
    """Start a Chrome process with a remote debugging port.

    Args:
        port: Remote debugging port.
        headless: Whether to run in headless mode.
        user_data_dir: User data directory (Profile isolation).
        chrome_bin: Path to the Chrome executable.

    Returns:
        The Chrome child process.

    Raises:
        FileNotFoundError: Chrome was not found.
    """
    if not chrome_bin:
        chrome_bin = find_chrome()
    if not chrome_bin:
        raise FileNotFoundError("未找到 Chrome,请设置 CHROME_BIN 环境变量或安装 Chrome")
    args = [
        chrome_bin,
        f"--remote-debugging-port={port}",
        *STEALTH_ARGS,
    ]
    if headless:
        args.append("--headless=new")
    if user_data_dir:
        args.append(f"--user-data-dir={user_data_dir}")
    # Optional proxy, taken from the XHS_PROXY environment variable
    proxy = os.getenv("XHS_PROXY")
    if proxy:
        args.append(f"--proxy-server={proxy}")
        logger.info("使用代理: %s", _mask_proxy(proxy))
    logger.info("启动 Chrome: port=%d, headless=%s", port, headless)
    process = subprocess.Popen(
        args,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )
    # Wait until Chrome is ready
    _wait_for_chrome(port)
    return process


def close_chrome(process: subprocess.Popen) -> None:
    """Shut down the Chrome process."""
    if process.poll() is not None:
        return
    try:
        process.send_signal(signal.SIGTERM)
        process.wait(timeout=5)
    except (subprocess.TimeoutExpired, OSError):
        # Graceful shutdown failed; force-kill the process
        process.kill()
        process.wait(timeout=3)
    logger.info("Chrome 进程已关闭")


def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
    """Check whether Chrome is listening on the given debugging port."""
    import requests

    try:
        resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
        return resp.status_code == 200
    except (requests.ConnectionError, requests.Timeout):
        return False


def _wait_for_chrome(port: int, timeout: float = 15.0) -> None:
    """Poll until Chrome's debugging port is ready; only log a warning on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if is_chrome_running(port):
            logger.info("Chrome 已就绪 (port=%d)", port)
            return
        time.sleep(0.5)
    logger.warning("等待 Chrome 就绪超时 (port=%d)", port)


def _mask_proxy(proxy_url: str) -> str:
    """Hide credentials embedded in a proxy URL."""
    from urllib.parse import urlparse

    try:
        parsed = urlparse(proxy_url)
        if parsed.username:
            masked = proxy_url.replace(parsed.username, "***")
            # Only mask a non-empty password: str.replace("", "***") would
            # insert the mask between every character of the URL.
            if parsed.password:
                masked = masked.replace(parsed.password, "***")
            return masked
    except Exception:
        pass
    return proxy_url
... ...
"""Unified CLI entry point: 13 subcommands matching the Go MCP tools.

Global options: --host, --port, --account
Output: JSON (ensure_ascii=False)
Exit codes: 0=success, 1=not logged in, 2=error
"""

from __future__ import annotations

import argparse
import json
import logging
import sys

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)
logger = logging.getLogger("xhs-cli")


def _output(data: dict, exit_code: int = 0) -> None:
    """Print JSON and exit."""
    print(json.dumps(data, ensure_ascii=False, indent=2))
    sys.exit(exit_code)


def _connect(args: argparse.Namespace):
    """Connect to Chrome and return (browser, page)."""
    from xhs.cdp import Browser

    browser = Browser(host=args.host, port=args.port)
    browser.connect()
    page = browser.new_page()
    return browser, page


# ========== Subcommand implementations ==========


def cmd_check_login(args: argparse.Namespace) -> None:
    """Check login status."""
    from xhs.login import check_login_status

    browser, page = _connect(args)
    try:
        logged_in = check_login_status(page)
        _output({"logged_in": logged_in}, exit_code=0 if logged_in else 1)
    finally:
        browser.close_page(page)
        browser.close()


def cmd_login(args: argparse.Namespace) -> None:
    """Get the login QR code and wait for it to be scanned."""
    from xhs.login import fetch_qrcode, save_qrcode_to_file, wait_for_login

    browser, page = _connect(args)
    try:
        src, already = fetch_qrcode(page)
        if already:
            _output({"logged_in": True, "message": "已登录"})
        else:
            # Save the QR code to a temporary file
            qrcode_path = save_qrcode_to_file(src)
            print(
                json.dumps(
                    {
                        "qrcode_path": qrcode_path,
                        "message": "请扫码登录,二维码已保存到文件",
                    },
                    ensure_ascii=False,
                )
            )
            success = wait_for_login(page, timeout=120)
            _output(
                {"logged_in": success, "message": "登录成功" if success else "登录超时"},
                exit_code=0 if success else 2,
            )
    finally:
        browser.close_page(page)
        browser.close()


def cmd_delete_cookies(args: argparse.Namespace) -> None:
    """Delete cookies."""
    from xhs.cookies import delete_cookies, get_cookies_file_path

    path = get_cookies_file_path(args.account)
    delete_cookies(path)
    _output({"success": True, "message": f"已删除 cookies: {path}"})


def cmd_list_feeds(args: argparse.Namespace) -> None:
    """Get the home feed list."""
    from xhs.feeds import list_feeds

    browser, page = _connect(args)
    try:
        feeds = list_feeds(page)
        _output({"feeds": [f.to_dict() for f in feeds], "count": len(feeds)})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_search_feeds(args: argparse.Namespace) -> None:
    """Search feeds."""
    from xhs.search import search_feeds
    from xhs.types import FilterOption

    filter_opt = FilterOption(
        sort_by=args.sort_by or "",
        note_type=args.note_type or "",
        publish_time=args.publish_time or "",
        search_scope=args.search_scope or "",
        location=args.location or "",
    )
    browser, page = _connect(args)
    try:
        feeds = search_feeds(page, args.keyword, filter_opt)
        _output({"feeds": [f.to_dict() for f in feeds], "count": len(feeds)})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_get_feed_detail(args: argparse.Namespace) -> None:
    """Get feed details."""
    from xhs.feed_detail import get_feed_detail
    from xhs.types import CommentLoadConfig

    config = CommentLoadConfig(
        click_more_replies=args.click_more_replies,
        max_replies_threshold=args.max_replies_threshold,
        max_comment_items=args.max_comment_items,
        scroll_speed=args.scroll_speed,
    )
    browser, page = _connect(args)
    try:
        detail = get_feed_detail(
            page,
            args.feed_id,
            args.xsec_token,
            load_all_comments=args.load_all_comments,
            config=config,
        )
        _output(detail.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_user_profile(args: argparse.Namespace) -> None:
    """Get a user's profile page."""
    from xhs.user_profile import get_user_profile

    browser, page = _connect(args)
    try:
        profile = get_user_profile(page, args.user_id, args.xsec_token)
        _output(profile.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_post_comment(args: argparse.Namespace) -> None:
    """Post a comment."""
    from xhs.comment import post_comment

    browser, page = _connect(args)
    try:
        post_comment(page, args.feed_id, args.xsec_token, args.content)
        _output({"success": True, "message": "评论发送成功"})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_reply_comment(args: argparse.Namespace) -> None:
    """Reply to a comment."""
    from xhs.comment import reply_comment

    browser, page = _connect(args)
    try:
        reply_comment(
            page,
            args.feed_id,
            args.xsec_token,
            args.content,
            comment_id=args.comment_id or "",
            user_id=args.user_id or "",
        )
        _output({"success": True, "message": "回复成功"})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_like_feed(args: argparse.Namespace) -> None:
    """Like or unlike a note."""
    from xhs.like_favorite import like_feed, unlike_feed

    browser, page = _connect(args)
    try:
        if args.unlike:
            result = unlike_feed(page, args.feed_id, args.xsec_token)
        else:
            result = like_feed(page, args.feed_id, args.xsec_token)
        _output(result.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_favorite_feed(args: argparse.Namespace) -> None:
    """Favorite or unfavorite a note."""
    from xhs.like_favorite import favorite_feed, unfavorite_feed

    browser, page = _connect(args)
    try:
        if args.unfavorite:
            result = unfavorite_feed(page, args.feed_id, args.xsec_token)
        else:
            result = favorite_feed(page, args.feed_id, args.xsec_token)
        _output(result.to_dict())
    finally:
        browser.close_page(page)
        browser.close()


def cmd_publish(args: argparse.Namespace) -> None:
    """Publish image-and-text content."""
    from image_downloader import process_images
    from xhs.publish import publish_image_content
    from xhs.types import PublishImageContent

    # Read the title and body from files
    with open(args.title_file, encoding="utf-8") as f:
        title = f.read().strip()
    with open(args.content_file, encoding="utf-8") as f:
        content = f.read().strip()
    # Resolve images (local paths or URLs)
    image_paths = process_images(args.images) if args.images else []
    if not image_paths:
        _output({"success": False, "error": "没有有效的图片"}, exit_code=2)
    browser, page = _connect(args)
    try:
        publish_image_content(
            page,
            PublishImageContent(
                title=title,
                content=content,
                tags=args.tags or [],
                image_paths=image_paths,
                schedule_time=args.schedule_at,
                is_original=args.original,
                visibility=args.visibility or "",
            ),
        )
        _output({"success": True, "title": title, "images": len(image_paths), "status": "发布完成"})
    finally:
        browser.close_page(page)
        browser.close()


def cmd_publish_video(args: argparse.Namespace) -> None:
    """Publish video content."""
    from xhs.publish_video import publish_video_content
    from xhs.types import PublishVideoContent

    with open(args.title_file, encoding="utf-8") as f:
        title = f.read().strip()
    with open(args.content_file, encoding="utf-8") as f:
        content = f.read().strip()
    browser, page = _connect(args)
    try:
        publish_video_content(
            page,
            PublishVideoContent(
                title=title,
                content=content,
                tags=args.tags or [],
                video_path=args.video,
                schedule_time=args.schedule_at,
                visibility=args.visibility or "",
            ),
        )
        _output({"success": True, "title": title, "video": args.video, "status": "发布完成"})
    finally:
        browser.close_page(page)
        browser.close()


# ========== Argument parsing ==========


def build_parser() -> argparse.ArgumentParser:
    """Build the CLI argument parser."""
    parser = argparse.ArgumentParser(
        prog="xhs-cli",
        description="小红书自动化 CLI",
    )
    # Global options
    parser.add_argument("--host", default="127.0.0.1", help="Chrome 调试主机 (default: 127.0.0.1)")
    parser.add_argument("--port", type=int, default=9222, help="Chrome 调试端口 (default: 9222)")
    parser.add_argument("--account", default="", help="账号名称")
    subparsers = parser.add_subparsers(dest="command", required=True)

    # check-login
    sub = subparsers.add_parser("check-login", help="检查登录状态")
    sub.set_defaults(func=cmd_check_login)

    # login
    sub = subparsers.add_parser("login", help="登录(扫码)")
    sub.set_defaults(func=cmd_login)

    # delete-cookies
    sub = subparsers.add_parser("delete-cookies", help="删除 cookies")
    sub.set_defaults(func=cmd_delete_cookies)

    # list-feeds
    sub = subparsers.add_parser("list-feeds", help="获取首页 Feed 列表")
    sub.set_defaults(func=cmd_list_feeds)

    # search-feeds
    sub = subparsers.add_parser("search-feeds", help="搜索 Feeds")
    sub.add_argument("--keyword", required=True, help="搜索关键词")
    sub.add_argument("--sort-by", help="排序: 综合|最新|最多点赞|最多评论|最多收藏")
    sub.add_argument("--note-type", help="类型: 不限|视频|图文")
    sub.add_argument("--publish-time", help="时间: 不限|一天内|一周内|半年内")
    sub.add_argument("--search-scope", help="范围: 不限|已看过|未看过|已关注")
    sub.add_argument("--location", help="位置: 不限|同城|附近")
    sub.set_defaults(func=cmd_search_feeds)

    # get-feed-detail
    sub = subparsers.add_parser("get-feed-detail", help="获取 Feed 详情")
    sub.add_argument("--feed-id", required=True, help="Feed ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.add_argument("--load-all-comments", action="store_true", help="加载全部评论")
    sub.add_argument("--click-more-replies", action="store_true", help="点击展开更多回复")
    sub.add_argument("--max-replies-threshold", type=int, default=10, help="展开回复数阈值")
    sub.add_argument("--max-comment-items", type=int, default=0, help="最大评论数 (0=不限)")
    sub.add_argument("--scroll-speed", default="normal", help="滚动速度: slow|normal|fast")
    sub.set_defaults(func=cmd_get_feed_detail)

    # user-profile
    sub = subparsers.add_parser("user-profile", help="获取用户主页")
    sub.add_argument("--user-id", required=True, help="用户 ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.set_defaults(func=cmd_user_profile)

    # post-comment
    sub = subparsers.add_parser("post-comment", help="发表评论")
    sub.add_argument("--feed-id", required=True, help="Feed ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.add_argument("--content", required=True, help="评论内容")
    sub.set_defaults(func=cmd_post_comment)

    # reply-comment
    sub = subparsers.add_parser("reply-comment", help="回复评论")
    sub.add_argument("--feed-id", required=True, help="Feed ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.add_argument("--content", required=True, help="回复内容")
    sub.add_argument("--comment-id", help="目标评论 ID")
    sub.add_argument("--user-id", help="目标用户 ID")
    sub.set_defaults(func=cmd_reply_comment)

    # like-feed
    sub = subparsers.add_parser("like-feed", help="点赞")
    sub.add_argument("--feed-id", required=True, help="Feed ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.add_argument("--unlike", action="store_true", help="取消点赞")
    sub.set_defaults(func=cmd_like_feed)

    # favorite-feed
    sub = subparsers.add_parser("favorite-feed", help="收藏")
    sub.add_argument("--feed-id", required=True, help="Feed ID")
    sub.add_argument("--xsec-token", required=True, help="xsec_token")
    sub.add_argument("--unfavorite", action="store_true", help="取消收藏")
    sub.set_defaults(func=cmd_favorite_feed)

    # publish
    sub = subparsers.add_parser("publish", help="发布图文")
    sub.add_argument("--title-file", required=True, help="标题文件路径")
    sub.add_argument("--content-file", required=True, help="正文文件路径")
    sub.add_argument("--images", nargs="+", required=True, help="图片路径/URL")
    sub.add_argument("--tags", nargs="*", help="标签")
    sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
    sub.add_argument("--original", action="store_true", help="声明原创")
    sub.add_argument("--visibility", help="可见范围")
    sub.set_defaults(func=cmd_publish)

    # publish-video
    sub = subparsers.add_parser("publish-video", help="发布视频")
    sub.add_argument("--title-file", required=True, help="标题文件路径")
    sub.add_argument("--content-file", required=True, help="正文文件路径")
    sub.add_argument("--video", required=True, help="视频文件路径")
    sub.add_argument("--tags", nargs="*", help="标签")
    sub.add_argument("--schedule-at", help="定时发布 (ISO8601)")
    sub.add_argument("--visibility", help="可见范围")
    sub.set_defaults(func=cmd_publish_video)

    return parser
def main() -> None:
"""CLI 入口。"""
parser = build_parser()
args = parser.parse_args()
try:
args.func(args)
except Exception as e:
logger.error("执行失败: %s", e, exc_info=True)
_output({"success": False, "error": str(e)}, exit_code=2)
if __name__ == "__main__":
main()
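The subcommand wiring above relies on `set_defaults(func=...)` so that `main()` can dispatch without an if/elif chain. A minimal sketch of that pattern follows; the handler and the `demo-cli` program name are hypothetical, and the handler returns a dict instead of driving a browser.

```python
import argparse


def cmd_like_feed(args: argparse.Namespace) -> dict:
    """Hypothetical handler: reports what it would do."""
    action = "unlike" if args.unlike else "like"
    return {"success": True, "action": action, "feed_id": args.feed_id}


def build_demo_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="demo-cli")
    subparsers = parser.add_subparsers(dest="command", required=True)
    sub = subparsers.add_parser("like-feed", help="Like a feed")
    sub.add_argument("--feed-id", required=True)
    sub.add_argument("--unlike", action="store_true")
    # The chosen subparser stores its handler on the parsed namespace.
    sub.set_defaults(func=cmd_like_feed)
    return parser


def run(argv: list) -> dict:
    """Parse argv and dispatch; any handler exception becomes an error dict."""
    args = build_demo_parser().parse_args(argv)
    try:
        return args.func(args)
    except Exception as exc:
        return {"success": False, "error": str(exc)}
```

Calling `run(["like-feed", "--feed-id", "abc", "--unlike"])` selects the handler purely through the parsed namespace.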
... ...
"""Media download with SHA256-based caching; counterpart of Go pkg/downloader/images.go."""
from __future__ import annotations
import hashlib
import logging
import os
import time
from urllib.parse import urlparse
import requests
logger = logging.getLogger(__name__)
_USER_AGENT = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
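# Known image file extensions
_IMAGE_EXTENSIONS = {".jpg", ".jpeg", ".png", ".gif", ".webp", ".bmp", ".svg"}


def is_image_url(path: str) -> bool:
    """Return True if the string is an http(s) URL.

    Only the scheme is checked; the extension and content type are not inspected.
    """
    return path.lower().startswith(("http://", "https://"))


class ImageDownloader:
    """Image downloader with a cache keyed by the SHA256 hash of the URL."""

    def __init__(self, save_path: str) -> None:
        self.save_path = save_path
        os.makedirs(save_path, exist_ok=True)
        self._session = requests.Session()
        # requests does not read a Session.timeout attribute, so this value has no
        # effect by itself; a timeout has to be passed on each request.
        self._session.timeout = 30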
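    def download_image(self, image_url: str) -> str:
        """Download one image and return its local file path.

        If a file for this URL already exists (matched by URL hash), its path is
        returned without downloading again.

        Raises:
            ValueError: The URL is not an http(s) URL.
            RuntimeError: The download failed.
        """
        if not is_image_url(image_url):
            raise ValueError(f"Invalid image URL: {image_url}")

        # Reuse a previously downloaded file with the same URL hash
        url_hash = hashlib.sha256(image_url.encode()).hexdigest()[:16]
        existing = self._find_existing(url_hash)
        if existing:
            return existing

        # Build the target file name
        ext = self._detect_extension(image_url)
        filename = f"img_{url_hash}_{int(time.time())}{ext}"
        filepath = os.path.join(self.save_path, filename)

        # Download
        parsed = urlparse(image_url)
        headers = {
            "User-Agent": _USER_AGENT,
            "Referer": f"{parsed.scheme}://{parsed.hostname}/",
        }
        try:
            # Pass the timeout explicitly; requests ignores Session.timeout
            resp = self._session.get(image_url, headers=headers, timeout=30)
        except requests.RequestException as e:
            # Keep the documented contract: network errors surface as RuntimeError
            raise RuntimeError(f"Download failed: {image_url}: {e}") from e
        if resp.status_code != 200:
            raise RuntimeError(f"Download failed (status={resp.status_code}): {image_url}")

        # Save
        with open(filepath, "wb") as f:
            f.write(resp.content)
        logger.info("Download complete: %s -> %s", image_url, filepath)
        return filepath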
    def download_images(self, image_urls: list[str]) -> list[str]:
        """Download several images; failed URLs are logged and skipped."""
        paths = []
        for url in image_urls:
            try:
                path = self.download_image(url)
                paths.append(path)
            except Exception as e:
                logger.error("Download failed %s: %s", url, e)
        return paths

    def _detect_extension(self, url: str) -> str:
        """Infer the file extension from the URL path."""
        parsed = urlparse(url)
        path = parsed.path.lower()
        for ext in _IMAGE_EXTENSIONS:
            if path.endswith(ext):
                return ext
        return ".jpg"  # default

    def _find_existing(self, url_hash: str) -> str | None:
        """Find an already downloaded file with the same URL hash."""
        prefix = f"img_{url_hash}_"
        for filename in os.listdir(self.save_path):
            if filename.startswith(prefix):
                return os.path.join(self.save_path, filename)
        return None


def process_images(images: list[str], save_dir: str | None = None) -> list[str]:
    """Resolve a list of images: download URLs, pass existing local paths through."""
    if not save_dir:
        save_dir = os.path.join(os.path.expanduser("~"), ".xhs", "images")
    downloader = ImageDownloader(save_dir)
    result = []
    for img in images:
        if is_image_url(img):
            path = downloader.download_image(img)
            result.append(path)
        else:
            # Local path
            if os.path.exists(img):
                result.append(os.path.abspath(img))
            else:
                logger.warning("File not found: %s", img)
    return result
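The caching idea in this file (derive a name from the SHA256 of the URL, skip the download when a matching file exists) can be illustrated in isolation. The sketch below is an assumption-laden simplification: `cache_name`, `fetch_cached`, and the injected `fetch` callable are hypothetical names, and unlike the module above it does not put a timestamp in the file name.

```python
import hashlib
import os
import tempfile


def cache_name(url: str, ext: str = ".jpg") -> str:
    """Deterministic file name: the same URL always maps to the same name."""
    digest = hashlib.sha256(url.encode("utf-8")).hexdigest()[:16]
    return f"img_{digest}{ext}"


def fetch_cached(url: str, cache_dir: str, fetch) -> str:
    """Return the cached path for url, calling fetch(url) -> bytes only on a miss."""
    os.makedirs(cache_dir, exist_ok=True)
    path = os.path.join(cache_dir, cache_name(url))
    if not os.path.exists(path):
        with open(path, "wb") as f:
            f.write(fetch(url))
    return path
```

Injecting `fetch` keeps the sketch free of network access, which also makes the cache behaviour easy to check with a counting stub.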
... ...
"""Publish orchestrator: download -> login check -> publish -> report."""
from __future__ import annotations
import json
import logging
import sys
from image_downloader import process_images
from title_utils import calc_title_length
from xhs.cdp import Browser
from xhs.login import check_login_status
from xhs.publish import publish_image_content
from xhs.publish_video import publish_video_content
from xhs.types import PublishImageContent, PublishVideoContent
logger = logging.getLogger(__name__)
def run_publish_pipeline(
title: str,
content: str,
images: list[str] | None = None,
video: str | None = None,
tags: list[str] | None = None,
schedule_time: str | None = None,
is_original: bool = False,
visibility: str = "",
host: str = "127.0.0.1",
port: int = 9222,
account: str = "",
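) -> dict:
    """Run the full publish pipeline.

    Returns:
        A dict describing the publish result.
    """
    # Validate the title length
    title_len = calc_title_length(title)
    if title_len > 20:
        return {"success": False, "error": f"Title too long: {title_len}/20"}

    # Resolve images (download URLs, verify local paths)
    local_images: list[str] = []
    if images:
        local_images = process_images(images)
        if not local_images:
            return {"success": False, "error": "No valid images"}
    elif not video:
        # Neither images nor a video were given, so there is nothing to publish
        return {"success": False, "error": "Either images or a video is required"}

    # Connect to the browser
    browser = Browser(host=host, port=port)
    browser.connect()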
try:
page = browser.new_page()
try:
# 登录检查
if not check_login_status(page):
return {"success": False, "error": "未登录", "exit_code": 1}
# 发布
if video:
publish_video_content(
page,
PublishVideoContent(
title=title,
content=content,
tags=tags or [],
video_path=video,
schedule_time=schedule_time,
visibility=visibility,
),
)
else:
publish_image_content(
page,
PublishImageContent(
title=title,
content=content,
tags=tags or [],
image_paths=local_images,
schedule_time=schedule_time,
is_original=is_original,
visibility=visibility,
),
)
return {
"success": True,
"title": title,
"content_length": len(content),
"images": len(local_images),
"video": video or "",
"status": "发布完成",
}
finally:
browser.close_page(page)
finally:
browser.close()
def main() -> None:
    """CLI entry point (used when invoked by the publish/publish-video subcommands of cli.py)."""
    import argparse

    parser = argparse.ArgumentParser(description="Xiaohongshu publish pipeline")
    parser.add_argument("--title-file", required=True, help="Path to the title file")
    parser.add_argument("--content-file", required=True, help="Path to the body text file")
    parser.add_argument("--images", nargs="*", help="List of image paths or URLs")
    parser.add_argument("--video", help="Path to the video file")
    parser.add_argument("--tags", nargs="*", help="List of tags")
    parser.add_argument("--schedule-at", help="Scheduled publish time (ISO8601)")
    parser.add_argument("--original", action="store_true", help="Declare as original content")
    parser.add_argument("--visibility", default="", help="Visibility scope")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=9222)
    parser.add_argument("--account", default="")
    args = parser.parse_args()

    # Read the title and body text
with open(args.title_file, encoding="utf-8") as f:
title = f.read().strip()
with open(args.content_file, encoding="utf-8") as f:
content = f.read().strip()
result = run_publish_pipeline(
title=title,
content=content,
images=args.images,
video=args.video,
tags=args.tags,
schedule_time=args.schedule_at,
is_original=args.original,
visibility=args.visibility,
host=args.host,
port=args.port,
account=args.account,
)
print(json.dumps(result, ensure_ascii=False, indent=2))
sys.exit(0 if result["success"] else 2)
if __name__ == "__main__":
main()
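The pipeline above returns early from inside nested `try`/`finally` blocks (for example on a failed login check) and still expects the page and the browser connection to be closed. A minimal sketch of why that works, using a hypothetical `FakeBrowser` that only records call order:

```python
class FakeBrowser:
    """Hypothetical stand-in that records the order of calls."""

    def __init__(self) -> None:
        self.events = []

    def new_page(self) -> str:
        self.events.append("new_page")
        return "page-1"

    def close_page(self, page: str) -> None:
        self.events.append("close_page")

    def close(self) -> None:
        self.events.append("close")


def run_once(browser: FakeBrowser, logged_in: bool) -> dict:
    """Mirror the control flow: early returns still trigger both finally blocks."""
    try:
        page = browser.new_page()
        try:
            if not logged_in:
                return {"success": False, "error": "not logged in"}
            return {"success": True}
        finally:
            browser.close_page(page)  # inner cleanup runs first
    finally:
        browser.close()  # outer cleanup runs last
```

Both the success path and the early-return path produce the same cleanup order: page first, then the connection.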
... ...
"""Single-instance lock that prevents several processes from driving the browser at once."""
from __future__ import annotations
import contextlib
import logging
import os
import time
logger = logging.getLogger(__name__)
_DEFAULT_LOCK_FILE = os.path.join(os.path.expanduser("~"), ".xhs", "run.lock")
class RunLock:
    """File lock that ensures only one process operates at a time."""

    def __init__(self, lock_file: str = _DEFAULT_LOCK_FILE) -> None:
        self.lock_file = lock_file
        self._fd: int | None = None

    def acquire(self, timeout: float = 30.0) -> bool:
        """Acquire the lock.

        Args:
            timeout: Timeout in seconds.

        Returns:
            True if the lock was acquired, False on timeout.
        """
        os.makedirs(os.path.dirname(self.lock_file), exist_ok=True)
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                self._fd = os.open(
                    self.lock_file,
                    os.O_CREAT | os.O_EXCL | os.O_WRONLY,
                )
                # Record the owner's PID
                os.write(self._fd, str(os.getpid()).encode())
                logger.debug("Lock acquired: %s", self.lock_file)
                return True
            except FileExistsError:
                # Check whether the owner is still alive
                if self._is_stale():
                    self._force_release()
                    continue
                time.sleep(1)
        logger.warning("Timed out acquiring lock: %s", self.lock_file)
        return False

    def release(self) -> None:
        """Release the lock."""
        if self._fd is not None:
            with contextlib.suppress(OSError):
                os.close(self._fd)
            self._fd = None
        with contextlib.suppress(FileNotFoundError):
            os.remove(self.lock_file)
        logger.debug("Lock released: %s", self.lock_file)

    def _is_stale(self) -> bool:
        """Return True if the lock file is stale (its owner process has exited)."""
        try:
            with open(self.lock_file) as f:
                pid = int(f.read().strip())
            # Signal 0 only probes for existence on POSIX. On Windows, os.kill with
            # this value terminates the target process, so this check is POSIX-only.
            os.kill(pid, 0)
            return False
        except PermissionError:
            # The process exists but belongs to another user, so the lock is live
            return False
        except (FileNotFoundError, ValueError, ProcessLookupError):
            return True

    def _force_release(self) -> None:
        """Forcibly remove a stale lock."""
        with contextlib.suppress(FileNotFoundError):
            os.remove(self.lock_file)
        logger.info("Removed stale lock: %s", self.lock_file)

    def __enter__(self) -> RunLock:
        if not self.acquire():
            raise TimeoutError(f"Could not acquire lock: {self.lock_file}")
        return self

    def __exit__(self, *args: object) -> None:
        self.release()
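The lock above depends on `os.O_CREAT | os.O_EXCL`, which makes file creation atomic: the call fails if the file already exists. The following sketch isolates just that property; `exclusive_lock` and `try_lock_twice` are hypothetical helpers, and stale-lock detection and timeouts are deliberately left out.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def exclusive_lock(path: str):
    """Hold an exclusive lock file for the duration of the with-block."""
    # O_CREAT | O_EXCL fails with FileExistsError if the file already exists.
    fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    try:
        os.write(fd, str(os.getpid()).encode())
        yield path
    finally:
        os.close(fd)
        with contextlib.suppress(FileNotFoundError):
            os.remove(path)


def try_lock_twice(path: str) -> bool:
    """Return True if a second acquisition is rejected while the first is held."""
    with exclusive_lock(path):
        try:
            with exclusive_lock(path):
                return False
        except FileExistsError:
            return True
```

With the real class, the equivalent usage would be `with RunLock(): ...`, which raises `TimeoutError` if the lock cannot be obtained in time.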
... ...
"""UTF-16 title length calculation; counterpart of Go pkg/xhsutil/title.go."""


def calc_title_length(s: str) -> int:
    """Compute the Xiaohongshu title length.

    Rule: each UTF-16 code unit above 127 (Chinese, full-width symbols, each half
    of a surrogate pair) counts as 2, each ASCII code unit counts as 1, and the
    total is divided by 2 and rounded up.

    Examples:
        >>> calc_title_length("你好世界")
        4
        >>> calc_title_length("hello")
        3
        >>> calc_title_length("OOTD穿搭分享")
        6
    """
    byte_len = 0
    # Walk UTF-16 code units so that surrogate pairs are handled
    encoded = s.encode("utf-16-le")
    for i in range(0, len(encoded), 2):
        code_unit = int.from_bytes(encoded[i : i + 2], "little")
        if code_unit > 127:
            byte_len += 2
        else:
            byte_len += 1
    return (byte_len + 1) // 2
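The same weighting can be expressed over code points instead of encoded bytes, which makes the surrogate-pair case explicit. This is an equivalent sketch for illustration, not the module's implementation; `title_units` is a hypothetical name.

```python
def title_units(s: str) -> int:
    """Weighted title length: ASCII counts 1, other BMP characters 2,
    characters outside the BMP 4 (two UTF-16 code units), halved and rounded up."""
    weight = 0
    for ch in s:
        cp = ord(ch)
        if cp > 0xFFFF:
            weight += 4  # encoded as a surrogate pair in UTF-16
        elif cp > 127:
            weight += 2
        else:
            weight += 1
    return (weight + 1) // 2
```

Under this rule a single emoji outside the BMP takes the same room as two Chinese characters, so a 20-unit title limit is reached sooner than a plain character count would suggest.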
... ...
"""Core package for Xiaohongshu CDP automation."""
... ...
"""CDP WebSocket client (Browser, Page, Element); counterpart of Go browser/browser.go + the go-rod API.

Talks to the Chrome DevTools Protocol over a raw WebSocket to automate the browser.
"""
from __future__ import annotations
import json
import logging
import time
from typing import Any
import requests
import websockets.sync.client as ws_client
from .errors import CDPError, ElementNotFoundError
from .stealth import STEALTH_JS
logger = logging.getLogger(__name__)
class CDPClient:
    """Low-level CDP WebSocket client."""

    def __init__(self, ws_url: str) -> None:
        self._ws = ws_client.connect(ws_url, max_size=50 * 1024 * 1024)
        self._id = 0
        self._callbacks: dict[int, Any] = {}

    def send(self, method: str, params: dict | None = None) -> dict:
        """Send a CDP command and wait for its result."""
        self._id += 1
        msg: dict[str, Any] = {"id": self._id, "method": method}
        if params:
            msg["params"] = params
        self._ws.send(json.dumps(msg))
        return self._wait_for(self._id)

    def _wait_for(self, msg_id: int, timeout: float = 30.0) -> dict:
        """Wait for the response with the given id.

        Messages with any other id (including events) are read and discarded.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                raw = self._ws.recv(timeout=max(0.1, deadline - time.monotonic()))
            except TimeoutError:
                break
            data = json.loads(raw)
            if data.get("id") == msg_id:
                if "error" in data:
                    raise CDPError(f"CDP error: {data['error']}")
                return data.get("result", {})
        raise CDPError(f"Timed out waiting for CDP response (id={msg_id})")

    def close(self) -> None:
        import contextlib

        with contextlib.suppress(Exception):
            self._ws.close()
class Page:
"""CDP 页面对象,封装常用操作。"""
def __init__(self, cdp: CDPClient, target_id: str, session_id: str) -> None:
self._cdp = cdp
self.target_id = target_id
self.session_id = session_id
self._ws = cdp._ws
self._id_counter = 1000
def _send_session(self, method: str, params: dict | None = None) -> dict:
"""向 session 发送命令。"""
self._id_counter += 1
msg: dict[str, Any] = {
"id": self._id_counter,
"method": method,
"sessionId": self.session_id,
}
if params:
msg["params"] = params
self._ws.send(json.dumps(msg))
return self._wait_session(self._id_counter)
def _wait_session(self, msg_id: int, timeout: float = 60.0) -> dict:
"""等待 session 响应。"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
try:
raw = self._ws.recv(timeout=max(0.1, deadline - time.monotonic()))
except TimeoutError:
break
data = json.loads(raw)
if data.get("id") == msg_id:
if "error" in data:
raise CDPError(f"CDP 错误: {data['error']}")
return data.get("result", {})
raise CDPError(f"等待 session 响应超时 (id={msg_id})")
def navigate(self, url: str) -> None:
"""导航到指定 URL。"""
logger.info("导航到: %s", url)
self._send_session("Page.navigate", {"url": url})
    def wait_for_load(self, timeout: float = 60.0) -> None:
        """Wait for the page to finish loading by polling document.readyState.

        On timeout a warning is logged and the method returns without raising.
        """
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                state = self.evaluate("document.readyState")
                if state == "complete":
                    return
            except CDPError:
                pass
            time.sleep(0.5)
        logger.warning("Timed out waiting for page load")

    def wait_dom_stable(self, timeout: float = 10.0, interval: float = 0.5) -> None:
        """Wait until the DOM looks stable.

        Stability is approximated by two consecutive samples of
        document.body.innerHTML.length being equal; returns silently on timeout.
        """
        last_len: int | None = None
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            try:
                cur_len = self.evaluate("document.body ? document.body.innerHTML.length : 0")
                if cur_len is not None and cur_len == last_len:
                    return
                last_len = cur_len
            except CDPError:
                pass
            time.sleep(interval)
    def evaluate(self, expression: str, timeout: float = 30.0) -> Any:
        """Evaluate a JavaScript expression and return its value.

        Note: the timeout argument is accepted but not currently applied; the
        wait is governed by the default of _wait_session.
        """
        result = self._send_session(
            "Runtime.evaluate",
            {
                "expression": expression,
                "returnByValue": True,
                "awaitPromise": False,
            },
        )
        if "exceptionDetails" in result:
            raise CDPError(f"JS evaluation error: {result['exceptionDetails']}")
        remote_obj = result.get("result", {})
        return remote_obj.get("value")

    def evaluate_function(self, function_body: str, *args: Any) -> Any:
        """Call a JavaScript function and return its value.

        function_body is a complete function expression such as
        `() => { return 1; }`. Positional args must be JSON-serializable and are
        passed to the function as call arguments.
        """
        call_args = ", ".join(json.dumps(a) for a in args)
        result = self._send_session(
            "Runtime.evaluate",
            {
                "expression": f"({function_body})({call_args})",
                "returnByValue": True,
                "awaitPromise": False,
            },
        )
        if "exceptionDetails" in result:
            raise CDPError(f"JS function error: {result['exceptionDetails']}")
        remote_obj = result.get("result", {})
        return remote_obj.get("value")
def query_selector(self, selector: str) -> str | None:
"""查找单个元素,返回 objectId 或 None。"""
result = self._send_session(
"Runtime.evaluate",
{
"expression": f"document.querySelector({json.dumps(selector)})",
"returnByValue": False,
},
)
remote_obj = result.get("result", {})
if remote_obj.get("subtype") == "null" or remote_obj.get("type") == "undefined":
return None
return remote_obj.get("objectId")
def query_selector_all(self, selector: str) -> list[str]:
"""查找多个元素,返回 objectId 列表。"""
# 通过 JS 返回元素数量,然后逐个获取
count = self.evaluate(f"document.querySelectorAll({json.dumps(selector)}).length")
if not count:
return []
object_ids = []
for i in range(count):
result = self._send_session(
"Runtime.evaluate",
{
"expression": (f"document.querySelectorAll({json.dumps(selector)})[{i}]"),
"returnByValue": False,
},
)
obj = result.get("result", {})
oid = obj.get("objectId")
if oid:
object_ids.append(oid)
return object_ids
def has_element(self, selector: str) -> bool:
"""检查元素是否存在。"""
return self.evaluate(f"document.querySelector({json.dumps(selector)}) !== null") is True
def wait_for_element(self, selector: str, timeout: float = 30.0) -> str:
"""等待元素出现,返回 objectId。"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
oid = self.query_selector(selector)
if oid:
return oid
time.sleep(0.5)
raise ElementNotFoundError(selector)
def click_element(self, selector: str) -> None:
"""点击指定选择器的元素。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.click();
}})()
"""
)
def input_text(self, selector: str, text: str) -> None:
"""向指定选择器的元素输入文本。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.value = {json.dumps(text)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
el.dispatchEvent(new Event('change', {{bubbles: true}}));
}})()
"""
)
def input_content_editable(self, selector: str, text: str) -> None:
"""向 contentEditable 元素输入文本(如 div.ql-editor)。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.textContent = {json.dumps(text)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}})()
"""
)
def get_element_text(self, selector: str) -> str | None:
"""获取元素文本内容。"""
return self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
return el ? el.textContent : null;
}})()
"""
)
def get_element_attribute(self, selector: str, attr: str) -> str | None:
"""获取元素属性值。"""
return self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
return el ? el.getAttribute({json.dumps(attr)}) : null;
}})()
"""
)
def get_elements_count(self, selector: str) -> int:
"""获取匹配元素数量。"""
result = self.evaluate(f"document.querySelectorAll({json.dumps(selector)}).length")
return result if isinstance(result, int) else 0
def scroll_by(self, x: int, y: int) -> None:
"""滚动页面。"""
self.evaluate(f"window.scrollBy({x}, {y})")
def scroll_to(self, x: int, y: int) -> None:
"""滚动到指定位置。"""
self.evaluate(f"window.scrollTo({x}, {y})")
def scroll_to_bottom(self) -> None:
"""滚动到页面底部。"""
self.evaluate("window.scrollTo(0, document.body.scrollHeight)")
def scroll_element_into_view(self, selector: str) -> None:
"""将元素滚动到可视区域。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.scrollIntoView({{behavior: 'smooth', block: 'center'}});
}})()
"""
)
def scroll_nth_element_into_view(self, selector: str, index: int) -> None:
"""将第 N 个匹配元素滚动到可视区域。"""
self.evaluate(
f"""
(() => {{
const els = document.querySelectorAll({json.dumps(selector)});
if (els[{index}]) els[{index}].scrollIntoView(
{{behavior: 'smooth', block: 'center'}}
);
}})()
"""
)
def get_scroll_top(self) -> int:
"""获取当前滚动位置。"""
result = self.evaluate(
"window.pageYOffset || document.documentElement.scrollTop"
" || document.body.scrollTop || 0"
)
return int(result) if result else 0
def get_viewport_height(self) -> int:
"""获取视口高度。"""
result = self.evaluate("window.innerHeight")
return int(result) if result else 768
def set_file_input(self, selector: str, files: list[str]) -> None:
"""设置文件输入框的文件(通过 CDP DOM.setFileInputFiles)。"""
# 先获取 nodeId
doc = self._send_session("DOM.getDocument", {"depth": 0})
root_node_id = doc["root"]["nodeId"]
result = self._send_session(
"DOM.querySelector",
{"nodeId": root_node_id, "selector": selector},
)
node_id = result.get("nodeId", 0)
if node_id == 0:
raise ElementNotFoundError(selector)
self._send_session(
"DOM.setFileInputFiles",
{"nodeId": node_id, "files": files},
)
def dispatch_wheel_event(self, delta_y: float) -> None:
"""触发滚轮事件以激活懒加载。"""
self.evaluate(
f"""
(() => {{
let target = document.querySelector('.note-scroller')
|| document.querySelector('.interaction-container')
|| document.documentElement;
const event = new WheelEvent('wheel', {{
deltaY: {delta_y},
deltaMode: 0,
bubbles: true,
cancelable: true,
view: window,
}});
target.dispatchEvent(event);
}})()
"""
)
def mouse_move(self, x: float, y: float) -> None:
"""移动鼠标。"""
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mouseMoved", "x": x, "y": y},
)
def mouse_click(self, x: float, y: float, button: str = "left") -> None:
"""在指定坐标点击。"""
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mousePressed", "x": x, "y": y, "button": button, "clickCount": 1},
)
self._send_session(
"Input.dispatchMouseEvent",
{"type": "mouseReleased", "x": x, "y": y, "button": button, "clickCount": 1},
)
def type_text(self, text: str, delay_ms: int = 50) -> None:
"""逐字符输入文本。"""
for char in text:
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "text": char},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "text": char},
)
if delay_ms > 0:
time.sleep(delay_ms / 1000.0)
def press_key(self, key: str) -> None:
"""按下并释放指定键。"""
key_map = {
"Enter": {"key": "Enter", "code": "Enter", "windowsVirtualKeyCode": 13},
"ArrowDown": {
"key": "ArrowDown",
"code": "ArrowDown",
"windowsVirtualKeyCode": 40,
},
"Tab": {"key": "Tab", "code": "Tab", "windowsVirtualKeyCode": 9},
}
info = key_map.get(key, {"key": key, "code": key})
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", **info},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", **info},
)
def inject_stealth(self) -> None:
"""注入反检测脚本。"""
self._send_session(
"Page.addScriptToEvaluateOnNewDocument",
{"source": STEALTH_JS},
)
def remove_element(self, selector: str) -> None:
"""移除 DOM 元素。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.remove();
}})()
"""
)
def hover_element(self, selector: str) -> None:
"""悬停到元素中心。"""
box = self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return null;
const rect = el.getBoundingClientRect();
return {{x: rect.left + rect.width / 2, y: rect.top + rect.height / 2}};
}})()
"""
)
if box:
self.mouse_move(box["x"], box["y"])
def select_all_text(self, selector: str) -> None:
"""选中输入框内所有文本。"""
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.select ? el.select() : document.execCommand('selectAll');
}})()
"""
)
class Browser:
"""Chrome 浏览器 CDP 控制器。"""
def __init__(self, host: str = "127.0.0.1", port: int = 9222) -> None:
self.host = host
self.port = port
self.base_url = f"http://{host}:{port}"
self._cdp: CDPClient | None = None
def connect(self) -> None:
"""连接到 Chrome DevTools。"""
resp = requests.get(f"{self.base_url}/json/version", timeout=5)
resp.raise_for_status()
info = resp.json()
ws_url = info["webSocketDebuggerUrl"]
logger.info("连接到 Chrome: %s", ws_url)
self._cdp = CDPClient(ws_url)
def new_page(self, url: str = "about:blank") -> Page:
"""创建新页面。"""
if not self._cdp:
self.connect()
assert self._cdp is not None
# 创建 target
result = self._cdp.send("Target.createTarget", {"url": url})
target_id = result["targetId"]
# 附加到 target
result = self._cdp.send(
"Target.attachToTarget",
{"targetId": target_id, "flatten": True},
)
session_id = result["sessionId"]
page = Page(self._cdp, target_id, session_id)
# 启用必要的 domain
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
# 注入反检测
page.inject_stealth()
return page
def get_existing_page(self) -> Page | None:
"""获取已有页面(取第一个非 about:blank 的 page target)。"""
if not self._cdp:
self.connect()
assert self._cdp is not None
resp = requests.get(f"{self.base_url}/json", timeout=5)
targets = resp.json()
for target in targets:
if target.get("type") == "page" and target.get("url") != "about:blank":
target_id = target["id"]
result = self._cdp.send(
"Target.attachToTarget",
{"targetId": target_id, "flatten": True},
)
session_id = result["sessionId"]
page = Page(self._cdp, target_id, session_id)
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
page.inject_stealth()
return page
return None
def close_page(self, page: Page) -> None:
"""关闭页面。"""
import contextlib
if self._cdp:
with contextlib.suppress(CDPError):
self._cdp.send("Target.closeTarget", {"targetId": page.target_id})
def close(self) -> None:
"""关闭连接。"""
if self._cdp:
self._cdp.close()
self._cdp = None
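Both `CDPClient._wait_for` and `Page._wait_session` follow the same request/response pattern: send a message with an `id`, then read from the socket until a message with that `id` comes back, skipping events on the way. The sketch below reproduces that loop against a hypothetical `FakeSocket` so it can run without Chrome; timeouts are omitted for brevity.

```python
import json
from collections import deque


class FakeSocket:
    """Hypothetical stand-in for a WebSocket: replays queued messages."""

    def __init__(self, incoming: list) -> None:
        self._incoming = deque(json.dumps(m) for m in incoming)
        self.sent = []

    def send(self, raw: str) -> None:
        self.sent.append(json.loads(raw))

    def recv(self) -> str:
        if not self._incoming:
            raise TimeoutError("no more messages")
        return self._incoming.popleft()


def call(sock: FakeSocket, msg_id: int, method: str) -> dict:
    """Send one command, then skip unrelated messages until the matching id arrives."""
    sock.send(json.dumps({"id": msg_id, "method": method}))
    while True:
        data = json.loads(sock.recv())
        if data.get("id") != msg_id:
            continue  # an event, or a reply to some other command
        if "error" in data:
            raise RuntimeError(data["error"])
        return data.get("result", {})
```

One consequence of this design is that events arriving while a command is pending are dropped, which is acceptable for polling-based code like `wait_for_load` but would not suit event-driven waits.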
... ...
"""Comment operations; counterpart of Go xiaohongshu/comment_feed.go."""
from __future__ import annotations
import logging
import time
from .cdp import Page
from .feed_detail import _check_end_container, _check_page_accessible, _get_comment_count
from .selectors import (
COMMENT_INPUT_FIELD,
COMMENT_INPUT_TRIGGER,
COMMENT_SUBMIT_BUTTON,
PARENT_COMMENT,
REPLY_BUTTON,
)
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> None:
"""发表评论到 Feed。
Args:
page: CDP 页面对象。
feed_id: Feed ID。
xsec_token: xsec_token。
content: 评论内容。
Raises:
RuntimeError: 评论失败。
"""
url = make_feed_detail_url(feed_id, xsec_token)
logger.info("打开 feed 详情页: %s", url)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
_check_page_accessible(page)
# 点击评论输入触发区域
if not page.has_element(COMMENT_INPUT_TRIGGER):
raise RuntimeError("未找到评论输入框,该帖子可能不支持评论或网页端不可访问")
page.click_element(COMMENT_INPUT_TRIGGER)
time.sleep(0.5)
# 输入评论内容
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(1)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(1)
logger.info("评论发送成功: feed=%s", feed_id)
def reply_comment(
page: Page,
feed_id: str,
xsec_token: str,
content: str,
comment_id: str = "",
user_id: str = "",
) -> None:
"""回复指定评论。
通过 comment_id 或 user_id 定位评论,然后回复。
Args:
page: CDP 页面对象。
feed_id: Feed ID。
xsec_token: xsec_token。
content: 回复内容。
comment_id: 评论 ID(优先使用)。
user_id: 用户 ID(备选)。
Raises:
RuntimeError: 回复失败。
"""
if not comment_id and not user_id:
raise ValueError("comment_id 和 user_id 至少提供一个")
url = make_feed_detail_url(feed_id, xsec_token)
logger.info("打开 feed 详情页进行回复: %s", url)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
_check_page_accessible(page)
time.sleep(2)
# 查找目标评论
comment_found = _find_and_scroll_to_comment(page, comment_id, user_id)
if not comment_found:
raise RuntimeError(f"未找到评论 (commentID: {comment_id}, userID: {user_id})")
time.sleep(1)
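    # Click the reply button.
    # NOTE: without a comment_id this selector matches the first reply button on
    # the page, which is not necessarily the one inside the comment found above.
    reply_selector = f"#comment-{comment_id} {REPLY_BUTTON}" if comment_id else REPLY_BUTTON
    page.click_element(reply_selector)
    time.sleep(1)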
# 输入回复内容
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(0.5)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(2)
logger.info("回复评论成功")
def _find_and_scroll_to_comment(
page: Page,
comment_id: str,
user_id: str,
max_attempts: int = 100,
) -> bool:
"""查找并滚动到目标评论。"""
logger.info("开始查找评论 - commentID: %s, userID: %s", comment_id, user_id)
# 先滚动到评论区
page.scroll_element_into_view(".comments-container")
time.sleep(1)
last_count = 0
stagnant = 0
for attempt in range(max_attempts):
# 检查是否到底
if _check_end_container(page):
logger.info("已到达评论底部,未找到目标评论")
break
# 停滞检测
current_count = _get_comment_count(page)
if current_count != last_count:
last_count = current_count
stagnant = 0
else:
stagnant += 1
if stagnant >= 10:
logger.info("评论数量停滞超过10次")
break
# 滚动到最后一条评论
if current_count > 0:
page.scroll_nth_element_into_view(PARENT_COMMENT, current_count - 1)
time.sleep(0.3)
# 继续滚动
page.evaluate("window.scrollBy(0, window.innerHeight * 0.8)")
time.sleep(0.5)
# 通过 commentID 查找
if comment_id:
selector = f"#comment-{comment_id}"
if page.has_element(selector):
logger.info("通过 commentID 找到评论 (尝试 %d 次)", attempt + 1)
page.scroll_element_into_view(selector)
return True
        # Look up by user ID
        if user_id:
            # Build the selector as an escaped JS string literal so that user_id
            # cannot break out of the quoted string in the injected script
            user_selector = _js_str(f'[data-user-id="{user_id}"]')
            found = page.evaluate(
                f"""
                (() => {{
                    const els = document.querySelectorAll(
                        '.parent-comment, .comment-item, .comment'
                    );
                    for (const el of els) {{
                        if (el.querySelector({user_selector})) {{
                            el.scrollIntoView({{behavior: 'smooth', block: 'center'}});
                            return true;
                        }}
                    }}
                    return false;
                }})()
                """
            )
            if found:
                logger.info("Found comment by userID (attempt %d)", attempt + 1)
                return True
time.sleep(0.8)
return False
def _js_str(s: str) -> str:
    """Convert a Python string to a JS string literal (quotes included)."""
    import json

    return json.dumps(s)
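`_js_str` works because a JSON string literal is also a valid JavaScript string literal, so `json.dumps` takes care of quotes, backslashes, and newlines. A small sketch of embedding untrusted text into a script this way; `js_set_text` is a hypothetical helper, not part of the package.

```python
import json


def js_set_text(selector: str, text: str) -> str:
    """Build a JS snippet that sets textContent, with both values safely quoted."""
    sel = json.dumps(selector)  # e.g. "div[data-x=\"1\"]"
    val = json.dumps(text)      # quotes and newlines become escape sequences
    return (
        "(() => {"
        f" const el = document.querySelector({sel});"
        f" if (el) el.textContent = {val};"
        " })()"
    )
```

Interpolating the raw values directly would let a quote character in the text terminate the string early and change the meaning of the script.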
... ...
"""Cookie file persistence; counterpart of Go cookies/cookies.go."""
from __future__ import annotations
import os
from pathlib import Path


def get_cookies_file_path(account: str = "") -> str:
    """Return the path of the cookies file.

    Resolution order:
        1. Multi-account mode: ~/.xhs/accounts/{account}/cookies.json (when account is given)
        2. cookies.json in the system temp directory, if it exists (backward compatibility)
        3. The COOKIES_PATH environment variable
        4. ./cookies.json (local debugging)
    """
    if account:
        account_dir = Path.home() / ".xhs" / "accounts" / account
        account_dir.mkdir(parents=True, exist_ok=True)
        return str(account_dir / "cookies.json")
    # Legacy path
    import tempfile

    old_path = os.path.join(tempfile.gettempdir(), "cookies.json")
    if os.path.exists(old_path):
        return old_path
    # Environment variable
    env_path = os.getenv("COOKIES_PATH")
    if env_path:
        return env_path
    return "cookies.json"


def load_cookies(path: str) -> bytes | None:
    """Load cookies from a file; return None if the file does not exist."""
    try:
        with open(path, "rb") as f:
            return f.read()
    except FileNotFoundError:
        return None


def save_cookies(path: str, data: bytes) -> None:
    """Save cookies to a file, creating the parent directory if needed."""
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with open(path, "wb") as f:
        f.write(data)


def delete_cookies(path: str) -> None:
    """Delete the cookies file if it exists."""
    import contextlib

    with contextlib.suppress(FileNotFoundError):
        os.remove(path)
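The lookup order in `get_cookies_file_path` is easier to reason about when the environment and the legacy path are passed in explicitly. The following is a side-effect-free sketch of the same precedence; `resolve_cookie_path` and its parameters are hypothetical, and unlike the real function it does not create directories.

```python
import os


def resolve_cookie_path(account: str, legacy_path: str, env: dict) -> str:
    """Apply the precedence: account dir, existing legacy file, env var, local file."""
    if account:
        return os.path.join(
            os.path.expanduser("~"), ".xhs", "accounts", account, "cookies.json"
        )
    if os.path.exists(legacy_path):
        return legacy_path
    if env.get("COOKIES_PATH"):
        return env["COOKIES_PATH"]
    return "cookies.json"
```

Note that an explicit account always wins, and that a leftover legacy file takes priority over `COOKIES_PATH`, which can be surprising when the environment variable is set deliberately.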
... ...
"""Exception hierarchy for Xiaohongshu automation."""


class XHSError(Exception):
    """Base exception for Xiaohongshu automation."""


class NoFeedsError(XHSError):
    """No feeds data was captured."""

    def __init__(self) -> None:
        super().__init__("No feeds data was captured")


class NoFeedDetailError(XHSError):
    """No feed detail data was captured."""

    def __init__(self) -> None:
        super().__init__("No feed detail data was captured")


class NotLoggedInError(XHSError):
    """Not logged in."""

    def __init__(self) -> None:
        super().__init__("Not logged in; please scan the QR code to log in first")


class PageNotAccessibleError(XHSError):
    """The page is not accessible."""

    def __init__(self, reason: str) -> None:
        self.reason = reason
        super().__init__(f"Note is not accessible: {reason}")


class UploadTimeoutError(XHSError):
    """The upload timed out."""


class PublishError(XHSError):
    """Publishing failed."""


class TitleTooLongError(PublishError):
    """The title exceeds the length limit."""

    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"Current input length is {current}, maximum length is {maximum}")


class ContentTooLongError(PublishError):
    """The body text exceeds the length limit."""

    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"Current input length is {current}, maximum length is {maximum}")


class CDPError(XHSError):
    """CDP communication error."""


class ElementNotFoundError(XHSError):
    """A page element was not found."""

    def __init__(self, selector: str) -> None:
        self.selector = selector
        super().__init__(f"Element not found: {selector}")
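Because every exception derives from `XHSError`, and the length errors derive from `PublishError`, callers can choose how specific a handler should be. The sketch below redefines a three-class subset locally so it runs on its own; `to_result` and the `kind` labels are hypothetical and only illustrate checking the most specific class first.

```python
class XHSError(Exception):
    """Base class, mirroring the hierarchy shown above."""


class PublishError(XHSError):
    """Publishing failed."""


class TitleTooLongError(PublishError):
    def __init__(self, current: str, maximum: str) -> None:
        self.current = current
        self.maximum = maximum
        super().__init__(f"title length {current} exceeds maximum {maximum}")


def to_result(exc: Exception) -> dict:
    """Map an exception to a result dict, most specific class first."""
    if isinstance(exc, TitleTooLongError):
        return {"success": False, "kind": "title_too_long", "error": str(exc)}
    if isinstance(exc, PublishError):
        return {"success": False, "kind": "publish", "error": str(exc)}
    if isinstance(exc, XHSError):
        return {"success": False, "kind": "xhs", "error": str(exc)}
    return {"success": False, "kind": "unexpected", "error": str(exc)}
```

Checking the subclasses before the base classes matters: reversing the order would classify every error as the generic `xhs` kind.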
... ...
"""Feed detail and comment loading; counterpart of Go xiaohongshu/feed_detail.go (867 lines)."""
from __future__ import annotations
import json
import logging
import random
import re
import time
from .cdp import Page
from .errors import NoFeedDetailError, PageNotAccessibleError
from .human import (
BUTTON_CLICK_INTERVAL,
DEFAULT_MAX_ATTEMPTS,
FINAL_SPRINT_PUSH_COUNT,
HUMAN_DELAY,
LARGE_SCROLL_TRIGGER,
MAX_CLICK_PER_ROUND,
MIN_SCROLL_DELTA,
POST_SCROLL,
REACTION_TIME,
READ_TIME,
SCROLL_WAIT,
SHORT_READ,
STAGNANT_LIMIT,
calculate_scroll_delta,
get_scroll_interval,
get_scroll_ratio,
sleep_random,
)
from .selectors import (
ACCESS_ERROR_WRAPPER,
END_CONTAINER,
NO_COMMENTS_TEXT,
PARENT_COMMENT,
SHOW_MORE_BUTTON,
)
from .types import (
CommentList,
CommentLoadConfig,
FeedDetail,
FeedDetailResponse,
)
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
# Keywords indicating that the page is not accessible (matched against the page's Chinese text)
_INACCESSIBLE_KEYWORDS = [
"当前笔记暂时无法浏览",
"该内容因违规已被删除",
"该笔记已被删除",
"内容不存在",
"笔记不存在",
"已失效",
"私密笔记",
"仅作者可见",
"因用户设置,你无法查看",
"因违规无法查看",
]
_REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")
_TOTAL_COMMENT_RE = re.compile(r"共(\d+)条评论")
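The two patterns above pull numbers out of Chinese UI labels such as "展开 12 条回复" (expand 12 replies). A short sketch of how the first one might be applied; the pattern is copied from above, while `parse_reply_count` is a hypothetical helper and the sample labels are assumptions about what the page shows.

```python
import re
from typing import Optional

REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")


def parse_reply_count(label: str) -> Optional[int]:
    """Return the reply count from a button label, or None if it has no number."""
    match = REPLY_COUNT_RE.search(label)
    return int(match.group(1)) if match else None
```

A count parsed this way can then be compared with a threshold such as `max_replies_threshold` to decide whether a "show more" button is worth clicking.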
def get_feed_detail(
    page: Page,
    feed_id: str,
    xsec_token: str,
    load_all_comments: bool = False,
    config: CommentLoadConfig | None = None,
) -> FeedDetailResponse:
    """Get feed detail, including comments.

    Args:
        page: CDP page object.
        feed_id: Feed ID.
        xsec_token: xsec_token.
        load_all_comments: Whether to load all comments.
        config: Comment loading configuration.

    Raises:
        PageNotAccessibleError: The page is not accessible.
        NoFeedDetailError: No detail data was obtained.
    """
    if config is None:
        config = CommentLoadConfig()
    url = make_feed_detail_url(feed_id, xsec_token)
    logger.info("Opening feed detail page: %s", url)
    logger.info(
        "Config: click_more_replies=%s, max_replies_threshold=%d, "
        "max_comment_items=%d, scroll_speed=%s",
        config.click_more_replies,
        config.max_replies_threshold,
        config.max_comment_items,
        config.scroll_speed,
    )

    # Navigate, retrying up to three times
    for attempt in range(3):
        try:
            page.navigate(url)
            page.wait_for_load()
            page.wait_dom_stable()
            break
        except Exception as e:
            logger.debug("Page navigation retry #%d: %s", attempt, e)
            time.sleep(0.5 + random.random())
    else:
        # Reached only when no attempt hit the break above
        raise RuntimeError("Page navigation failed")
    sleep_random(1000, 1000)

    # Check that the page is accessible
    _check_page_accessible(page)

    # Load all comments; a failure here is logged but does not abort extraction
    if load_all_comments:
        try:
            _load_all_comments(page, config)
        except Exception as e:
            logger.warning("Failed to load all comments: %s", e)
    return _extract_feed_detail(page, feed_id)
# ========== Page checks ==========
def _check_page_accessible(page: Page) -> None:
    """Raise PageNotAccessibleError if the page shows an access-error message."""
    time.sleep(0.5)
    text = page.get_element_text(ACCESS_ERROR_WRAPPER)
    if not text:
        return
    text = text.strip()
    for kw in _INACCESSIBLE_KEYWORDS:
        if kw in text:
            raise PageNotAccessibleError(kw)
    if text:
        # Unknown message in the error wrapper: report it verbatim
        raise PageNotAccessibleError(text)
# ========== 数据提取 ==========
_EXTRACT_DETAIL_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.note &&
window.__INITIAL_STATE__.note.noteDetailMap) {
return JSON.stringify(window.__INITIAL_STATE__.note.noteDetailMap);
}
return "";
})()
"""
def _extract_feed_detail(page: Page, feed_id: str) -> FeedDetailResponse:
    """Extract the feed detail from window.__INITIAL_STATE__."""
    result = None
    # The state may not be populated yet, so try up to three times.
    for _ in range(3):
        result = page.evaluate(_EXTRACT_DETAIL_JS)
        if result:
            break
        time.sleep(0.2)
    if not result:
        raise NoFeedDetailError()
    note_detail_map = json.loads(result)
    note_data = note_detail_map.get(feed_id)
    if not note_data:
        raise NoFeedDetailError()
    return FeedDetailResponse(
        note=FeedDetail.from_dict(note_data.get("note", {})),
        comments=CommentList.from_dict(note_data.get("comments", {})),
    )
# ========== Comment-loading state machine ==========
def _load_all_comments(page: Page, config: CommentLoadConfig) -> None:
    """State machine that scrolls and expands replies until all comments are loaded."""
    max_attempts = (
        config.max_comment_items * 3 if config.max_comment_items > 0 else DEFAULT_MAX_ATTEMPTS
    )
    scroll_interval = get_scroll_interval(config.scroll_speed)
    logger.info("开始加载评论...")
    _scroll_to_comments_area(page)
    sleep_random(*HUMAN_DELAY)
    # Nothing to load if the page shows the "no comments" placeholder.
    if _check_no_comments(page):
        logger.info("检测到无评论区域,跳过加载")
        return
    # State
    last_count = 0
    last_scroll_top = 0
    stagnant_checks = 0
    total_clicked = 0
    total_skipped = 0
    for attempt in range(max_attempts):
        logger.debug("=== 尝试 %d/%d ===", attempt + 1, max_attempts)
        # Stop once the "THE END" marker is visible.
        if _check_end_container(page):
            count = _get_comment_count(page)
            logger.info(
                "检测到 THE END,加载完成: %d 条评论, 点击: %d, 跳过: %d",
                count,
                total_clicked,
                total_skipped,
            )
            return
        # Periodically click the "show more replies" buttons.
        if config.click_more_replies and attempt % BUTTON_CLICK_INTERVAL == 0:
            clicked, skipped = _click_show_more_buttons(page, config.max_replies_threshold)
            total_clicked += clicked
            total_skipped += skipped
            if clicked > 0 or skipped > 0:
                sleep_random(*READ_TIME)
                # Second pass
                c2, s2 = _click_show_more_buttons(page, config.max_replies_threshold)
                total_clicked += c2
                total_skipped += s2
                if c2 > 0 or s2 > 0:
                    sleep_random(*SHORT_READ)
        # Count the comments currently in the DOM.
        current_count = _get_comment_count(page)
        if current_count != last_count:
            logger.info("评论增加: %d -> %d", last_count, current_count)
            last_count = current_count
            stagnant_checks = 0
        else:
            stagnant_checks += 1
        # Stop once the requested number of comments is reached.
        if config.max_comment_items > 0 and current_count >= config.max_comment_items:
            logger.info("已达到目标评论数: %d/%d", current_count, config.max_comment_items)
            return
        # Scroll
        if current_count > 0:
            _scroll_to_last_comment(page)
            sleep_random(*POST_SCROLL)
        large_mode = stagnant_checks >= LARGE_SCROLL_TRIGGER
        push_count = 1
        if large_mode:
            push_count = 3 + random.randint(0, 2)
        scroll_delta, current_scroll_top = _human_scroll(
            page, config.scroll_speed, large_mode, push_count
        )
        if scroll_delta < MIN_SCROLL_DELTA or current_scroll_top == last_scroll_top:
            stagnant_checks += 1
        else:
            stagnant_checks = 0
            last_scroll_top = current_scroll_top
        # Stagnation handling: after too many stalled rounds, do a large scroll burst.
        if stagnant_checks >= STAGNANT_LIMIT:
            logger.info("停滞过多,尝试大冲刺...")
            _human_scroll(page, config.scroll_speed, True, 10)
            stagnant_checks = 0
        time.sleep(scroll_interval)
    # Final sprint after the attempt budget is exhausted.
    logger.info("达到最大尝试次数,最后冲刺...")
    _human_scroll(page, config.scroll_speed, True, FINAL_SPRINT_PUSH_COUNT)
    count = _get_comment_count(page)
    logger.info("加载结束: %d 条评论, 点击: %d, 跳过: %d", count, total_clicked, total_skipped)
# ========== Scrolling ==========
def _human_scroll(
    page: Page,
    speed: str,
    large_mode: bool,
    push_count: int,
) -> tuple[int, int]:
    """Scroll in human-like steps.

    Returns:
        (actual_delta, current_scroll_top)
    """
    start_top = page.get_scroll_top()
    viewport_height = page.get_viewport_height()
    base_ratio = get_scroll_ratio(speed)
    if large_mode:
        base_ratio *= 2.0
    current_scroll_top = start_top
    for i in range(max(1, push_count)):
        scroll_delta = calculate_scroll_delta(viewport_height, base_ratio)
        page.scroll_by(0, int(scroll_delta))
        sleep_random(*SCROLL_WAIT)
        current_scroll_top = page.get_scroll_top()
        if i < push_count - 1:
            sleep_random(*HUMAN_DELAY)
    # Total distance actually scrolled, measured from the starting position.
    actual_delta = current_scroll_top - start_top
    # If the page barely moved, force a jump to the bottom.
    if actual_delta < MIN_SCROLL_DELTA and push_count > 0:
        page.scroll_to_bottom()
        sleep_random(*POST_SCROLL)
        current_scroll_top = page.get_scroll_top()
        actual_delta = current_scroll_top - start_top
    return actual_delta, current_scroll_top
def _scroll_to_comments_area(page: Page) -> None:
"""滚动到评论区。"""
logger.info("滚动到评论区...")
page.scroll_element_into_view(".comments-container")
time.sleep(0.5)
# 触发懒加载
page.dispatch_wheel_event(100)
def _scroll_to_last_comment(page: Page) -> None:
"""滚动到最后一条评论。"""
count = page.get_elements_count(PARENT_COMMENT)
if count > 0:
page.scroll_nth_element_into_view(PARENT_COMMENT, count - 1)
# ========== DOM 查询 ==========
def _get_comment_count(page: Page) -> int:
"""获取当前评论数量。"""
return page.get_elements_count(PARENT_COMMENT)
def _get_total_comment_count(page: Page) -> int:
"""获取总评论数(从 "共N条评论" 提取)。"""
text = page.get_element_text(".comments-container .total")
if not text:
return 0
match = _TOTAL_COMMENT_RE.search(text)
if match:
return int(match.group(1))
return 0
def _check_no_comments(page: Page) -> bool:
"""检查是否无评论区域。"""
text = page.get_element_text(NO_COMMENTS_TEXT)
if not text:
return False
return "这是一片荒地" in text.strip()
def _check_end_container(page: Page) -> bool:
"""检查是否到达底部 THE END。"""
text = page.get_element_text(END_CONTAINER)
if not text:
return False
upper = text.strip().upper()
return "THE END" in upper or "THEEND" in upper
# ========== Button clicks ==========
def _click_show_more_buttons(page: Page, max_threshold: int) -> tuple[int, int]:
    """Click the "展开N条回复" (expand N replies) buttons.

    Returns:
        (clicked, skipped)
    """
    count = page.get_elements_count(SHOW_MORE_BUTTON)
    if count == 0:
        return 0, 0
    # Randomize the per-round click budget.
    max_click = MAX_CLICK_PER_ROUND + random.randint(0, MAX_CLICK_PER_ROUND - 1)
    clicked = 0
    skipped = 0
    for i in range(count):
        if clicked >= max_click:
            break
        # Read the button text.
        text = page.evaluate(
            f"document.querySelectorAll({json.dumps(SHOW_MORE_BUTTON)})[{i}]?.textContent || ''"
        )
        if not text:
            continue
        # Skip threads whose reply count exceeds the threshold.
        if max_threshold > 0:
            match = _REPLY_COUNT_RE.search(text)
            if match:
                reply_count = int(match.group(1))
                if reply_count > max_threshold:
                    logger.debug(
                        "跳过 '%s'(回复数 %d > 阈值 %d)", text, reply_count, max_threshold
                    )
                    skipped += 1
                    continue
        # Scroll the button into view and click it.
        page.scroll_nth_element_into_view(SHOW_MORE_BUTTON, i)
        sleep_random(*REACTION_TIME)
        page.evaluate(f"document.querySelectorAll({json.dumps(SHOW_MORE_BUTTON)})[{i}]?.click()")
        sleep_random(*READ_TIME)
        clicked += 1
    return clicked, skipped
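The skip rule inside `_click_show_more_buttons` can be looked at in isolation. The sketch below reuses the same pattern as `_REPLY_COUNT_RE`; the helper name `should_skip` is hypothetical and is not part of the module.

```python
import re

# Same pattern as _REPLY_COUNT_RE in the module above.
REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")


def should_skip(button_text: str, max_threshold: int) -> bool:
    """Return True when the button announces more replies than the threshold.

    A threshold of 0 or less disables skipping, and text without a
    parseable reply count is never skipped.
    """
    if max_threshold <= 0:
        return False
    match = REPLY_COUNT_RE.search(button_text)
    if match is None:
        return False
    return int(match.group(1)) > max_threshold


if __name__ == "__main__":
    print(should_skip("展开 25 条回复", 10))
    print(should_skip("展开 3 条回复", 10))
```

Keeping the decision in a pure function like this would make the threshold behavior testable without a browser, though the module itself inlines the check.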
... ...
"""Home feed list; mirrors Go xiaohongshu/feeds.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedsError
from .types import Feed
from .urls import HOME_URL
logger = logging.getLogger(__name__)
# 从 __INITIAL_STATE__ 提取 feeds 的 JS
_EXTRACT_FEEDS_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.feed &&
window.__INITIAL_STATE__.feed.feeds) {
const feeds = window.__INITIAL_STATE__.feed.feeds;
const feedsData = feeds.value !== undefined ? feeds.value : feeds._value;
if (feedsData) {
return JSON.stringify(feedsData);
}
}
return "";
})()
"""
def list_feeds(page: Page) -> list[Feed]:
    """Fetch the feed list from the home page.

    Raises:
        NoFeedsError: No feed data was captured.
    """
    page.navigate(HOME_URL)
    page.wait_for_load()
    page.wait_dom_stable()
    time.sleep(1)
    result = page.evaluate(_EXTRACT_FEEDS_JS)
    if not result:
        raise NoFeedsError()
    feeds_data = json.loads(result)
    return [Feed.from_dict(f) for f in feeds_data]
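`list_feeds` evaluates the extraction script once after a fixed one-second sleep, while `_extract_feed_detail` in the feed-detail module retries up to three times. A minimal sketch of a shared retry helper follows; `evaluate_with_retry` and `FakePage` are hypothetical names used only for illustration, and the only assumption about the real `Page` is that it has an `evaluate(expression)` method, as the code above shows.

```python
import time


def evaluate_with_retry(page, expression: str, attempts: int = 3, delay: float = 0.2) -> str:
    """Evaluate `expression` until it returns a non-empty result.

    Returns the result as a string, or "" if every attempt came back empty.
    """
    for i in range(attempts):
        result = page.evaluate(expression)
        if result:
            return str(result)
        # Do not sleep after the final attempt.
        if i < attempts - 1:
            time.sleep(delay)
    return ""


class FakePage:
    """Stand-in page whose state only becomes available on the third call."""

    def __init__(self) -> None:
        self.calls = 0

    def evaluate(self, expression: str) -> str:
        self.calls += 1
        return '[{"id": "demo"}]' if self.calls >= 3 else ""


if __name__ == "__main__":
    page = FakePage()
    print(evaluate_with_retry(page, "ignored", attempts=3, delay=0.0))
    print(page.calls)
```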
... ...
"""Human-behavior simulation parameters (delays, scrolling, hovering); mirrors the constants in Go feed_detail.go."""
import random
import time
# ========== Configuration constants ==========
DEFAULT_MAX_ATTEMPTS = 500
STAGNANT_LIMIT = 20
MIN_SCROLL_DELTA = 10
MAX_CLICK_PER_ROUND = 3
STAGNANT_CHECK_THRESHOLD = 2
LARGE_SCROLL_TRIGGER = 5
BUTTON_CLICK_INTERVAL = 3
FINAL_SPRINT_PUSH_COUNT = 15
# ========== Delay ranges (milliseconds) ==========
HUMAN_DELAY = (300, 700)
REACTION_TIME = (300, 800)
HOVER_TIME = (100, 300)
READ_TIME = (500, 1200)
SHORT_READ = (600, 1200)
SCROLL_WAIT = (100, 200)
POST_SCROLL = (300, 500)


def sleep_random(min_ms: int, max_ms: int) -> None:
    """Sleep for a random duration between min_ms and max_ms milliseconds.

    If max_ms <= min_ms, sleep for exactly min_ms.
    """
    if max_ms <= min_ms:
        time.sleep(min_ms / 1000.0)
        return
    delay = random.randint(min_ms, max_ms) / 1000.0
    time.sleep(delay)


def get_scroll_interval(speed: str) -> float:
    """Return the pause between scroll rounds, in seconds, for the given speed."""
    if speed == "slow":
        return (1200 + random.randint(0, 300)) / 1000.0
    if speed == "fast":
        return (300 + random.randint(0, 100)) / 1000.0
    # normal (also used for any unrecognized value)
    return (600 + random.randint(0, 200)) / 1000.0


def get_scroll_ratio(speed: str) -> float:
    """Return the base fraction of the viewport to scroll for the given speed."""
    if speed == "slow":
        return 0.5
    if speed == "fast":
        return 0.9
    return 0.7


def calculate_scroll_delta(viewport_height: int, base_ratio: float) -> float:
    """Compute one scroll distance in pixels.

    The distance is a randomized fraction of the viewport height, raised to
    at least 400 px, with a final jitter of -50 to +50 px.
    """
    scroll_delta = viewport_height * (base_ratio + random.random() * 0.2)
    if scroll_delta < 400:
        scroll_delta = 400.0
    return scroll_delta + random.randint(-50, 50)
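The range of values `calculate_scroll_delta` can return follows directly from its three steps (random ratio, 400 px floor, jitter). The sketch below repeats the function so it runs on its own and adds a hypothetical helper, `delta_bounds`, that computes that range; the helper is an illustration and not part of the module.

```python
import random


def calculate_scroll_delta(viewport_height: int, base_ratio: float) -> float:
    """Copy of the helper above, repeated so this example is self-contained."""
    scroll_delta = viewport_height * (base_ratio + random.random() * 0.2)
    if scroll_delta < 400:
        scroll_delta = 400.0
    return scroll_delta + random.randint(-50, 50)


def delta_bounds(viewport_height: int, base_ratio: float) -> tuple[float, float]:
    """Return the (lowest, highest) value calculate_scroll_delta can produce."""
    # The ratio lies between base_ratio and base_ratio + 0.2, then the
    # 400 px floor applies, then a jitter of -50..+50 is added.
    low = max(400.0, viewport_height * base_ratio) - 50
    high = max(400.0, viewport_height * (base_ratio + 0.2)) + 50
    return low, high


if __name__ == "__main__":
    low, high = delta_bounds(800, 0.7)
    samples = [calculate_scroll_delta(800, 0.7) for _ in range(1000)]
    print(low <= min(samples), max(samples) <= high)
```

Note that because the jitter is applied after the floor, a result can be as low as 350 px on small viewports.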
# 页面不可访问关键词
INACCESSIBLE_KEYWORDS = [
"当前笔记暂时无法浏览",
"该内容因违规已被删除",
"该笔记已被删除",
"内容不存在",
"笔记不存在",
"已失效",
"私密笔记",
"仅作者可见",
"因用户设置,你无法查看",
"因违规无法查看",
]
... ...
"""Like and favorite actions; mirrors Go xiaohongshu/like_favorite.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedDetailError
from .selectors import COLLECT_BUTTON, LIKE_BUTTON
from .types import ActionResult
from .urls import make_feed_detail_url
logger = logging.getLogger(__name__)
# 从 __INITIAL_STATE__ 读取互动状态的 JS
_GET_INTERACT_STATE_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.note &&
window.__INITIAL_STATE__.note.noteDetailMap) {
return JSON.stringify(window.__INITIAL_STATE__.note.noteDetailMap);
}
return "";
})()
"""
def _get_interact_state(page: Page, feed_id: str) -> tuple[bool, bool]:
"""读取笔记的点赞/收藏状态。
Returns:
(liked, collected)
Raises:
NoFeedDetailError: 无法获取状态。
"""
result = page.evaluate(_GET_INTERACT_STATE_JS)
if not result:
raise NoFeedDetailError()
note_detail_map = json.loads(result)
detail = note_detail_map.get(feed_id)
if not detail:
raise NoFeedDetailError()
interact = detail.get("note", {}).get("interactInfo", {})
return interact.get("liked", False), interact.get("collected", False)
def _prepare_page(page: Page, feed_id: str, xsec_token: str) -> None:
"""导航到 feed 详情页。"""
url = make_feed_detail_url(feed_id, xsec_token)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
# ========== 点赞 ==========
def like_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""点赞笔记(幂等:已点赞则跳过)。"""
_prepare_page(page, feed_id, xsec_token)
return _toggle_like(page, feed_id, target_liked=True)
def unlike_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""取消点赞(幂等:未点赞则跳过)。"""
_prepare_page(page, feed_id, xsec_token)
return _toggle_like(page, feed_id, target_liked=False)
def _toggle_like(page: Page, feed_id: str, target_liked: bool) -> ActionResult:
    """Like or unlike the note so that its state matches target_liked."""
    action_name = "点赞" if target_liked else "取消点赞"
    try:
        liked, _ = _get_interact_state(page, feed_id)
    except NoFeedDetailError:
        logger.warning("无法读取互动状态,直接点击")
        liked = not target_liked  # State unknown: force the click.
    # Idempotency check: nothing to do if the state already matches.
    if liked == target_liked:
        logger.info("feed %s 已%s,跳过", feed_id, action_name)
        return ActionResult(feed_id=feed_id, success=True, message=f"已{action_name}")
    # Click
    page.click_element(LIKE_BUTTON)
    time.sleep(3)
    # Verify
    try:
        liked, _ = _get_interact_state(page, feed_id)
        if liked == target_liked:
            logger.info("feed %s %s成功", feed_id, action_name)
            return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}成功")
    except NoFeedDetailError:
        pass
    # Retry once. The result is reported as success without a further state check.
    logger.warning("feed %s %s可能未成功,重试", feed_id, action_name)
    page.click_element(LIKE_BUTTON)
    time.sleep(2)
    return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}已执行")
# ========== 收藏 ==========
def favorite_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""收藏笔记(幂等:已收藏则跳过)。"""
_prepare_page(page, feed_id, xsec_token)
return _toggle_favorite(page, feed_id, target_collected=True)
def unfavorite_feed(page: Page, feed_id: str, xsec_token: str) -> ActionResult:
"""取消收藏(幂等:未收藏则跳过)。"""
_prepare_page(page, feed_id, xsec_token)
return _toggle_favorite(page, feed_id, target_collected=False)
def _toggle_favorite(page: Page, feed_id: str, target_collected: bool) -> ActionResult:
    """Favorite or unfavorite the note so that its state matches target_collected."""
    action_name = "收藏" if target_collected else "取消收藏"
    try:
        _, collected = _get_interact_state(page, feed_id)
    except NoFeedDetailError:
        logger.warning("无法读取互动状态,直接点击")
        collected = not target_collected  # State unknown: force the click.
    # Idempotency check: nothing to do if the state already matches.
    if collected == target_collected:
        logger.info("feed %s 已%s,跳过", feed_id, action_name)
        return ActionResult(feed_id=feed_id, success=True, message=f"已{action_name}")
    # Click
    page.click_element(COLLECT_BUTTON)
    time.sleep(3)
    # Verify
    try:
        _, collected = _get_interact_state(page, feed_id)
        if collected == target_collected:
            logger.info("feed %s %s成功", feed_id, action_name)
            return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}成功")
    except NoFeedDetailError:
        pass
    # Retry once. The result is reported as success without a further state check.
    logger.warning("feed %s %s可能未成功,重试", feed_id, action_name)
    page.click_element(COLLECT_BUTTON)
    time.sleep(2)
    return ActionResult(feed_id=feed_id, success=True, message=f"{action_name}已执行")
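Both toggle functions follow the same pattern: read the state, click if it differs from the target, verify, and retry once. As one possible variation (not what the module does), the sketch below re-reads the state after every click, so a second click is only sent when the first one verifiably had no effect, and the returned flag reflects the observed state. `ensure_state` and `FakeButton` are hypothetical names.

```python
from typing import Callable


def ensure_state(
    read_state: Callable[[], bool],
    click: Callable[[], None],
    target: bool,
    max_clicks: int = 2,
) -> tuple[bool, int]:
    """Click until read_state() equals target or the click budget is spent.

    Returns (reached_target, clicks_used).
    """
    clicks = 0
    while read_state() != target and clicks < max_clicks:
        click()
        clicks += 1
    return read_state() == target, clicks


class FakeButton:
    """Toggle button stand-in; a broken button ignores clicks."""

    def __init__(self, active: bool = False, broken: bool = False) -> None:
        self.active = active
        self.broken = broken

    def click(self) -> None:
        if not self.broken:
            self.active = not self.active

    def is_active(self) -> bool:
        return self.active


if __name__ == "__main__":
    button = FakeButton(active=False)
    print(ensure_state(button.is_active, button.click, target=True))
```

In a real page the state read would need a short wait after each click, as the module does with `time.sleep`.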
... ...
"""Login management; mirrors Go xiaohongshu/login.go."""
from __future__ import annotations
import base64
import logging
import os
import tempfile
import time
from .cdp import Page
from .selectors import LOGIN_STATUS, QRCODE_IMG
from .urls import EXPLORE_URL
logger = logging.getLogger(__name__)
def check_login_status(page: Page) -> bool:
"""检查登录状态。
Returns:
True 已登录,False 未登录。
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(1)
return page.has_element(LOGIN_STATUS)
def fetch_qrcode(page: Page) -> tuple[str, bool]:
"""获取登录二维码。
Returns:
(qrcode_src, already_logged_in)
- 如果已登录,返回 ("", True)
- 如果未登录,返回 (qrcode_base64_or_url, False)
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(2)
# 检查是否已登录
if page.has_element(LOGIN_STATUS):
return "", True
# 获取二维码图片 src
src = page.get_element_attribute(QRCODE_IMG, "src")
if not src:
raise RuntimeError("二维码图片 src 为空")
return src, False
def save_qrcode_to_file(src: str) -> str:
    """Save a QR-code data URL as a temporary PNG file.

    Args:
        src: The QR-code image as a base64 data URL (data:image/png;base64,...
            or another data:image/ MIME type).

    Returns:
        Absolute path of the saved file.

    Raises:
        ValueError: src is not a data:image/ URL; plain URLs are not supported.
    """
    prefix = "data:image/png;base64,"
    if src.startswith(prefix):
        img_data = base64.b64decode(src[len(prefix) :])
    elif src.startswith("data:image/"):
        # Other MIME types, such as data:image/jpeg;base64,...
        _, encoded = src.split(",", 1)
        img_data = base64.b64decode(encoded)
    else:
        # Not a data URL, so there is nothing to decode.
        raise ValueError(f"不支持的二维码格式,需要 data URL: {src[:50]}...")
    qr_dir = os.path.join(tempfile.gettempdir(), "xhs")
    os.makedirs(qr_dir, exist_ok=True)
    # The file name is fixed, so each call overwrites the previous QR code.
    filepath = os.path.join(qr_dir, "login_qrcode.png")
    with open(filepath, "wb") as f:
        f.write(img_data)
    logger.info("二维码已保存: %s", filepath)
    return filepath
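`save_qrcode_to_file` handles the PNG prefix and other `data:image/` types in two separate branches. A minimal sketch of a single parsing step that also returns the MIME type is shown below; `parse_data_url` is a hypothetical helper and not part of the module, and it deliberately handles only base64 payloads.

```python
import base64


def parse_data_url(src: str) -> tuple[str, bytes]:
    """Split a base64 data URL into (mime_type, raw_bytes)."""
    if not src.startswith("data:"):
        raise ValueError("not a data URL")
    header, sep, encoded = src.partition(",")
    if not sep:
        raise ValueError("data URL has no payload")
    # header looks like "data:image/png;base64"
    meta = header[len("data:") :].split(";")
    if "base64" not in meta[1:]:
        raise ValueError("only base64 data URLs are handled here")
    return meta[0], base64.b64decode(encoded)


if __name__ == "__main__":
    payload = base64.b64encode(b"\x89PNG").decode("ascii")
    mime, data = parse_data_url("data:image/png;base64," + payload)
    print(mime, len(data))
```

With the MIME type available, a caller could choose a matching file extension instead of always writing `.png`.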
def wait_for_login(page: Page, timeout: float = 120.0) -> bool:
    """Wait for the QR-code login to complete.

    Args:
        page: CDP page object.
        timeout: Timeout in seconds.

    Returns:
        True if login succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if page.has_element(LOGIN_STATUS):
            logger.info("登录成功")
            return True
        time.sleep(0.5)
    return False
... ...
"""Image-and-text publishing; mirrors Go xiaohongshu/publish.go (837 lines)."""
from __future__ import annotations
import json
import logging
import random
import time
from .cdp import Page
from .errors import ContentTooLongError, PublishError, TitleTooLongError, UploadTimeoutError
from .selectors import (
CONTENT_EDITOR,
CONTENT_LENGTH_ERROR,
CREATOR_TAB,
DATETIME_INPUT,
FILE_INPUT,
IMAGE_PREVIEW,
ORIGINAL_SWITCH,
ORIGINAL_SWITCH_CARD,
POPOVER,
PUBLISH_BUTTON,
SCHEDULE_SWITCH,
TAG_FIRST_ITEM,
TAG_TOPIC_CONTAINER,
TITLE_INPUT,
TITLE_MAX_SUFFIX,
UPLOAD_CONTENT,
UPLOAD_INPUT,
VISIBILITY_DROPDOWN,
VISIBILITY_OPTIONS,
)
from .types import PublishImageContent
from .urls import PUBLISH_URL
logger = logging.getLogger(__name__)
def publish_image_content(page: Page, content: PublishImageContent) -> None:
    """Publish an image-and-text note.

    Args:
        page: CDP page object.
        content: The content to publish.

    Raises:
        PublishError: Publishing failed.
        UploadTimeoutError: An upload timed out.
        TitleTooLongError: The title is too long.
        ContentTooLongError: The body text is too long.
    """
    if not content.image_paths:
        raise PublishError("图片不能为空")
    # Navigate to the publish page.
    _navigate_to_publish_page(page)
    # Click the "上传图文" (upload image and text) tab.
    _click_publish_tab(page, "上传图文")
    time.sleep(1)
    # Upload the images.
    _upload_images(page, content.image_paths)
    # Keep at most 10 tags; slicing is a no-op for shorter lists.
    tags = content.tags[:10]
    if len(content.tags) > 10:
        logger.warning("标签数量超过10,截取前10个")
    logger.info(
        "发布内容: title=%s, images=%d, tags=%d, schedule=%s, original=%s, visibility=%s",
        content.title,
        len(content.image_paths),
        len(tags),
        content.schedule_time,
        content.is_original,
        content.visibility,
    )
    # Fill in the form and submit.
    _submit_publish(
        page,
        content.title,
        content.content,
        tags,
        content.schedule_time,
        content.is_original,
        content.visibility,
    )
# ========== 页面导航 ==========
def _navigate_to_publish_page(page: Page) -> None:
"""导航到发布页面。"""
page.navigate(PUBLISH_URL)
page.wait_for_load(timeout=300)
time.sleep(2)
page.wait_dom_stable()
time.sleep(1)
def _click_publish_tab(page: Page, tab_name: str) -> None:
"""点击发布页 TAB(上传图文/上传视频)。"""
page.wait_for_element(UPLOAD_CONTENT, timeout=15)
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
# 查找匹配的 TAB
found = page.evaluate(
f"""
(() => {{
const tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
for (const tab of tabs) {{
if (tab.textContent.trim() === {json.dumps(tab_name)}) {{
// 检查是否被遮挡
const rect = tab.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) continue;
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const target = document.elementFromPoint(x, y);
if (target === tab || tab.contains(target)) {{
tab.click();
return 'clicked';
}}
return 'blocked';
}}
}}
return 'not_found';
}})()
"""
)
if found == "clicked":
return
if found == "blocked":
# 尝试移除弹窗
_remove_pop_cover(page)
time.sleep(0.2)
raise PublishError(f"没有找到发布 TAB - {tab_name}")
def _remove_pop_cover(page: Page) -> None:
"""移除弹窗遮挡。"""
if page.has_element(POPOVER):
page.remove_element(POPOVER)
# 点击空位置
x = 380 + random.randint(0, 100)
y = 20 + random.randint(0, 60)
page.mouse_click(float(x), float(y))
# ========== Image upload ==========
def _upload_images(page: Page, image_paths: list[str]) -> None:
    """Upload the images one at a time."""
    import os

    # Paths that do not exist on disk are dropped without an error.
    valid_paths = [p for p in image_paths if os.path.exists(p)]
    if not valid_paths:
        raise PublishError("没有有效的图片文件")
    for i, path in enumerate(valid_paths):
        # The first image goes through UPLOAD_INPUT, later ones through FILE_INPUT.
        selector = UPLOAD_INPUT if i == 0 else FILE_INPUT
        logger.info("上传第 %d 张图片: %s", i + 1, path)
        page.set_file_input(selector, [path])
        _wait_for_upload_complete(page, i + 1)
        time.sleep(1)


def _wait_for_upload_complete(page: Page, expected_count: int) -> None:
    """Wait until at least expected_count image previews are present (60 s limit)."""
    max_wait = 60.0
    start = time.monotonic()
    while time.monotonic() - start < max_wait:
        count = page.get_elements_count(IMAGE_PREVIEW)
        if count >= expected_count:
            logger.info("图片上传完成: %d", count)
            return
        time.sleep(0.5)
    raise UploadTimeoutError(f"第{expected_count}张图片上传超时(60s)")
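The same deadline-based polling loop appears in `_wait_for_upload_complete`, `wait_for_login`, `_click_publish_tab`, and the search helpers. A minimal sketch of a shared helper is given below; `wait_until` is a hypothetical name and not part of the package. Unlike the loops above, it always checks the condition at least once, even with a zero timeout.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout: float, interval: float = 0.5) -> bool:
    """Poll `predicate` until it returns True or `timeout` seconds elapse.

    Returns True if the predicate succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    state = {"polls": 0}

    def ready() -> bool:
        # Becomes true on the third poll.
        state["polls"] += 1
        return state["polls"] >= 3

    print(wait_until(ready, timeout=1.0, interval=0.01))
```

A caller such as the upload wait could then raise its own timeout error when the helper returns False.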
# ========== 表单提交 ==========
def _submit_publish(
page: Page,
title: str,
content: str,
tags: list[str],
schedule_time: str | None,
is_original: bool,
visibility: str,
) -> None:
"""填写表单并提交。"""
# 标题
page.input_text(TITLE_INPUT, title)
time.sleep(0.5)
_check_title_max_length(page)
logger.info("标题长度检查通过")
time.sleep(1)
# 正文
content_selector = _find_content_element(page)
page.input_content_editable(content_selector, content)
# 回点标题(增强稳定性)
time.sleep(1)
page.click_element(TITLE_INPUT)
logger.info("已回点标题输入框")
# 标签
if tags:
_input_tags(page, content_selector, tags)
time.sleep(1)
_check_content_max_length(page)
logger.info("正文长度检查通过")
# 定时发布
if schedule_time:
_set_schedule_publish(page, schedule_time)
# 可见范围
_set_visibility(page, visibility)
# 原创声明
if is_original:
try:
_set_original(page)
logger.info("已声明原创")
except Exception as e:
logger.warning("设置原创声明失败: %s", e)
# 点击发布
page.click_element(PUBLISH_BUTTON)
time.sleep(3)
logger.info("发布完成")
def _find_content_element(page: Page) -> str:
"""查找内容输入框(兼容两种 UI)。"""
if page.has_element(CONTENT_EDITOR):
return CONTENT_EDITOR
# 查找带 placeholder 的 p 元素的 textbox 父元素
found = page.evaluate(
"""
(() => {
const ps = document.querySelectorAll('p');
for (const p of ps) {
const placeholder = p.getAttribute('data-placeholder');
if (placeholder && placeholder.includes('输入正文描述')) {
let current = p;
for (let i = 0; i < 5; i++) {
current = current.parentElement;
if (!current) break;
if (current.getAttribute('role') === 'textbox') {
return 'found';
}
}
}
}
return '';
})()
"""
)
if found == "found":
return "[role='textbox']"
raise PublishError("没有找到内容输入框")
def _check_title_max_length(page: Page) -> None:
"""检查标题长度是否超限。"""
text = page.get_element_text(TITLE_MAX_SUFFIX)
if text:
parts = text.split("/")
if len(parts) == 2:
raise TitleTooLongError(parts[0], parts[1])
raise TitleTooLongError(text, "?")
def _check_content_max_length(page: Page) -> None:
"""检查正文长度是否超限。"""
text = page.get_element_text(CONTENT_LENGTH_ERROR)
if text:
parts = text.split("/")
if len(parts) == 2:
raise ContentTooLongError(parts[0], parts[1])
raise ContentTooLongError(text, "?")
# ========== 标签输入 ==========
def _input_tags(page: Page, content_selector: str, tags: list[str]) -> None:
"""输入标签。"""
time.sleep(1)
# 移动光标到正文末尾(20次 ArrowDown)
for _ in range(20):
page.press_key("ArrowDown")
time.sleep(0.01)
# 按两次回车换行
page.press_key("Enter")
page.press_key("Enter")
time.sleep(1)
for tag in tags:
tag = tag.lstrip("#")
_input_single_tag(page, content_selector, tag)
def _input_single_tag(page: Page, content_selector: str, tag: str) -> None:
"""输入单个标签。"""
# 输入 #
page.type_text("#", delay_ms=0)
time.sleep(0.2)
# 逐字输入标签
for char in tag:
page.type_text(char, delay_ms=50)
time.sleep(1)
# 尝试点击标签联想
if page.has_element(TAG_TOPIC_CONTAINER):
item_selector = f"{TAG_TOPIC_CONTAINER} {TAG_FIRST_ITEM}"
if page.has_element(item_selector):
page.click_element(item_selector)
logger.info("点击标签联想: %s", tag)
time.sleep(0.5)
return
# 没有联想,直接空格
logger.warning("未找到标签联想,直接输入空格: %s", tag)
page.type_text(" ", delay_ms=0)
time.sleep(0.5)
# ========== Scheduled publishing ==========
def _set_schedule_publish(page: Page, schedule_time: str) -> None:
    """Enable scheduled publishing and fill in the date and time."""
    from datetime import datetime

    # Parse the ISO 8601 timestamp.
    try:
        dt = datetime.fromisoformat(schedule_time)
    except ValueError as e:
        raise PublishError(f"定时发布时间格式错误: {e}") from e
    # Click the scheduled-publishing switch.
    page.click_element(SCHEDULE_SWITCH)
    time.sleep(0.8)
    # Replace the current value of the date-time input.
    datetime_str = dt.strftime("%Y-%m-%d %H:%M")
    page.select_all_text(DATETIME_INPUT)
    page.input_text(DATETIME_INPUT, datetime_str)
    time.sleep(0.5)
    logger.info("已设置定时发布: %s", datetime_str)
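The conversion in `_set_schedule_publish` from an ISO 8601 string to the picker's `YYYY-MM-DD HH:MM` text can be tried on its own. The helper name `to_picker_format` below is hypothetical. One detail worth knowing: a UTC offset in the input is parsed, but `strftime` with this format drops it without converting the time.

```python
from datetime import datetime


def to_picker_format(schedule_time: str) -> str:
    """Convert an ISO 8601 timestamp to the 'YYYY-MM-DD HH:MM' picker format."""
    dt = datetime.fromisoformat(schedule_time)
    return dt.strftime("%Y-%m-%d %H:%M")


if __name__ == "__main__":
    print(to_picker_format("2025-03-01T09:30:00"))        # 2025-03-01 09:30
    print(to_picker_format("2025-03-01T09:30:00+08:00"))  # 2025-03-01 09:30
```

If callers may pass timestamps in a timezone other than the one the page expects, converting with `astimezone` before formatting would be one way to handle that.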
# ========== 可见范围 ==========
def _set_visibility(page: Page, visibility: str) -> None:
"""设置可见范围。"""
if not visibility or visibility == "公开可见":
logger.info("可见范围: 公开可见(默认)")
return
supported = {"仅自己可见", "仅互关好友可见"}
if visibility not in supported:
raise PublishError(
f"不支持的可见范围: {visibility},支持: 公开可见、仅自己可见、仅互关好友可见"
)
# 点击下拉框
page.click_element(VISIBILITY_DROPDOWN)
time.sleep(0.5)
# 查找并点击目标选项
clicked = page.evaluate(
f"""
(() => {{
const opts = document.querySelectorAll({json.dumps(VISIBILITY_OPTIONS)});
for (const opt of opts) {{
if (opt.textContent.includes({json.dumps(visibility)})) {{
opt.click();
return true;
}}
}}
return false;
}})()
"""
)
if not clicked:
raise PublishError(f"未找到可见范围选项: {visibility}")
logger.info("已设置可见范围: %s", visibility)
time.sleep(0.2)
# ========== 原创声明 ==========
def _set_original(page: Page) -> None:
"""设置原创声明。"""
# 查找原创声明卡片并点击开关
result = page.evaluate(
f"""
(() => {{
const cards = document.querySelectorAll({json.dumps(ORIGINAL_SWITCH_CARD)});
for (const card of cards) {{
if (!card.textContent.includes('原创声明')) continue;
const sw = card.querySelector({json.dumps(ORIGINAL_SWITCH)});
if (!sw) continue;
const input = sw.querySelector('input[type="checkbox"]');
if (input && input.checked) return 'already_on';
sw.click();
return 'clicked';
}}
return 'not_found';
}})()
"""
)
if result == "already_on":
logger.info("原创声明已开启")
return
if result == "not_found":
raise PublishError("未找到原创声明选项")
time.sleep(0.5)
# 处理确认弹窗
_confirm_original_declaration(page)
def _confirm_original_declaration(page: Page) -> None:
"""处理原创声明确认弹窗。"""
time.sleep(0.8)
# 勾选 checkbox
page.evaluate(
"""
(() => {
const footers = document.querySelectorAll('div.footer');
for (const footer of footers) {
if (!footer.textContent.includes('原创声明须知')) continue;
const cb = footer.querySelector('div.d-checkbox input[type="checkbox"]');
if (cb && !cb.checked) cb.click();
return;
}
})()
"""
)
time.sleep(0.5)
# 点击声明原创按钮
result = page.evaluate(
"""
(() => {
const footers = document.querySelectorAll('div.footer');
for (const footer of footers) {
if (!footer.textContent.includes('声明原创')) continue;
const btn = footer.querySelector('button.custom-button');
if (btn) {
if (btn.classList.contains('disabled') || btn.disabled) {
const cb = footer.querySelector('div.d-checkbox input[type="checkbox"]');
if (cb && !cb.checked) cb.click();
return 'button_disabled';
}
btn.click();
return 'clicked';
}
}
return 'button_not_found';
})()
"""
)
if result == "button_not_found":
raise PublishError("未找到声明原创按钮")
if result == "button_disabled":
raise PublishError("声明原创按钮仍处于禁用状态")
logger.info("已成功点击声明原创按钮")
time.sleep(0.3)
... ...
"""Video publishing; mirrors Go xiaohongshu/publish_video.go."""
from __future__ import annotations
import logging
import os
import time
from .cdp import Page
from .errors import PublishError, UploadTimeoutError
from .publish import (
_click_publish_tab,
_find_content_element,
_input_tags,
_navigate_to_publish_page,
_set_schedule_publish,
_set_visibility,
)
from .selectors import (
FILE_INPUT,
PUBLISH_BUTTON,
TITLE_INPUT,
UPLOAD_INPUT,
)
from .types import PublishVideoContent
logger = logging.getLogger(__name__)
def publish_video_content(page: Page, content: PublishVideoContent) -> None:
"""发布视频内容。
Args:
page: CDP 页面对象。
content: 视频发布内容。
Raises:
PublishError: 发布失败。
UploadTimeoutError: 上传/处理超时。
"""
if not content.video_path:
raise PublishError("视频不能为空")
# 导航到发布页
_navigate_to_publish_page(page)
# 点击"上传视频" TAB
_click_publish_tab(page, "上传视频")
time.sleep(1)
# 上传视频
_upload_video(page, content.video_path)
# 提交
_submit_publish_video(
page,
content.title,
content.content,
content.tags,
content.schedule_time,
content.visibility,
)
def _upload_video(page: Page, video_path: str) -> None:
"""上传视频文件。"""
if not os.path.exists(video_path):
raise PublishError(f"视频文件不存在: {video_path}")
# 查找上传输入框
selector = UPLOAD_INPUT if page.has_element(UPLOAD_INPUT) else FILE_INPUT
page.set_file_input(selector, [video_path])
# 等待发布按钮可点击(视频处理完成)
_wait_for_publish_button_clickable(page)
logger.info("视频上传/处理完成")
def _wait_for_publish_button_clickable(page: Page) -> None:
"""等待发布按钮可点击(视频处理可能需要较长时间)。"""
max_wait = 600.0 # 10 分钟
start = time.monotonic()
logger.info("开始等待发布按钮可点击(视频)")
while time.monotonic() - start < max_wait:
clickable = page.evaluate(
f"""
(() => {{
const btn = document.querySelector({_js_str(PUBLISH_BUTTON)});
if (!btn) return false;
const rect = btn.getBoundingClientRect();
if (rect.width === 0 || rect.height === 0) return false;
if (btn.disabled) return false;
if (btn.classList.contains('disabled')) return false;
return true;
}})()
"""
)
if clickable:
return
time.sleep(1)
raise UploadTimeoutError("等待发布按钮可点击超时(10分钟)")
def _submit_publish_video(
page: Page,
title: str,
content: str,
tags: list[str],
schedule_time: str | None,
visibility: str,
) -> None:
"""填写视频表单并提交。"""
# 标题
page.input_text(TITLE_INPUT, title)
time.sleep(1)
# 正文 + 标签
content_selector = _find_content_element(page)
page.input_content_editable(content_selector, content)
# 回点标题
time.sleep(1)
page.click_element(TITLE_INPUT)
if tags:
_input_tags(page, content_selector, tags)
time.sleep(1)
# 定时发布
if schedule_time:
_set_schedule_publish(page, schedule_time)
# 可见范围
_set_visibility(page, visibility)
# 等待发布按钮可点击
_wait_for_publish_button_clickable(page)
# 点击发布
page.click_element(PUBLISH_BUTTON)
time.sleep(3)
logger.info("视频发布完成")
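def _js_str(s: str) -> str:
    """Convert a Python string to a JavaScript string literal."""
    import json

    # A JSON string is also a valid JS string literal, so json.dumps handles escaping.
    return json.dumps(s)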
... ...
"""Feed search; mirrors Go xiaohongshu/search.go."""
from __future__ import annotations
import json
import logging
import time
from .cdp import Page
from .errors import NoFeedsError
from .selectors import FILTER_BUTTON, FILTER_PANEL
from .types import Feed, FilterOption
from .urls import make_search_url
logger = logging.getLogger(__name__)
# 筛选选项映射表:{筛选组索引: [(标签索引, 文本), ...]}
_FILTER_OPTIONS: dict[int, list[tuple[int, str]]] = {
1: [(1, "综合"), (2, "最新"), (3, "最多点赞"), (4, "最多评论"), (5, "最多收藏")],
2: [(1, "不限"), (2, "视频"), (3, "图文")],
3: [(1, "不限"), (2, "一天内"), (3, "一周内"), (4, "半年内")],
4: [(1, "不限"), (2, "已看过"), (3, "未看过"), (4, "已关注")],
5: [(1, "不限"), (2, "同城"), (3, "附近")],
}
# 从 __INITIAL_STATE__ 提取搜索结果的 JS
_EXTRACT_SEARCH_JS = """
(() => {
if (window.__INITIAL_STATE__ &&
window.__INITIAL_STATE__.search &&
window.__INITIAL_STATE__.search.feeds) {
const feeds = window.__INITIAL_STATE__.search.feeds;
const feedsData = feeds.value !== undefined ? feeds.value : feeds._value;
if (feedsData) {
return JSON.stringify(feedsData);
}
}
return "";
})()
"""
def _find_internal_option(group_index: int, text: str) -> tuple[int, int]:
"""查找内部筛选选项索引。
Returns:
(filters_index, tags_index)
Raises:
ValueError: 未找到匹配的选项。
"""
options = _FILTER_OPTIONS.get(group_index)
if not options:
raise ValueError(f"筛选组 {group_index} 不存在")
for tags_index, option_text in options:
if option_text == text:
return group_index, tags_index
valid = [t for _, t in options]
raise ValueError(f"在筛选组 {group_index} 中未找到 '{text}',有效值: {valid}")
def _convert_filters(filter_opt: FilterOption) -> list[tuple[int, int]]:
"""将 FilterOption 转换为内部 (filters_index, tags_index) 列表。"""
result: list[tuple[int, int]] = []
if filter_opt.sort_by:
result.append(_find_internal_option(1, filter_opt.sort_by))
if filter_opt.note_type:
result.append(_find_internal_option(2, filter_opt.note_type))
if filter_opt.publish_time:
result.append(_find_internal_option(3, filter_opt.publish_time))
if filter_opt.search_scope:
result.append(_find_internal_option(4, filter_opt.search_scope))
if filter_opt.location:
result.append(_find_internal_option(5, filter_opt.location))
return result
def search_feeds(
page: Page,
keyword: str,
filter_option: FilterOption | None = None,
) -> list[Feed]:
"""搜索 Feeds。
Args:
page: CDP 页面对象。
keyword: 搜索关键词。
filter_option: 可选筛选条件。
Raises:
NoFeedsError: 没有捕获到搜索结果。
ValueError: 筛选选项无效。
"""
search_url = make_search_url(keyword)
page.navigate(search_url)
page.wait_for_load()
page.wait_dom_stable()
# 等待 __INITIAL_STATE__ 初始化
_wait_for_initial_state(page)
# 应用筛选条件
if filter_option:
internal_filters = _convert_filters(filter_option)
if internal_filters:
_apply_filters(page, internal_filters)
# 提取搜索结果
result = page.evaluate(_EXTRACT_SEARCH_JS)
if not result:
raise NoFeedsError()
feeds_data = json.loads(result)
return [Feed.from_dict(f) for f in feeds_data]
def _wait_for_initial_state(page: Page, timeout: float = 10.0) -> None:
    """Wait until window.__INITIAL_STATE__ is defined; log a warning on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        ready = page.evaluate("window.__INITIAL_STATE__ !== undefined")
        if ready:
            return
        time.sleep(0.5)
    logger.warning("等待 __INITIAL_STATE__ 超时")


def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
    """Apply the given (filters_index, tags_index) selections in the filter panel."""
    # Hover the filter button to open the panel.
    page.hover_element(FILTER_BUTTON)
    # Wait up to 5 seconds for the filter panel to appear.
    deadline = time.monotonic() + 5.0
    while time.monotonic() < deadline:
        if page.has_element(FILTER_PANEL):
            break
        time.sleep(0.3)
    # Click each requested option.
    for filters_index, tags_index in filters:
        selector = (
            f"{FILTER_PANEL} div.filters:nth-child({filters_index}) "
            f"div.tags:nth-child({tags_index})"
        )
        page.click_element(selector)
        time.sleep(0.3)
    # Wait for the page to update.
    page.wait_dom_stable()
    _wait_for_initial_state(page)
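The search module maps a human-readable filter label to a pair of 1-based `nth-child` indices and then to a CSS selector. The sketch below shows that mapping end to end with a shortened option table; `filter_selector` and the reduced `FILTER_OPTIONS` are hypothetical and only illustrate the idea.

```python
# Shortened copy of the first two filter groups from _FILTER_OPTIONS.
FILTER_OPTIONS: dict[int, list[str]] = {
    1: ["综合", "最新", "最多点赞", "最多评论", "最多收藏"],
    2: ["不限", "视频", "图文"],
}


def filter_selector(group_index: int, text: str) -> str:
    """Build the CSS selector for one filter option, addressed by group and label."""
    options = FILTER_OPTIONS.get(group_index)
    if options is None:
        raise ValueError(f"unknown filter group: {group_index}")
    if text not in options:
        raise ValueError(f"unknown option {text!r} in group {group_index}")
    # CSS :nth-child() is 1-based, list indices are 0-based.
    tags_index = options.index(text) + 1
    return (
        f"div.filter-panel div.filters:nth-child({group_index}) "
        f"div.tags:nth-child({tags_index})"
    )


if __name__ == "__main__":
    print(filter_selector(1, "最新"))
```

Because the selector depends on element position, it will silently point at a different option if the site reorders its filter panel.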
... ...
"""CSS selector constants for Xiaohongshu pages."""
# ========== 登录 ==========
LOGIN_STATUS = ".main-container .user .link-wrapper .channel"
QRCODE_IMG = ".login-container .qrcode-img"
# ========== 首页 / 搜索 ==========
FILTER_BUTTON = "div.filter"
FILTER_PANEL = "div.filter-panel"
# ========== Feed 详情 ==========
COMMENTS_CONTAINER = ".comments-container"
PARENT_COMMENT = ".parent-comment"
NO_COMMENTS_TEXT = ".no-comments-text"
END_CONTAINER = ".end-container"
TOTAL_COMMENT = ".comments-container .total"
SHOW_MORE_BUTTON = ".show-more"
NOTE_SCROLLER = ".note-scroller"
INTERACTION_CONTAINER = ".interaction-container"
# Containers shown when a page is inaccessible
ACCESS_ERROR_WRAPPER = ".access-wrapper, .error-wrapper, .not-found-wrapper, .blocked-wrapper"
# ========== Comment input ==========
COMMENT_INPUT_TRIGGER = "div.input-box div.content-edit span"
COMMENT_INPUT_FIELD = "div.input-box div.content-edit p.content-input"
COMMENT_SUBMIT_BUTTON = "div.bottom button.submit"
REPLY_BUTTON = ".right .interactions .reply"
# ========== Like / Favorite ==========
LIKE_BUTTON = ".interact-container .left .like-lottie"
COLLECT_BUTTON = ".interact-container .left .reds-icon.collect-icon"
# ========== Publish page ==========
UPLOAD_CONTENT = "div.upload-content"
CREATOR_TAB = "div.creator-tab"
UPLOAD_INPUT = ".upload-input"
FILE_INPUT = 'input[type="file"]'
TITLE_INPUT = "div.d-input input"
CONTENT_EDITOR = "div.ql-editor"
IMAGE_PREVIEW = ".img-preview-area .pr"
PUBLISH_BUTTON = ".publish-page-publish-btn button.bg-red"
# Title/body length validation
TITLE_MAX_SUFFIX = "div.title-container div.max_suffix"
CONTENT_LENGTH_ERROR = "div.edit-container div.length-error"
# Visibility
VISIBILITY_DROPDOWN = "div.permission-card-wrapper div.d-select-content"
VISIBILITY_OPTIONS = "div.d-options-wrapper div.d-grid-item div.custom-option"
# Scheduled publishing
SCHEDULE_SWITCH = ".post-time-wrapper .d-switch"
DATETIME_INPUT = ".date-picker-container input"
# Original content declaration
ORIGINAL_SWITCH_CARD = "div.custom-switch-card"
ORIGINAL_SWITCH = "div.d-switch"
# Tag suggestions
TAG_TOPIC_CONTAINER = "#creator-editor-topic-container"
TAG_FIRST_ITEM = ".item"
# Popover
POPOVER = "div.d-popover"
# ========== User profile ==========
SIDEBAR_PROFILE = "div.main-container li.user.side-bar-component a.link-wrapper span.channel"
... ...
"""Anti-detection JS injection and Chrome launch arguments, mirroring go-rod/stealth."""
# Anti-detection JS script, injected at page load
STEALTH_JS = """
(() => {
// 1. navigator.webdriver
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
configurable: true,
});
// 2. chrome.runtime
if (!window.chrome) {
window.chrome = {};
}
if (!window.chrome.runtime) {
window.chrome.runtime = {
connect: () => {},
sendMessage: () => {},
};
}
// 3. plugins
Object.defineProperty(navigator, 'plugins', {
get: () => {
return [
{
0: {type: 'application/x-google-chrome-pdf'},
description: 'Portable Document Format',
filename: 'internal-pdf-viewer',
length: 1,
name: 'Chrome PDF Plugin',
},
{
0: {type: 'application/pdf'},
description: '',
filename: 'mhjfbmdgcfjbbpaeojofohoefgiehjai',
length: 1,
name: 'Chrome PDF Viewer',
},
{
0: {type: 'application/x-nacl'},
description: '',
filename: 'internal-nacl-plugin',
length: 1,
name: 'Native Client',
},
];
},
configurable: true,
});
// 4. languages
Object.defineProperty(navigator, 'languages', {
get: () => ['zh-CN', 'zh', 'en-US', 'en'],
configurable: true,
});
// 5. permissions
const originalQuery = window.navigator.permissions?.query;
if (originalQuery) {
window.navigator.permissions.query = (parameters) =>
parameters.name === 'notifications'
? Promise.resolve({ state: Notification.permission })
: originalQuery.call(window.navigator.permissions, parameters);
}
// 6. WebGL vendor/renderer
const getParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) return 'Intel Inc.';
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
})();
"""
# Chrome launch arguments (anti-detection related)
STEALTH_ARGS = [
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
"--no-first-run",
"--no-default-browser-check",
"--disable-background-timer-throttling",
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-component-update",
]
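How `STEALTH_JS` and `STEALTH_ARGS` are consumed is not visible in this part of the diff. One plausible wiring, sketched under that assumption, is to pass the flags on the Chrome command line and register the script through the CDP method `Page.addScriptToEvaluateOnNewDocument`, which runs a script before any page script on every new document. The helper names, the binary name, and the port below are hypothetical, and the constants are shortened stand-ins.

```python
import json

# Shortened stand-ins for the module constants so the sketch is self-contained.
STEALTH_JS = "Object.defineProperty(navigator, 'webdriver', {get: () => undefined});"
STEALTH_ARGS = ["--disable-blink-features=AutomationControlled", "--no-first-run"]


def build_inject_message(message_id: int, source: str) -> str:
    """Serialize a CDP command that runs `source` before any page script."""
    return json.dumps(
        {
            "id": message_id,
            "method": "Page.addScriptToEvaluateOnNewDocument",
            "params": {"source": source},
        }
    )


def build_chrome_command(binary: str, port: int) -> list:
    """Compose a Chrome command line with remote debugging plus stealth flags."""
    return [binary, f"--remote-debugging-port={port}", *STEALTH_ARGS]


# The JSON text would be sent over the page's CDP WebSocket connection.
message = build_inject_message(1, STEALTH_JS)
command = build_chrome_command("google-chrome", 9222)
```

Registering the script this way, rather than evaluating it after navigation, means the overrides are in place before the site's own detection code runs.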
... ...
"""Xiaohongshu data type definitions, mirroring the Go types.go."""
from __future__ import annotations
from dataclasses import dataclass, field
# ========== Feed list ==========
@dataclass
class ImageInfo:
image_scene: str = ""
url: str = ""
@classmethod
def from_dict(cls, d: dict) -> ImageInfo:
return cls(
image_scene=d.get("imageScene", ""),
url=d.get("url", ""),
)
@dataclass
class VideoCapability:
duration: int = 0 # seconds
@classmethod
def from_dict(cls, d: dict) -> VideoCapability:
return cls(duration=d.get("duration", 0))
@dataclass
class Video:
capa: VideoCapability = field(default_factory=VideoCapability)
@classmethod
def from_dict(cls, d: dict) -> Video:
return cls(capa=VideoCapability.from_dict(d.get("capa", {})))
@dataclass
class Cover:
width: int = 0
height: int = 0
url: str = ""
file_id: str = ""
url_pre: str = ""
url_default: str = ""
info_list: list[ImageInfo] = field(default_factory=list)
@classmethod
def from_dict(cls, d: dict) -> Cover:
return cls(
width=d.get("width", 0),
height=d.get("height", 0),
url=d.get("url", ""),
file_id=d.get("fileId", ""),
url_pre=d.get("urlPre", ""),
url_default=d.get("urlDefault", ""),
info_list=[ImageInfo.from_dict(i) for i in d.get("infoList", []) or []],
)
@dataclass
class User:
user_id: str = ""
nickname: str = ""
nick_name: str = ""
avatar: str = ""
@classmethod
def from_dict(cls, d: dict) -> User:
return cls(
user_id=d.get("userId", ""),
nickname=d.get("nickname", ""),
nick_name=d.get("nickName", ""),
avatar=d.get("avatar", ""),
)
@dataclass
class InteractInfo:
liked: bool = False
liked_count: str = ""
shared_count: str = ""
comment_count: str = ""
collected_count: str = ""
collected: bool = False
@classmethod
def from_dict(cls, d: dict) -> InteractInfo:
return cls(
liked=d.get("liked", False),
liked_count=d.get("likedCount", ""),
shared_count=d.get("sharedCount", ""),
comment_count=d.get("commentCount", ""),
collected_count=d.get("collectedCount", ""),
collected=d.get("collected", False),
)
@dataclass
class NoteCard:
type: str = ""
display_title: str = ""
user: User = field(default_factory=User)
interact_info: InteractInfo = field(default_factory=InteractInfo)
cover: Cover = field(default_factory=Cover)
video: Video | None = None
@classmethod
def from_dict(cls, d: dict) -> NoteCard:
video_data = d.get("video")
return cls(
type=d.get("type", ""),
display_title=d.get("displayTitle", ""),
user=User.from_dict(d.get("user", {})),
interact_info=InteractInfo.from_dict(d.get("interactInfo", {})),
cover=Cover.from_dict(d.get("cover", {})),
video=Video.from_dict(video_data) if video_data else None,
)
@dataclass
class Feed:
xsec_token: str = ""
id: str = ""
model_type: str = ""
note_card: NoteCard = field(default_factory=NoteCard)
index: int = 0
@classmethod
def from_dict(cls, d: dict) -> Feed:
return cls(
xsec_token=d.get("xsecToken", ""),
id=d.get("id", ""),
model_type=d.get("modelType", ""),
note_card=NoteCard.from_dict(d.get("noteCard", {})),
index=d.get("index", 0),
)
def to_dict(self) -> dict:
"""Serialize to a JSON-compatible dict."""
result: dict = {
"id": self.id,
"xsecToken": self.xsec_token,
"modelType": self.model_type,
"index": self.index,
"displayTitle": self.note_card.display_title,
"type": self.note_card.type,
"user": {
"userId": self.note_card.user.user_id,
"nickname": self.note_card.user.nickname or self.note_card.user.nick_name,
},
"interactInfo": {
"likedCount": self.note_card.interact_info.liked_count,
"collectedCount": self.note_card.interact_info.collected_count,
"commentCount": self.note_card.interact_info.comment_count,
"sharedCount": self.note_card.interact_info.shared_count,
},
}
if self.note_card.video:
result["video"] = {"duration": self.note_card.video.capa.duration}
return result
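The `from_dict` / `to_dict` pair above maps camelCase payload keys to snake_case fields, tolerates missing keys, and falls back from `nickname` to `nick_name` on output. The sketch below shows that behaviour with reduced stand-in classes; `MiniUser` and `MiniFeed` are hypothetical and keep only a few of the real fields, and the payload is made up.

```python
from dataclasses import dataclass, field


@dataclass
class MiniUser:
    user_id: str = ""
    nickname: str = ""
    nick_name: str = ""

    @classmethod
    def from_dict(cls, d: dict) -> "MiniUser":
        return cls(
            user_id=d.get("userId", ""),
            nickname=d.get("nickname", ""),
            nick_name=d.get("nickName", ""),
        )


@dataclass
class MiniFeed:
    id: str = ""
    xsec_token: str = ""
    user: MiniUser = field(default_factory=MiniUser)

    @classmethod
    def from_dict(cls, d: dict) -> "MiniFeed":
        return cls(
            id=d.get("id", ""),
            xsec_token=d.get("xsecToken", ""),
            user=MiniUser.from_dict(d.get("user", {})),
        )

    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "xsecToken": self.xsec_token,
            "user": {
                "userId": self.user.user_id,
                # Use the alternate key's value when "nickname" is empty.
                "nickname": self.user.nickname or self.user.nick_name,
            },
        }


# Payload with a missing xsecToken and only the alternate nickname key.
raw = {"id": "abc", "user": {"userId": "u1", "nickName": "Alice"}}
feed = MiniFeed.from_dict(raw)
out = feed.to_dict()
```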
# ========== Feed detail ==========
@dataclass
class DetailImageInfo:
width: int = 0
height: int = 0
url_default: str = ""
url_pre: str = ""
live_photo: bool = False
@classmethod
def from_dict(cls, d: dict) -> DetailImageInfo:
return cls(
width=d.get("width", 0),
height=d.get("height", 0),
url_default=d.get("urlDefault", ""),
url_pre=d.get("urlPre", ""),
live_photo=d.get("livePhoto", False),
)
@dataclass
class Comment:
id: str = ""
note_id: str = ""
content: str = ""
like_count: str = ""
create_time: int = 0
ip_location: str = ""
liked: bool = False
user_info: User = field(default_factory=User)
sub_comment_count: str = ""
sub_comments: list[Comment] = field(default_factory=list)
show_tags: list[str] = field(default_factory=list)
@classmethod
def from_dict(cls, d: dict) -> Comment:
return cls(
id=d.get("id", ""),
note_id=d.get("noteId", ""),
content=d.get("content", ""),
like_count=d.get("likeCount", ""),
create_time=d.get("createTime", 0),
ip_location=d.get("ipLocation", ""),
liked=d.get("liked", False),
user_info=User.from_dict(d.get("userInfo", {})),
sub_comment_count=d.get("subCommentCount", ""),
sub_comments=[cls.from_dict(c) for c in d.get("subComments", []) or []],
show_tags=d.get("showTags", []) or [],
)
def to_dict(self) -> dict:
result: dict = {
"id": self.id,
"content": self.content,
"likeCount": self.like_count,
"createTime": self.create_time,
"ipLocation": self.ip_location,
"user": {
"userId": self.user_info.user_id,
"nickname": self.user_info.nickname or self.user_info.nick_name,
},
"subCommentCount": self.sub_comment_count,
}
if self.sub_comments:
result["subComments"] = [c.to_dict() for c in self.sub_comments]
return result
@dataclass
class CommentList:
list_: list[Comment] = field(default_factory=list)
cursor: str = ""
has_more: bool = False
@classmethod
def from_dict(cls, d: dict) -> CommentList:
return cls(
list_=[Comment.from_dict(c) for c in d.get("list", []) or []],
cursor=d.get("cursor", ""),
has_more=d.get("hasMore", False),
)
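The `or []` suffix in `Comment.from_dict` and `CommentList.from_dict` matters because `dict.get` returns its default only when the key is absent, not when the JSON value is `null`. A short demonstration follows; the payloads are made up for illustration.

```python
payload_missing = {}
payload_null = {"subComments": None}

# Key absent: the default is used.
missing = payload_missing.get("subComments", [])

# Key present with a null value: the default is ignored and None comes back.
null_unguarded = payload_null.get("subComments", [])

# The trailing "or []" turns None into an empty list, so iteration is safe.
null_guarded = payload_null.get("subComments", []) or []

sub_comment_ids = [c["id"] for c in null_guarded]
```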
@dataclass
class FeedDetail:
note_id: str = ""
xsec_token: str = ""
title: str = ""
desc: str = ""
type: str = ""
time: int = 0
ip_location: str = ""
user: User = field(default_factory=User)
interact_info: InteractInfo = field(default_factory=InteractInfo)
image_list: list[DetailImageInfo] = field(default_factory=list)
@classmethod
def from_dict(cls, d: dict) -> FeedDetail:
return cls(
note_id=d.get("noteId", ""),
xsec_token=d.get("xsecToken", ""),
title=d.get("title", ""),
desc=d.get("desc", ""),
type=d.get("type", ""),
time=d.get("time", 0),
ip_location=d.get("ipLocation", ""),
user=User.from_dict(d.get("user", {})),
interact_info=InteractInfo.from_dict(d.get("interactInfo", {})),
image_list=[DetailImageInfo.from_dict(i) for i in d.get("imageList", []) or []],
)
def to_dict(self) -> dict:
return {
"noteId": self.note_id,
"title": self.title,
"desc": self.desc,
"type": self.type,
"time": self.time,
"ipLocation": self.ip_location,
"user": {
"userId": self.user.user_id,
"nickname": self.user.nickname or self.user.nick_name,
},
"interactInfo": {
"liked": self.interact_info.liked,
"likedCount": self.interact_info.liked_count,
"collectedCount": self.interact_info.collected_count,
"collected": self.interact_info.collected,
"commentCount": self.interact_info.comment_count,
"sharedCount": self.interact_info.shared_count,
},
"imageList": [
{
"width": img.width,
"height": img.height,
"urlDefault": img.url_default,
}
for img in self.image_list
],
}
@dataclass
class FeedDetailResponse:
note: FeedDetail = field(default_factory=FeedDetail)
comments: CommentList = field(default_factory=CommentList)
@classmethod
def from_dict(cls, d: dict) -> FeedDetailResponse:
return cls(
note=FeedDetail.from_dict(d.get("note", {})),
comments=CommentList.from_dict(d.get("comments", {})),
)
def to_dict(self) -> dict:
return {
"note": self.note.to_dict(),
"comments": [c.to_dict() for c in self.comments.list_],
}
# ========== User profile ==========
@dataclass
class UserBasicInfo:
gender: int = 0
ip_location: str = ""
desc: str = ""
imageb: str = ""
nickname: str = ""
images: str = ""
red_id: str = ""
@classmethod
def from_dict(cls, d: dict) -> UserBasicInfo:
return cls(
gender=d.get("gender", 0),
ip_location=d.get("ipLocation", ""),
desc=d.get("desc", ""),
imageb=d.get("imageb", ""),
nickname=d.get("nickname", ""),
images=d.get("images", ""),
red_id=d.get("redId", ""),
)
@dataclass
class UserInteraction:
type: str = ""
name: str = ""
count: str = ""
@classmethod
def from_dict(cls, d: dict) -> UserInteraction:
return cls(
type=d.get("type", ""),
name=d.get("name", ""),
count=d.get("count", ""),
)
@dataclass
class UserProfileResponse:
user_basic_info: UserBasicInfo = field(default_factory=UserBasicInfo)
interactions: list[UserInteraction] = field(default_factory=list)
feeds: list[Feed] = field(default_factory=list)
def to_dict(self) -> dict:
return {
"basicInfo": {
"nickname": self.user_basic_info.nickname,
"redId": self.user_basic_info.red_id,
"desc": self.user_basic_info.desc,
"gender": self.user_basic_info.gender,
"ipLocation": self.user_basic_info.ip_location,
},
"interactions": [
{"type": i.type, "name": i.name, "count": i.count} for i in self.interactions
],
"feeds": [f.to_dict() for f in self.feeds],
}
# ========== Search ==========
@dataclass
class FilterOption:
"""Search filter options."""
sort_by: str = "" # 综合|最新|最多点赞|最多评论|最多收藏
note_type: str = "" # 不限|视频|图文
publish_time: str = "" # 不限|一天内|一周内|半年内
search_scope: str = "" # 不限|已看过|未看过|已关注
location: str = "" # 不限|同城|附近
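Each `FilterOption` field holds one of the on-page labels listed in its comment, and `_convert_filters` turns the non-empty ones into `(group, option)` index pairs. The lookup table itself is not shown in this diff, so the sketch below is an assumption: groups 4 and 5 match the visible `_find_internal_option(4, ...)` and `_find_internal_option(5, ...)` calls, while the numbering of groups 1 to 3 and every option position are guessed from the order of the field comments. `FILTER_GROUPS` and `find_option` are hypothetical names.

```python
# Assumed layout: group number -> option labels in on-screen order.
FILTER_GROUPS = {
    1: ["综合", "最新", "最多点赞", "最多评论", "最多收藏"],  # sort_by
    2: ["不限", "视频", "图文"],  # note_type
    3: ["不限", "一天内", "一周内", "半年内"],  # publish_time
    4: ["不限", "已看过", "未看过", "已关注"],  # search_scope
    5: ["不限", "同城", "附近"],  # location
}


def find_option(group: int, label: str) -> tuple:
    """Return (group, 1-based option position) or raise ValueError."""
    options = FILTER_GROUPS[group]
    if label not in options:
        raise ValueError(f"invalid option {label!r} for filter group {group}")
    return group, options.index(label) + 1


pair = find_option(2, "视频")

# An unknown label is rejected instead of silently clicking the wrong option.
try:
    find_option(5, "海外")
    error_message = ""
except ValueError as exc:
    error_message = str(exc)
```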
# ========== Publish ==========
@dataclass
class PublishImageContent:
"""Content for an image-and-text post."""
title: str = ""
content: str = ""
tags: list[str] = field(default_factory=list)
image_paths: list[str] = field(default_factory=list)
schedule_time: str | None = None # ISO 8601 format; None means publish immediately
is_original: bool = False
visibility: str = "" # 公开可见(默认)|仅自己可见|仅互关好友可见
@dataclass
class PublishVideoContent:
"""Content for a video post."""
title: str = ""
content: str = ""
tags: list[str] = field(default_factory=list)
video_path: str = ""
schedule_time: str | None = None # ISO 8601 format
visibility: str = "" # 公开可见(默认)|仅自己可见|仅互关好友可见
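Both publish dataclasses carry `schedule_time` as an ISO 8601 string. A sketch of validating it before driving the date picker is given below, using `datetime.fromisoformat` from the standard library. `parse_schedule_time` is a hypothetical helper, and rejecting values without a UTC offset is one possible policy rather than something this commit specifies.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def parse_schedule_time(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO 8601 string; None means publish immediately."""
    if value is None:
        return None
    parsed = datetime.fromisoformat(value)
    # Assumed policy: require an explicit offset to avoid timezone ambiguity.
    if parsed.tzinfo is None:
        raise ValueError("schedule_time should include a UTC offset")
    return parsed


immediate = parse_schedule_time(None)
scheduled = parse_schedule_time("2026-01-01T10:00:00+08:00")
as_utc = scheduled.astimezone(timezone.utc)
```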
# ========== Interaction ==========
@dataclass
class ActionResult:
"""Generic action result (like, favorite, etc.)."""
feed_id: str = ""
success: bool = False
message: str = ""
def to_dict(self) -> dict:
return {
"feed_id": self.feed_id,
"success": self.success,
"message": self.message,
}
# ========== Comment loading configuration ==========
@dataclass
class CommentLoadConfig:
"""Comment loading configuration."""
click_more_replies: bool = False
max_replies_threshold: int = 10
max_comment_items: int = 0 # 0 = unlimited
scroll_speed: str = "normal" # slow|normal|fast
... ...
"""Xiaohongshu URL constants and builder functions."""
from urllib.parse import urlencode
# Base pages
EXPLORE_URL = "https://www.xiaohongshu.com/explore"
HOME_URL = "https://www.xiaohongshu.com"
PUBLISH_URL = "https://creator.xiaohongshu.com/publish/publish?source=official"
def make_feed_detail_url(feed_id: str, xsec_token: str) -> str:
"""Build the feed detail page URL."""
return (
f"https://www.xiaohongshu.com/explore/{feed_id}?xsec_token={xsec_token}&xsec_source=pc_feed"
)
def make_search_url(keyword: str) -> str:
"""Build the search results page URL."""
params = urlencode({"keyword": keyword, "source": "web_explore_feed"})
return f"https://www.xiaohongshu.com/search_result?{params}"
def make_user_profile_url(user_id: str, xsec_token: str) -> str:
"""Build the user profile page URL."""
return (
f"https://www.xiaohongshu.com/user/profile/{user_id}"
f"?xsec_token={xsec_token}&xsec_source=pc_note"
)
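`make_search_url` percent-encodes its parameters with `urlencode`, while the two token-bearing builders interpolate `xsec_token` as-is. The sketch below shows what that difference means if a token ever contains characters that are special in a query string. The token value and `NOTE_ID` are made up, and whether real tokens contain `+` or how the server decodes them is an assumption; `query_value` is a hypothetical helper.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Non-ASCII text and spaces are handled by urlencode.
keyword = "咖啡 拉花"
search_query = urlencode({"keyword": keyword, "source": "web_explore_feed"})
search_url = f"https://www.xiaohongshu.com/search_result?{search_query}"

# Hypothetical token containing characters that are special in a query string.
token = "AB+cd/ef="
raw_url = (
    f"https://www.xiaohongshu.com/explore/NOTE_ID?xsec_token={token}&xsec_source=pc_feed"
)
encoded_url = "https://www.xiaohongshu.com/explore/NOTE_ID?" + urlencode(
    {"xsec_token": token, "xsec_source": "pc_feed"}
)


def query_value(url: str, name: str) -> str:
    """Decode one query parameter the way a form-decoding parser would."""
    return parse_qs(urlsplit(url).query)[name][0]


# Raw interpolation: a form decoder reads "+" as a space.
raw_token = query_value(raw_url, "xsec_token")
# urlencode: the token survives the round trip unchanged.
encoded_token = query_value(encoded_url, "xsec_token")
```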
... ...