Angiin

feat: 优化登录流程,新增二维码截图并内嵌显示

- 新增 get-qrcode 子命令:非阻塞获取二维码,立即返回路径
- save_qrcode_to_file 支持 data URL 和网络 URL 两种格式
- 输出 qrcode_data_url 供 Skill 直接内嵌 Markdown 图片
- 新增 _add_png_border:纯 stdlib 给 PNG 添加 16px 白色边框
  支持 Grayscale/RGB/Grayscale+Alpha/RGBA,全 filter 类型解码
- check-login 非 GUI 环境返回 login_method: both,支持二维码或手机验证码二选一
- xhs-auth SKILL.md:GUI 走 login(阻塞),无界面走 get-qrcode + data_url 内嵌显示
... ... @@ -112,20 +112,29 @@ def cmd_check_login(args: argparse.Namespace) -> None:
_output({"logged_in": True}, exit_code=0)
else:
from chrome_launcher import has_display
method = "qrcode" if has_display() else "phone"
hint = (
"请运行 login(二维码)完成登录"
if method == "qrcode"
else "请运行 send-code --phone <手机号>(手机验证码)完成登录"
)
_output({"logged_in": False, "login_method": method, "hint": hint}, exit_code=1)
if has_display():
_output({
"logged_in": False,
"login_method": "qrcode",
"hint": "请运行 login,Chrome 窗口会弹出二维码,扫码后自动完成登录",
}, exit_code=1)
else:
# 无界面环境:二维码(扫对话窗口中的图片)和手机验证码均可
_output({
"logged_in": False,
"login_method": "both",
"hint": (
"方式A: get-qrcode(二维码将显示在对话窗口,扫码即可);"
"方式B: send-code --phone <手机号>(手机验证码)"
),
}, exit_code=1)
finally:
browser.close_page(page)
browser.close()
def cmd_login(args: argparse.Namespace) -> None:
"""获取登录二维码并等待扫码。"""
"""获取登录二维码并阻塞等待扫码(最多 120 秒)。"""
from xhs.login import fetch_qrcode, save_qrcode_to_file, wait_for_login
browser, page = _connect(args)
... ... @@ -133,13 +142,14 @@ def cmd_login(args: argparse.Namespace) -> None:
src, already = fetch_qrcode(page)
if already:
_output({"logged_in": True, "message": "已登录"})
else:
# 保存二维码到临时文件
qrcode_path = save_qrcode_to_file(src)
return
qrcode_path, qrcode_data_url = save_qrcode_to_file(src)
print(
json.dumps(
{
"qrcode_path": qrcode_path,
"qrcode_data_url": qrcode_data_url,
"message": "请扫码登录,二维码已保存到文件",
},
ensure_ascii=False,
... ... @@ -199,6 +209,35 @@ def cmd_phone_login(args: argparse.Namespace) -> None:
browser.close()
def cmd_get_qrcode(args: argparse.Namespace) -> None:
"""获取登录二维码并立即返回(非阻塞)。
从登录弹窗的二维码 img 元素读取图片(data URL 或网络 URL),
保存为本地 PNG 文件后立即退出。Chrome tab 保持打开,QR 会话继续有效。
调用方收到 qrcode_path 后用 Read 工具将图片显示在对话窗口,再轮询 check-login。
"""
from xhs.login import fetch_qrcode, save_qrcode_to_file
browser, page = _connect(args)
src, already = fetch_qrcode(page)
if already:
browser.close_page(page)
browser.close()
_output({"logged_in": True, "message": "已登录"})
return
qrcode_path, qrcode_data_url = save_qrcode_to_file(src)
# 只断开 CDP 连接,不关闭 tab——QR 会话保持,用户可继续扫码
browser.close()
_output({
"qrcode_path": qrcode_path,
"qrcode_data_url": qrcode_data_url,
"message": "二维码已生成,请扫码登录。扫码后运行 check-login 确认登录状态。",
})
def cmd_send_code(args: argparse.Namespace) -> None:
"""分步登录第一步:填写手机号并发送验证码,保持页面不关闭。"""
from chrome_launcher import has_display, restart_chrome
... ... @@ -681,9 +720,13 @@ def build_parser() -> argparse.ArgumentParser:
sub.set_defaults(func=cmd_check_login)
# login
sub = subparsers.add_parser("login", help="登录(扫码)")
sub = subparsers.add_parser("login", help="登录(扫码,阻塞等待)")
sub.set_defaults(func=cmd_login)
# get-qrcode(非阻塞,截图后立即返回)
sub = subparsers.add_parser("get-qrcode", help="获取登录二维码截图并立即返回(非阻塞)")
sub.set_defaults(func=cmd_get_qrcode)
# phone-login(单命令交互式)
sub = subparsers.add_parser("phone-login", help="手机号+验证码登录(交互式,适合本地终端)")
sub.add_argument("--phone", required=True, help="手机号(不含国家码,如 13800138000)")
... ...
... ... @@ -515,6 +515,7 @@ class Page:
)
class Browser:
"""Chrome 浏览器 CDP 控制器。"""
... ...
... ... @@ -5,8 +5,126 @@ from __future__ import annotations
import base64
import logging
import os
import struct
import tempfile
import time
import zlib
_QR_DIR = os.path.join(tempfile.gettempdir(), "xhs")
_QR_FILE = os.path.join(_QR_DIR, "login_qrcode.png")
_QR_BORDER = 16 # 白边宽度(像素)
_PNG_SIG = b"\x89PNG\r\n\x1a\n"
def _add_png_border(data: bytes, padding: int = _QR_BORDER) -> bytes:
"""给 PNG 图片添加白色边框(纯 Python stdlib,不依赖 Pillow)。
支持 8-bit 深度的 Grayscale / RGB / Grayscale+Alpha / RGBA 四种色彩类型。
Indexed-color(color_type=3)暂不处理,原样返回。
Args:
data: 原始 PNG 字节。
padding: 边框宽度(像素)。
Returns:
带白色边框的 PNG 字节。
"""
if not data.startswith(_PNG_SIG):
return data
# ── 解析 chunks ──────────────────────────────────────────────
def _read_chunks(buf: bytes) -> list[tuple[bytes, bytes]]:
result, pos = [], 8
while pos < len(buf):
(length,) = struct.unpack_from(">I", buf, pos)
ctype = buf[pos + 4 : pos + 8]
cdata = buf[pos + 8 : pos + 8 + length]
result.append((ctype, cdata))
pos += 12 + length
return result
def _make_chunk(ctype: bytes, cdata: bytes) -> bytes:
crc = zlib.crc32(ctype + cdata) & 0xFFFFFFFF
return struct.pack(">I", len(cdata)) + ctype + cdata + struct.pack(">I", crc)
chunks = _read_chunks(data)
# ── IHDR ─────────────────────────────────────────────────────
ihdr = next(d for t, d in chunks if t == b"IHDR")
w, h = struct.unpack_from(">II", ihdr)
bit_depth, color_type = ihdr[8], ihdr[9]
if bit_depth != 8 or color_type == 3:
return data # 不支持的格式,原样返回
bpp = {0: 1, 2: 3, 4: 2, 6: 4}[color_type]
white = bytes([255] * bpp)
# ── 解压 IDAT ────────────────────────────────────────────────
raw = zlib.decompress(b"".join(d for t, d in chunks if t == b"IDAT"))
# ── 逐行解码 PNG filter,还原像素数据 ────────────────────────
stride = w * bpp
def _paeth(a: int, b: int, c: int) -> int:
p = a + b - c
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
if pa <= pb and pa <= pc:
return a
return b if pb <= pc else c
pixel_rows: list[bytes] = []
prior = bytearray(stride)
pos = 0
for _ in range(h):
f = raw[pos]
row = bytearray(raw[pos + 1 : pos + 1 + stride])
pos += 1 + stride
if f == 1: # Sub
for i in range(bpp, stride):
row[i] = (row[i] + row[i - bpp]) & 0xFF
elif f == 2: # Up
for i in range(stride):
row[i] = (row[i] + prior[i]) & 0xFF
elif f == 3: # Average
for i in range(stride):
a = row[i - bpp] if i >= bpp else 0
row[i] = (row[i] + (a + prior[i]) // 2) & 0xFF
elif f == 4: # Paeth
for i in range(stride):
a = row[i - bpp] if i >= bpp else 0
b = prior[i]
c = prior[i - bpp] if i >= bpp else 0
row[i] = (row[i] + _paeth(a, b, c)) & 0xFF
pixel_rows.append(bytes(row))
prior = row
# ── 构建带边框的新图像(filter 0 = None,最简单)────────────
new_w, new_h = w + padding * 2, h + padding * 2
white_row = b"\x00" + white * new_w
pad_cols = white * padding
new_raw = bytearray()
for _ in range(padding):
new_raw += white_row
for row in pixel_rows:
new_raw += b"\x00" + pad_cols + row + pad_cols
for _ in range(padding):
new_raw += white_row
new_idat = zlib.compress(bytes(new_raw), 6)
new_ihdr = struct.pack(">II", new_w, new_h) + ihdr[8:]
# ── 重建 PNG ─────────────────────────────────────────────────
out = bytearray(_PNG_SIG)
out += _make_chunk(b"IHDR", new_ihdr)
for ctype, cdata in chunks:
if ctype not in (b"IHDR", b"IDAT", b"IEND"):
out += _make_chunk(ctype, cdata)
out += _make_chunk(b"IDAT", new_idat)
out += _make_chunk(b"IEND", b"")
return bytes(out)
from .cdp import Page
from .errors import RateLimitError
... ... @@ -67,35 +185,41 @@ def fetch_qrcode(page: Page) -> tuple[str, bool]:
return src, False
def save_qrcode_to_file(src: str) -> str:
"""将二维码 data URL 保存为临时 PNG 文件。
def save_qrcode_to_file(src: str) -> tuple[str, str]:
"""将二维码图片保存为临时 PNG 文件,同时返回 data URL。
相当于浏览器"右键 → 另存为图片":从 img.src 取得图片字节后落盘。
Args:
src: 二维码图片的 data URL(data:image/png;base64,...)或普通 URL。
src: 二维码 img 元素的 src——data URL(data:image/...;base64,...)或网络 URL。
Returns:
保存的文件绝对路径。
(file_path, data_url)
- file_path: 保存的 PNG 文件绝对路径
- data_url: data:image/png;base64,... 格式,可直接嵌入 Markdown
"""
prefix = "data:image/png;base64,"
if src.startswith(prefix):
img_data = base64.b64decode(src[len(prefix) :])
elif src.startswith("data:image/"):
# 处理其他 MIME 类型,如 data:image/jpeg;base64,...
if src.startswith("data:image/"):
# data URL:直接解码
_, encoded = src.split(",", 1)
img_data = base64.b64decode(encoded)
elif src.startswith("http://") or src.startswith("https://"):
# 网络 URL:下载(等同浏览器右键另存为)
import requests as _req
resp = _req.get(src, timeout=10)
resp.raise_for_status()
img_data = resp.content
else:
# 不是 data URL,无法保存
raise ValueError(f"不支持的二维码格式,需要 data URL: {src[:50]}...")
raise ValueError(f"不支持的二维码格式: {src[:80]}")
qr_dir = os.path.join(tempfile.gettempdir(), "xhs")
os.makedirs(qr_dir, exist_ok=True)
filepath = os.path.join(qr_dir, "login_qrcode.png")
img_data = _add_png_border(img_data)
with open(filepath, "wb") as f:
os.makedirs(_QR_DIR, exist_ok=True)
with open(_QR_FILE, "wb") as f:
f.write(img_data)
logger.info("二维码已保存: %s", filepath)
return filepath
data_url = "data:image/png;base64," + base64.b64encode(img_data).decode()
logger.info("二维码已保存: %s", _QR_FILE)
return _QR_FILE, data_url
def send_phone_code(page: Page, phone: str) -> bool:
... ...
... ... @@ -33,20 +33,50 @@ python scripts/cli.py check-login
输出解读:
- `"logged_in": true` → 已登录,可执行后续操作。
- `"logged_in": false` + `"login_method": "qrcode"` → 有界面环境,使用二维码登录。
- `"logged_in": false` + `"login_method": "phone"` → 无界面服务器,使用手机验证码登录。
- `"logged_in": false` + `"login_method": "qrcode"` → 有界面环境,直接走二维码登录。
- `"logged_in": false` + `"login_method": "both"` → 无界面服务器,**询问用户选择方式 A(二维码)或方式 B(手机验证码)**
### 第二步:根据 login_method 选择登录方式
#### 方式 A:二维码登录(有界面环境)
#### 方式 A1:二维码登录 — 有界面(GUI)设备
```bash
python scripts/cli.py login
```
1. 命令立即输出 `qrcode_path`(二维码图片路径),然后阻塞等待扫码(最多 120 秒)。
2. 提示用户用小红书 App 或微信扫码。
3. 扫码成功后输出 `"logged_in": true`
- Chrome **有窗口**弹出,二维码直接显示在浏览器窗口中。
- 告知用户用小红书 App 扫屏幕上的二维码。
- 命令阻塞等待(最多 120 秒),扫码成功后输出 `{"logged_in": true}`
- 无需在对话窗口显示图片。
#### 方式 A2:二维码登录 — 无界面(headless)服务器
**第一步** — 获取二维码(非阻塞,立即返回):
```bash
python scripts/cli.py get-qrcode
```
- headless Chrome 加载登录页,从 `img` 元素读取二维码图片(相当于右键另存为)。
- 命令立即退出,Chrome tab 保持打开(QR 会话继续有效)。
- 输出:`{"qrcode_path": "...", "qrcode_data_url": "data:image/png;base64,...", "message": "..."}`
**第二步** — 从 JSON 取 `qrcode_data_url`,在回复中直接写出:
```
![小红书登录二维码]({qrcode_data_url})
```
图片本身已含 16px 白色边框,内嵌在对话窗口,用户用小红书 App 扫对话里的二维码即可。
**第三步** — 轮询登录状态(每 10 秒一次,最多 12 次):
```bash
python scripts/cli.py check-login
```
- `"logged_in": true` 则完成。
- 2 分钟内未登录,提示用户重新执行第一步。
#### 方式 B:手机验证码登录(无界面服务器,分两步)
... ...