fix: 手机验证码登录时精确连回同一个浏览器标签页
之前验证码登录偶尔失败,是因为发送验证码和提交验证码时 连接的可能不是同一个标签页,导致小红书认为验证码无效。 现在发送验证码后会记录当前标签页的 ID,提交验证码时直接 连回这个标签页,不再靠猜。扫二维码等待登录也做了同样处理。
Showing
2 changed files
with
84 additions
and
2 deletions
| @@ -8,9 +8,33 @@ | @@ -8,9 +8,33 @@ | ||
| 8 | from __future__ import annotations | 8 | from __future__ import annotations |
| 9 | 9 | ||
| 10 | import argparse | 10 | import argparse |
| 11 | +import contextlib | ||
| 11 | import json | 12 | import json |
| 12 | import logging | 13 | import logging |
| 14 | +import os | ||
| 13 | import sys | 15 | import sys |
| 16 | +import tempfile | ||
| 17 | + | ||
| 18 | +# 记录登录用 tab 的 target_id,确保 verify-code / wait-login 连回精确的那个 tab | ||
| 19 | +_LOGIN_TAB_FILE = os.path.join(tempfile.gettempdir(), "xhs", "login_tab_id.txt") | ||
| 20 | + | ||
| 21 | + | ||
| 22 | +def _save_login_tab(target_id: str) -> None: | ||
| 23 | + os.makedirs(os.path.dirname(_LOGIN_TAB_FILE), exist_ok=True) | ||
| 24 | + with open(_LOGIN_TAB_FILE, "w") as f: | ||
| 25 | + f.write(target_id) | ||
| 26 | + | ||
| 27 | + | ||
| 28 | +def _load_login_tab() -> str | None: | ||
| 29 | + with contextlib.suppress(FileNotFoundError): | ||
| 30 | + data = open(_LOGIN_TAB_FILE).read().strip() | ||
| 31 | + return data or None | ||
| 32 | + return None | ||
| 33 | + | ||
| 34 | + | ||
| 35 | +def _clear_login_tab() -> None: | ||
| 36 | + with contextlib.suppress(FileNotFoundError): | ||
| 37 | + os.remove(_LOGIN_TAB_FILE) | ||
| 14 | 38 | ||
| 15 | # Windows 控制台默认编码(如 cp1252)不支持中文,强制 UTF-8 | 39 | # Windows 控制台默认编码(如 cp1252)不支持中文,强制 UTF-8 |
| 16 | if sys.stdout and hasattr(sys.stdout, "reconfigure"): | 40 | if sys.stdout and hasattr(sys.stdout, "reconfigure"): |
| @@ -48,6 +72,33 @@ def _connect(args: argparse.Namespace): | @@ -48,6 +72,33 @@ def _connect(args: argparse.Namespace): | ||
| 48 | return browser, page | 72 | return browser, page |
| 49 | 73 | ||
| 50 | 74 | ||
| 75 | +def _connect_saved_tab(args: argparse.Namespace): | ||
| 76 | + """连接到登录流程中记录的精确 tab(via _LOGIN_TAB_FILE),回退到第一个非空白 tab。""" | ||
| 77 | + from chrome_launcher import ensure_chrome, has_display | ||
| 78 | + from xhs.cdp import Browser | ||
| 79 | + | ||
| 80 | + if not ensure_chrome(port=args.port, headless=not has_display()): | ||
| 81 | + _output({"success": False, "error": "无法连接到 Chrome"}, exit_code=2) | ||
| 82 | + | ||
| 83 | + browser = Browser(host=args.host, port=args.port) | ||
| 84 | + browser.connect() | ||
| 85 | + | ||
| 86 | + target_id = _load_login_tab() | ||
| 87 | + if target_id: | ||
| 88 | + page = browser.get_page_by_target_id(target_id) | ||
| 89 | + if page: | ||
| 90 | + return browser, page | ||
| 91 | + logger.warning("保存的 tab (target_id=%s) 已失效,回退到第一个可用 tab", target_id) | ||
| 92 | + | ||
| 93 | + page = browser.get_existing_page() | ||
| 94 | + if not page: | ||
| 95 | + _output( | ||
| 96 | + {"success": False, "error": "未找到已打开的登录页面,请重新执行登录前置步骤"}, | ||
| 97 | + exit_code=2, | ||
| 98 | + ) | ||
| 99 | + return browser, page | ||
| 100 | + | ||
| 101 | + | ||
| 51 | def _connect_existing(args: argparse.Namespace): | 102 | def _connect_existing(args: argparse.Namespace): |
| 52 | """连接到 Chrome 并复用已有页面(用于分步发布的后续步骤)。""" | 103 | """连接到 Chrome 并复用已有页面(用于分步发布的后续步骤)。""" |
| 53 | from chrome_launcher import ensure_chrome, has_display | 104 | from chrome_launcher import ensure_chrome, has_display |
| @@ -234,6 +285,9 @@ def cmd_get_qrcode(args: argparse.Namespace) -> None: | @@ -234,6 +285,9 @@ def cmd_get_qrcode(args: argparse.Namespace) -> None: | ||
| 234 | 285 | ||
| 235 | qrcode_path, qrcode_data_url = save_qrcode_to_file(src) | 286 | qrcode_path, qrcode_data_url = save_qrcode_to_file(src) |
| 236 | 287 | ||
| 288 | + # 记录 tab,供 wait-login 精确reconnect | ||
| 289 | + _save_login_tab(page.target_id) | ||
| 290 | + | ||
| 237 | # 只断开 CDP 连接,不关闭 tab——QR 会话保持,用户可继续扫码 | 291 | # 只断开 CDP 连接,不关闭 tab——QR 会话保持,用户可继续扫码 |
| 238 | browser.close() | 292 | browser.close() |
| 239 | _output({ | 293 | _output({ |
| @@ -250,9 +304,11 @@ def cmd_wait_login(args: argparse.Namespace) -> None: | @@ -250,9 +304,11 @@ def cmd_wait_login(args: argparse.Namespace) -> None: | ||
| 250 | """ | 304 | """ |
| 251 | from xhs.login import wait_for_login | 305 | from xhs.login import wait_for_login |
| 252 | 306 | ||
| 253 | - browser, page = _connect_existing(args) | 307 | + browser, page = _connect_saved_tab(args) |
| 254 | try: | 308 | try: |
| 255 | success = wait_for_login(page, timeout=args.timeout) | 309 | success = wait_for_login(page, timeout=args.timeout) |
| 310 | + if success: | ||
| 311 | + _clear_login_tab() | ||
| 256 | _output( | 312 | _output( |
| 257 | { | 313 | { |
| 258 | "logged_in": success, | 314 | "logged_in": success, |
| @@ -278,6 +334,8 @@ def cmd_send_code(args: argparse.Namespace) -> None: | @@ -278,6 +334,8 @@ def cmd_send_code(args: argparse.Namespace) -> None: | ||
| 278 | _output({"logged_in": True, "message": "已登录,无需重新登录"}) | 334 | _output({"logged_in": True, "message": "已登录,无需重新登录"}) |
| 279 | return | 335 | return |
| 280 | 336 | ||
| 337 | + # 记录 tab,供 verify-code 精确 reconnect | ||
| 338 | + _save_login_tab(page.target_id) | ||
| 281 | _output({ | 339 | _output({ |
| 282 | "status": "code_sent", | 340 | "status": "code_sent", |
| 283 | "message": f"验证码已发送至 {args.phone[:3]}****{args.phone[-4:]},请运行 verify-code --code <验证码>", | 341 | "message": f"验证码已发送至 {args.phone[:3]}****{args.phone[-4:]},请运行 verify-code --code <验证码>", |
| @@ -299,9 +357,11 @@ def cmd_verify_code(args: argparse.Namespace) -> None: | @@ -299,9 +357,11 @@ def cmd_verify_code(args: argparse.Namespace) -> None: | ||
| 299 | """分步登录第二步:在已有页面上填写验证码并提交。""" | 357 | """分步登录第二步:在已有页面上填写验证码并提交。""" |
| 300 | from xhs.login import submit_phone_code | 358 | from xhs.login import submit_phone_code |
| 301 | 359 | ||
| 302 | - browser, page = _connect_existing(args) | 360 | + browser, page = _connect_saved_tab(args) |
| 303 | try: | 361 | try: |
| 304 | success = submit_phone_code(page, args.code) | 362 | success = submit_phone_code(page, args.code) |
| 363 | + if success: | ||
| 364 | + _clear_login_tab() | ||
| 305 | _output( | 365 | _output( |
| 306 | {"logged_in": success, "message": "登录成功" if success else "验证码错误或超时"}, | 366 | {"logged_in": success, "message": "登录成功" if success else "验证码错误或超时"}, |
| 307 | exit_code=0 if success else 2, | 367 | exit_code=0 if success else 2, |
| @@ -590,6 +590,28 @@ class Browser: | @@ -590,6 +590,28 @@ class Browser: | ||
| 590 | 590 | ||
| 591 | return page | 591 | return page |
| 592 | 592 | ||
| 593 | + def get_page_by_target_id(self, target_id: str) -> Page | None: | ||
| 594 | + """通过 target_id 精确连接到指定 tab。""" | ||
| 595 | + if not self._cdp: | ||
| 596 | + self.connect() | ||
| 597 | + assert self._cdp is not None | ||
| 598 | + try: | ||
| 599 | + result = self._cdp.send( | ||
| 600 | + "Target.attachToTarget", | ||
| 601 | + {"targetId": target_id, "flatten": True}, | ||
| 602 | + ) | ||
| 603 | + except Exception: | ||
| 604 | + return None | ||
| 605 | + session_id = result.get("sessionId") | ||
| 606 | + if not session_id: | ||
| 607 | + return None | ||
| 608 | + page = Page(self._cdp, target_id, session_id) | ||
| 609 | + page._send_session("Page.enable") | ||
| 610 | + page._send_session("DOM.enable") | ||
| 611 | + page._send_session("Runtime.enable") | ||
| 612 | + page.inject_stealth() | ||
| 613 | + return page | ||
| 614 | + | ||
| 593 | def get_existing_page(self) -> Page | None: | 615 | def get_existing_page(self) -> Page | None: |
| 594 | """获取已有页面(取第一个非 about:blank 的 page target)。""" | 616 | """获取已有页面(取第一个非 about:blank 的 page target)。""" |
| 595 | if not self._cdp: | 617 | if not self._cdp: |
-
Please register or login to post a comment