Angiin

fix: 二维码获取改用 Canvas 提取,修复遮罩层截图偏移问题

- 选择器从 .qrcode.force-light 改为 img.qrcode-img(实际二维码元素)
- 不再使用 CDP 截图,直接读取 img.src 的 base64 数据,避免渲染时机问题
- 在浏览器端用 Canvas 添加 16px 白边,零依赖
- screenshot_element 改用 DOM.getBoxModel 获取坐标,修复 position:fixed
  遮罩层内元素用 getBoundingClientRect + pageOffset 截到背景的问题
... ... @@ -231,19 +231,15 @@ def cmd_login(args: argparse.Namespace) -> None:
browser, page = _connect(args)
try:
src, already = fetch_qrcode(page)
png_bytes, already = fetch_qrcode(page)
if already:
_output({"logged_in": True, "message": "已登录"})
return
qrcode_path, qrcode_data_url = save_qrcode_to_file(src)
qrcode_path = save_qrcode_to_file(png_bytes)
print(
json.dumps(
{
"qrcode_path": qrcode_path,
"qrcode_data_url": qrcode_data_url,
"message": "请扫码登录,二维码已保存到文件",
},
{"qrcode_path": qrcode_path, "message": "请扫码登录,二维码已保存到文件"},
ensure_ascii=False,
)
)
... ... @@ -313,14 +309,14 @@ def cmd_get_qrcode(args: argparse.Namespace) -> None:
browser, page = _connect(args)
src, already = fetch_qrcode(page)
png_bytes, already = fetch_qrcode(page)
if already:
browser.close_page(page)
browser.close()
_output({"logged_in": True, "message": "已登录"})
return
qrcode_path, qrcode_data_url = save_qrcode_to_file(src)
qrcode_path = save_qrcode_to_file(png_bytes)
# 记录 tab,供 wait-login 精确reconnect
_save_login_tab(page.target_id)
... ... @@ -329,7 +325,6 @@ def cmd_get_qrcode(args: argparse.Namespace) -> None:
browser.close()
_output({
"qrcode_path": qrcode_path,
"qrcode_data_url": qrcode_data_url,
"message": "二维码已生成,请扫码登录。扫码后运行 check-login 确认登录状态。",
})
... ...
... ... @@ -514,6 +514,53 @@ class Page:
"""
)
def screenshot_element(self, selector: str, padding: int = 0) -> bytes:
"""对指定 CSS 选择器的元素截图,返回 PNG 字节。
通过 CDP Page.captureScreenshot 截取元素所在区域,比 Python 层 PNG
解码/重编码快很多,且图片直接来自浏览器渲染结果。
Args:
selector: CSS 选择器。
padding: 在元素四周额外保留的像素数(背景色填充,相当于白边)。
Returns:
PNG 字节;元素不存在时返回 b""。
"""
import base64 as _b64
# 用 DOM.getBoxModel 获取元素坐标,返回的是 page 坐标系(CSS px,相对于文档左上角)。
# getBoundingClientRect 返回的是 viewport 坐标系,对 position:fixed 遮罩层内的元素
# 加 pageXOffset 后依然会截到遮罩背后的内容。DOM.getBoxModel 则始终正确。
try:
doc = self._send_session("DOM.getDocument", {"depth": 0})
root_id = doc["root"]["nodeId"]
query = self._send_session("DOM.querySelector", {"nodeId": root_id, "selector": selector})
node_id = query.get("nodeId", 0)
if not node_id:
return b""
box_model = self._send_session("DOM.getBoxModel", {"nodeId": node_id})
model = box_model["model"]
content = model["content"] # [x1,y1, x2,y2, x3,y3, x4,y4] 顺时针四角
x, y = content[0], content[1]
width, height = float(model["width"]), float(model["height"])
except Exception:
return b""
result = self._send_session(
"Page.captureScreenshot",
{
"format": "png",
"clip": {
"x": max(0.0, x - padding),
"y": max(0.0, y - padding),
"width": width + padding * 2,
"height": height + padding * 2,
"scale": 1.0,
},
},
)
return _b64.b64decode(result.get("data", ""))
class Browser:
... ...
... ... @@ -2,129 +2,15 @@
from __future__ import annotations
import base64
import json
import logging
import os
import struct
import tempfile
import time
import zlib
_QR_DIR = os.path.join(tempfile.gettempdir(), "xhs")
_QR_FILE = os.path.join(_QR_DIR, "login_qrcode.png")
_QR_BORDER = 16 # 白边宽度(像素)
_PNG_SIG = b"\x89PNG\r\n\x1a\n"
def _add_png_border(data: bytes, padding: int = _QR_BORDER) -> bytes:
"""给 PNG 图片添加白色边框(纯 Python stdlib,不依赖 Pillow)。
支持 8-bit 深度的 Grayscale / RGB / Grayscale+Alpha / RGBA 四种色彩类型。
Indexed-color(color_type=3)暂不处理,原样返回。
Args:
data: 原始 PNG 字节。
padding: 边框宽度(像素)。
Returns:
带白色边框的 PNG 字节。
"""
if not data.startswith(_PNG_SIG):
return data
# ── 解析 chunks ──────────────────────────────────────────────
def _read_chunks(buf: bytes) -> list[tuple[bytes, bytes]]:
result, pos = [], 8
while pos < len(buf):
(length,) = struct.unpack_from(">I", buf, pos)
ctype = buf[pos + 4 : pos + 8]
cdata = buf[pos + 8 : pos + 8 + length]
result.append((ctype, cdata))
pos += 12 + length
return result
def _make_chunk(ctype: bytes, cdata: bytes) -> bytes:
crc = zlib.crc32(ctype + cdata) & 0xFFFFFFFF
return struct.pack(">I", len(cdata)) + ctype + cdata + struct.pack(">I", crc)
chunks = _read_chunks(data)
# ── IHDR ─────────────────────────────────────────────────────
ihdr = next(d for t, d in chunks if t == b"IHDR")
w, h = struct.unpack_from(">II", ihdr)
bit_depth, color_type = ihdr[8], ihdr[9]
if bit_depth != 8 or color_type == 3:
return data # 不支持的格式,原样返回
bpp = {0: 1, 2: 3, 4: 2, 6: 4}[color_type]
white = bytes([255] * bpp)
# ── 解压 IDAT ────────────────────────────────────────────────
raw = zlib.decompress(b"".join(d for t, d in chunks if t == b"IDAT"))
# ── 逐行解码 PNG filter,还原像素数据 ────────────────────────
stride = w * bpp
def _paeth(a: int, b: int, c: int) -> int:
p = a + b - c
pa, pb, pc = abs(p - a), abs(p - b), abs(p - c)
if pa <= pb and pa <= pc:
return a
return b if pb <= pc else c
pixel_rows: list[bytes] = []
prior = bytearray(stride)
pos = 0
for _ in range(h):
f = raw[pos]
row = bytearray(raw[pos + 1 : pos + 1 + stride])
pos += 1 + stride
if f == 1: # Sub
for i in range(bpp, stride):
row[i] = (row[i] + row[i - bpp]) & 0xFF
elif f == 2: # Up
for i in range(stride):
row[i] = (row[i] + prior[i]) & 0xFF
elif f == 3: # Average
for i in range(stride):
a = row[i - bpp] if i >= bpp else 0
row[i] = (row[i] + (a + prior[i]) // 2) & 0xFF
elif f == 4: # Paeth
for i in range(stride):
a = row[i - bpp] if i >= bpp else 0
b = prior[i]
c = prior[i - bpp] if i >= bpp else 0
row[i] = (row[i] + _paeth(a, b, c)) & 0xFF
pixel_rows.append(bytes(row))
prior = row
# ── 构建带边框的新图像(filter 0 = None,最简单)────────────
new_w, new_h = w + padding * 2, h + padding * 2
white_row = b"\x00" + white * new_w
pad_cols = white * padding
new_raw = bytearray()
for _ in range(padding):
new_raw += white_row
for row in pixel_rows:
new_raw += b"\x00" + pad_cols + row + pad_cols
for _ in range(padding):
new_raw += white_row
new_idat = zlib.compress(bytes(new_raw), 6)
new_ihdr = struct.pack(">II", new_w, new_h) + ihdr[8:]
# ── 重建 PNG ─────────────────────────────────────────────────
out = bytearray(_PNG_SIG)
out += _make_chunk(b"IHDR", new_ihdr)
for ctype, cdata in chunks:
if ctype not in (b"IHDR", b"IDAT", b"IEND"):
out += _make_chunk(ctype, cdata)
out += _make_chunk(b"IDAT", new_idat)
out += _make_chunk(b"IEND", b"")
return bytes(out)
_QR_BORDER = 16 # 截图时在元素四周留白的像素数
from .cdp import Page
from .errors import RateLimitError
... ... @@ -148,6 +34,19 @@ from .urls import EXPLORE_URL
logger = logging.getLogger(__name__)
def _wait_for_auth_ui(page: Page, timeout: float = 8.0) -> None:
"""等待认证 UI 出现,替代固定延迟。
轮询直到登录状态指示器或登录容器出现为止,避免无谓等待。
超时后静默返回,由调用方自行处理元素不存在的情况。
"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if page.has_element(LOGIN_STATUS) or page.has_element(LOGIN_CONTAINER):
return
time.sleep(0.2)
def check_login_status(page: Page) -> bool:
"""检查登录状态。
... ... @@ -156,70 +55,67 @@ def check_login_status(page: Page) -> bool:
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(800, 1500)
_wait_for_auth_ui(page)
return page.has_element(LOGIN_STATUS)
def fetch_qrcode(page: Page) -> tuple[str, bool]:
"""获取登录二维码。
def fetch_qrcode(page: Page) -> tuple[bytes, bool]:
"""截取登录二维码图片(CDP 元素截图)。
Returns:
(qrcode_src, already_logged_in)
- 如果已登录,返回 ("", True)
- 如果未登录,返回 (qrcode_base64_or_url, False)
(png_bytes, already_logged_in)
- 如果已登录,返回 (b"", True)
- 如果未登录,返回 (png_bytes, False)
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
sleep_random(1500, 2500)
_wait_for_auth_ui(page)
# 检查是否已登录
if page.has_element(LOGIN_STATUS):
return "", True
# 获取二维码图片 src
src = page.get_element_attribute(QRCODE_IMG, "src")
if not src:
raise RuntimeError("二维码图片 src 为空")
return src, False
def save_qrcode_to_file(src: str) -> tuple[str, str]:
"""将二维码图片保存为临时 PNG 文件,同时返回 data URL。
相当于浏览器"右键 → 另存为图片":从 img.src 取得图片字节后落盘。
return b"", True
# 等待 img.qrcode-img 出现,用浏览器 Canvas 加白边后导出 PNG base64
page.wait_for_element(QRCODE_IMG, timeout=10.0)
b64 = page.evaluate(
f"""
(() => {{
const img = document.querySelector({json.dumps(QRCODE_IMG)});
if (!img) return null;
const p = {_QR_BORDER};
const c = document.createElement('canvas');
c.width = img.naturalWidth + p * 2;
c.height = img.naturalHeight + p * 2;
const ctx = c.getContext('2d');
ctx.fillStyle = '#ffffff';
ctx.fillRect(0, 0, c.width, c.height);
ctx.drawImage(img, p, p);
return c.toDataURL('image/png').split(',')[1];
}})()
"""
)
if not b64:
raise RuntimeError("二维码 Canvas 导出失败")
import base64
png_bytes = base64.b64decode(b64)
return png_bytes, False
def save_qrcode_to_file(png_bytes: bytes) -> str:
"""将二维码 PNG 字节保存到临时文件,返回文件路径。
Args:
src: 二维码 img 元素的 src——data URL(data:image/...;base64,...)或网络 URL
png_bytes: CDP 截图返回的 PNG 字节
Returns:
(file_path, data_url)
- file_path: 保存的 PNG 文件绝对路径
- data_url: data:image/png;base64,... 格式,可直接嵌入 Markdown
file_path: 保存的 PNG 文件绝对路径。
"""
if src.startswith("data:image/"):
# data URL:直接解码
_, encoded = src.split(",", 1)
img_data = base64.b64decode(encoded)
elif src.startswith("http://") or src.startswith("https://"):
# 网络 URL:下载(等同浏览器右键另存为)
import requests as _req
resp = _req.get(src, timeout=10)
resp.raise_for_status()
img_data = resp.content
else:
raise ValueError(f"不支持的二维码格式: {src[:80]}")
img_data = _add_png_border(img_data)
os.makedirs(_QR_DIR, exist_ok=True)
with open(_QR_FILE, "wb") as f:
f.write(img_data)
data_url = "data:image/png;base64," + base64.b64encode(img_data).decode()
f.write(png_bytes)
logger.info("二维码已保存: %s", _QR_FILE)
return _QR_FILE, data_url
return _QR_FILE
def send_phone_code(page: Page, phone: str) -> bool:
... ...
... ... @@ -2,7 +2,7 @@
# ========== 登录 ==========
LOGIN_STATUS = ".main-container .user .link-wrapper .channel"
QRCODE_IMG = ".login-container .qrcode-img"
QRCODE_IMG = ".qrcode-img"
# ========== 手机号登录 ==========
LOGIN_CONTAINER = ".login-container"
... ...
... ... @@ -89,7 +89,7 @@ python scripts/cli.py get-qrcode
![小红书登录二维码]({qrcode_data_url})
```
图片含 16px 白色边框,内嵌在对话窗口,用户用小红书 App 扫对话里的二维码。
图片内嵌在对话窗口,用户用小红书 App 扫对话里的二维码。
**第三步** — 等待登录完成(**单次调用,无需轮询**):
... ...