Angiin
Committed by GitHub

Merge pull request #3 from Angiin/feat/anti-detection

feat: 增强反检测能力 — JS 伪装、CDP 真实交互、随机延迟
... ... @@ -2,14 +2,17 @@
from __future__ import annotations
import contextlib
import json
import logging
import os
import platform
import shutil
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
from xhs.stealth import STEALTH_ARGS
... ... @@ -18,6 +21,9 @@ logger = logging.getLogger(__name__)
# 默认远程调试端口
DEFAULT_PORT = 9222
# 全局进程追踪
_chrome_process: subprocess.Popen | None = None
# 各平台 Chrome 默认路径
_CHROME_PATHS: dict[str, list[str]] = {
"Darwin": [
... ... @@ -38,6 +44,22 @@ _CHROME_PATHS: dict[str, list[str]] = {
}
def _get_default_data_dir() -> str:
"""返回默认 Chrome Profile 目录路径。"""
return str(Path.home() / ".xhs" / "chrome-profile")
def is_port_open(port: int, host: str = "127.0.0.1") -> bool:
"""TCP socket 级端口检测(秒级响应)。"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
try:
s.connect((host, port))
return True
except (ConnectionRefusedError, TimeoutError, OSError):
return False
def find_chrome() -> str | None:
"""查找 Chrome 可执行文件路径。"""
# 环境变量优先
... ... @@ -45,13 +67,28 @@ def find_chrome() -> str | None:
if env_path and os.path.isfile(env_path):
return env_path
# which/where 查找
chrome = shutil.which("google-chrome") or shutil.which("chromium")
# which/where 查找(含 Windows chrome.exe)
chrome = (
shutil.which("google-chrome")
or shutil.which("chromium")
or shutil.which("chrome")
or shutil.which("chrome.exe")
)
if chrome:
return chrome
# 平台默认路径
system = platform.system()
# Windows: 额外检查环境变量路径
if system == "Windows":
for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
base = os.environ.get(env_var, "")
if base:
candidate = os.path.join(base, "Google", "Chrome", "Application", "chrome.exe")
if os.path.isfile(candidate):
return candidate
for path in _CHROME_PATHS.get(system, []):
if os.path.isfile(path):
return path
... ... @@ -59,55 +96,70 @@ def find_chrome() -> str | None:
return None
def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
"""检查指定端口的 Chrome 是否在运行(TCP 级检测)。"""
return is_port_open(port)
def launch_chrome(
port: int = DEFAULT_PORT,
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> subprocess.Popen:
) -> subprocess.Popen | None:
"""启动 Chrome 进程(带远程调试端口)。
Args:
port: 远程调试端口。
headless: 是否无头模式。
user_data_dir: 用户数据目录(Profile 隔离)。
user_data_dir: 用户数据目录(Profile 隔离),默认 ~/.xhs/chrome-profile
chrome_bin: Chrome 可执行文件路径。
Returns:
Chrome 子进程。
Chrome 子进程,若已在运行则返回 None
Raises:
FileNotFoundError: 未找到 Chrome。
"""
global _chrome_process
# 已在运行则跳过
if is_port_open(port):
logger.info("Chrome 已在运行 (port=%d),跳过启动", port)
return None
if not chrome_bin:
chrome_bin = find_chrome()
if not chrome_bin:
raise FileNotFoundError("未找到 Chrome,请设置 CHROME_BIN 环境变量或安装 Chrome")
# 默认 user-data-dir
if not user_data_dir:
user_data_dir = _get_default_data_dir()
args = [
chrome_bin,
f"--remote-debugging-port={port}",
f"--user-data-dir={user_data_dir}",
*STEALTH_ARGS,
]
if headless:
args.append("--headless=new")
if user_data_dir:
args.append(f"--user-data-dir={user_data_dir}")
# 代理
proxy = os.getenv("XHS_PROXY")
if proxy:
args.append(f"--proxy-server={proxy}")
logger.info("使用代理: %s", _mask_proxy(proxy))
logger.info("启动 Chrome: port=%d, headless=%s", port, headless)
logger.info("启动 Chrome: port=%d, headless=%s, profile=%s", port, headless, user_data_dir)
process = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
_chrome_process = process
# 等待 Chrome 准备就绪
_wait_for_chrome(port)
... ... @@ -120,7 +172,7 @@ def close_chrome(process: subprocess.Popen) -> None:
return
try:
process.send_signal(signal.SIGTERM)
process.terminate()
process.wait(timeout=5)
except (subprocess.TimeoutExpired, OSError):
process.kill()
... ... @@ -129,29 +181,20 @@ def close_chrome(process: subprocess.Popen) -> None:
logger.info("Chrome 进程已关闭")
def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
"""检查指定端口的 Chrome 是否在运行。"""
import requests
try:
resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
return resp.status_code == 200
except (requests.ConnectionError, requests.Timeout):
return False
def kill_chrome(port: int = DEFAULT_PORT) -> None:
"""关闭指定端口的 Chrome 实例。
尝试通过 CDP Browser.close 命令关闭,失败则使用进程信号
策略: CDP Browser.close → terminate 追踪进程 → 端口查找终止进程
Args:
port: Chrome 调试端口。
"""
import requests
global _chrome_process
# 策略1: 通过 CDP 关闭
try:
import requests
resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
if resp.status_code == 200:
ws_url = resp.json().get("webSocketDebuggerUrl")
... ... @@ -163,32 +206,70 @@ def kill_chrome(port: int = DEFAULT_PORT) -> None:
ws.close()
logger.info("通过 CDP Browser.close 关闭 Chrome (port=%d)", port)
time.sleep(1)
return
except Exception:
pass
# 策略2: 通过 lsof 查找并 kill 进程
try:
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
import contextlib
pids = result.stdout.strip().split("\n")
# 策略2: terminate 追踪的子进程
if _chrome_process and _chrome_process.poll() is None:
try:
_chrome_process.terminate()
_chrome_process.wait(timeout=5)
logger.info("通过 terminate 关闭追踪的 Chrome 进程")
except Exception:
with contextlib.suppress(Exception):
_chrome_process.kill()
_chrome_process = None
# 策略3: 通过端口查找并终止进程(跨平台)
if is_port_open(port):
pids = _find_pids_by_port(port)
if pids:
for pid in pids:
with contextlib.suppress(OSError, ValueError):
os.kill(int(pid), signal.SIGTERM)
logger.info("通过 SIGTERM 关闭 Chrome 进程 (port=%d)", port)
time.sleep(1)
_kill_pid(pid)
logger.info("通过进程终止关闭 Chrome (port=%d)", port)
# 等待端口释放(最多 5s)
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if not is_port_open(port):
return
except Exception:
pass
time.sleep(0.5)
if is_port_open(port):
logger.warning("端口 %d 仍被占用,kill 可能未完全生效", port)
logger.warning("未能关闭 Chrome (port=%d)", port)
def ensure_chrome(
port: int = DEFAULT_PORT,
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> bool:
"""确保 Chrome 在指定端口可用(一站式入口)。
如果 Chrome 已在运行,直接返回 True。
否则尝试启动 Chrome 并等待端口就绪。
Args:
port: 远程调试端口。
headless: 是否无头模式(仅新启动时生效)。
user_data_dir: 用户数据目录。
chrome_bin: Chrome 可执行文件路径。
Returns:
True 表示 Chrome 可用,False 表示启动失败。
"""
if is_port_open(port):
return True
try:
launch_chrome(
port=port, headless=headless, user_data_dir=user_data_dir, chrome_bin=chrome_bin,
)
return is_port_open(port)
except FileNotFoundError as e:
logger.error("启动 Chrome 失败: %s", e)
return False
def restart_chrome(
... ... @@ -196,7 +277,7 @@ def restart_chrome(
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> subprocess.Popen:
) -> subprocess.Popen | None:
"""重启 Chrome:关闭当前实例后以新模式重新启动。
Args:
... ... @@ -206,7 +287,7 @@ def restart_chrome(
chrome_bin: Chrome 可执行文件路径。
Returns:
新的 Chrome 子进程。
新的 Chrome 子进程,或 None
"""
logger.info("重启 Chrome: port=%d, headless=%s", port, headless)
kill_chrome(port)
... ... @@ -220,16 +301,70 @@ def restart_chrome(
def _wait_for_chrome(port: int, timeout: float = 15.0) -> None:
"""等待 Chrome 调试端口就绪。"""
"""等待 Chrome 调试端口就绪(TCP 级检测)。"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if is_chrome_running(port):
if is_port_open(port):
logger.info("Chrome 已就绪 (port=%d)", port)
return
time.sleep(0.5)
logger.warning("等待 Chrome 就绪超时 (port=%d)", port)
def _find_pids_by_port(port: int) -> list[int]:
"""查找占用指定端口的进程 PID(跨平台)。"""
try:
if sys.platform == "win32":
result = subprocess.run(
["netstat", "-ano", "-p", "TCP"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return []
pids: list[int] = []
for line in result.stdout.splitlines():
if f":{port}" in line and "LISTENING" in line:
parts = line.split()
with contextlib.suppress(ValueError, IndexError):
pids.append(int(parts[-1]))
return list(set(pids))
else:
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0 or not result.stdout.strip():
return []
pids = []
for p in result.stdout.strip().split("\n"):
with contextlib.suppress(ValueError):
pids.append(int(p))
return pids
except Exception:
return []
def _kill_pid(pid: int) -> None:
"""终止指定 PID 的进程(跨平台)。"""
try:
if sys.platform == "win32":
subprocess.run(
["taskkill", "/PID", str(pid), "/F"],
capture_output=True,
timeout=5,
)
else:
import signal
os.kill(pid, signal.SIGTERM)
except Exception:
logger.debug("终止进程 %d 失败", pid)
def _mask_proxy(proxy_url: str) -> str:
"""隐藏代理 URL 中的敏感信息。"""
from urllib.parse import urlparse
... ...
... ... @@ -12,6 +12,12 @@ import json
import logging
import sys
# Windows 控制台默认编码(如 cp1252)不支持中文,强制 UTF-8
if sys.stdout and hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr and hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
... ... @@ -27,14 +33,43 @@ def _output(data: dict, exit_code: int = 0) -> None:
def _connect(args: argparse.Namespace):
"""连接到 Chrome 并返回 (browser, page)。"""
from chrome_launcher import ensure_chrome
from xhs.cdp import Browser
if not ensure_chrome(port=args.port):
_output(
{"success": False, "error": "无法启动 Chrome,请检查 Chrome 是否已安装"},
exit_code=2,
)
browser = Browser(host=args.host, port=args.port)
browser.connect()
page = browser.new_page()
return browser, page
def _connect_existing(args: argparse.Namespace):
"""连接到 Chrome 并复用已有页面(用于分步发布的后续步骤)。"""
from chrome_launcher import ensure_chrome
from xhs.cdp import Browser
if not ensure_chrome(port=args.port):
_output(
{"success": False, "error": "无法连接到 Chrome"},
exit_code=2,
)
browser = Browser(host=args.host, port=args.port)
browser.connect()
page = browser.get_existing_page()
if not page:
_output(
{"success": False, "error": "未找到已打开的页面,请先执行前置步骤"},
exit_code=2,
)
return browser, page
def _headless_fallback(port: int) -> None:
"""Headless 模式未登录时自动降级到有窗口模式。"""
from chrome_launcher import restart_chrome
... ... @@ -332,7 +367,7 @@ def cmd_fill_publish(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,让用户在浏览器中预览
browser.close()
... ... @@ -368,15 +403,15 @@ def cmd_fill_publish_video(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,让用户在浏览器中预览
browser.close()
def cmd_click_publish(args: argparse.Namespace) -> None:
"""点击发布按钮(在用户确认后调用)。"""
"""点击发布按钮(在用户确认后调用)。复用已有的发布页 tab。"""
from xhs.publish import click_publish_button
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
click_publish_button(page)
_output({"success": True, "status": "发布完成"})
... ... @@ -410,15 +445,15 @@ def cmd_long_article(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,后续 select-template / next-step 需要复用
browser.close()
def cmd_select_template(args: argparse.Namespace) -> None:
"""选择排版模板。"""
"""选择排版模板。复用已有的长文编辑页 tab。"""
from xhs.publish_long_article import select_template
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
selected = select_template(page, args.name)
if selected:
... ... @@ -429,23 +464,23 @@ def cmd_select_template(args: argparse.Namespace) -> None:
exit_code=2,
)
finally:
browser.close_page(page)
# 不关闭页面,后续 next-step 需要复用
browser.close()
def cmd_next_step(args: argparse.Namespace) -> None:
"""点击下一步 + 填写发布页描述。"""
"""点击下一步 + 填写发布页描述。复用已有的长文编辑页 tab。"""
from xhs.publish_long_article import click_next_and_fill_description
with open(args.content_file, encoding="utf-8") as f:
description = f.read().strip()
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
click_next_and_fill_description(page, description)
_output({"success": True, "status": "已进入发布页,等待确认发布"})
finally:
browser.close_page(page)
# 不关闭页面,等待 click-publish
browser.close()
... ...
... ... @@ -71,7 +71,7 @@ class RunLock:
# 检查进程是否存在
os.kill(pid, 0)
return False
except (FileNotFoundError, ValueError, ProcessLookupError, PermissionError):
except (ValueError, OSError):
return True
def _force_release(self) -> None:
... ...
... ... @@ -7,6 +7,7 @@ from __future__ import annotations
import json
import logging
import random
import time
from typing import Any
... ... @@ -14,7 +15,7 @@ import requests
import websockets.sync.client as ws_client
from .errors import CDPError, ElementNotFoundError
from .stealth import STEALTH_JS
from .stealth import REALISTIC_UA, STEALTH_JS
logger = logging.getLogger(__name__)
... ... @@ -211,15 +212,25 @@ class Page:
raise ElementNotFoundError(selector)
def click_element(self, selector: str) -> None:
"""点击指定选择器的元素。"""
self.evaluate(
"""点击指定选择器的元素(通过 CDP Input 事件,isTrusted=true)。"""
box = self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (el) el.click();
if (!el) return null;
el.scrollIntoView({{block: 'center'}});
const rect = el.getBoundingClientRect();
return {{x: rect.left + rect.width / 2, y: rect.top + rect.height / 2}};
}})()
"""
)
if not box:
return
x = box["x"] + random.uniform(-3, 3)
y = box["y"] + random.uniform(-3, 3)
self.mouse_move(x, y)
time.sleep(random.uniform(0.03, 0.08))
self.mouse_click(x, y)
def input_text(self, selector: str, text: str) -> None:
"""向指定选择器的元素输入文本。"""
... ... @@ -237,18 +248,59 @@ class Page:
)
def input_content_editable(self, selector: str, text: str) -> None:
"""向 contentEditable 元素输入文本(如 div.ql-editor)。"""
"""向 contentEditable 元素输入文本(CDP 逐字输入,模拟真实打字)。"""
# 1. focus 元素
self.evaluate(
f"""
(() => {{
const el = document.querySelector({json.dumps(selector)});
if (!el) return;
el.focus();
el.textContent = {json.dumps(text)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
if (el) el.focus();
}})()
"""
)
time.sleep(0.1)
# 2. 全选清空(Ctrl+A + Backspace)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "key": "a", "code": "KeyA", "modifiers": 2},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "key": "a", "code": "KeyA", "modifiers": 2},
)
self._send_session(
"Input.dispatchKeyEvent",
{
"type": "keyDown",
"key": "Backspace",
"code": "Backspace",
"windowsVirtualKeyCode": 8,
},
)
self._send_session(
"Input.dispatchKeyEvent",
{
"type": "keyUp",
"key": "Backspace",
"code": "Backspace",
"windowsVirtualKeyCode": 8,
},
)
time.sleep(0.1)
# 3. 逐字输入(随机 30-80ms 间隔,换行符转为 Enter 键)
for char in text:
if char == "\n":
self.press_key("Enter")
else:
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "text": char},
)
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyUp", "text": char},
)
time.sleep(random.uniform(0.03, 0.08))
def get_element_text(self, selector: str) -> str | None:
"""获取元素文本内容。"""
... ... @@ -500,14 +552,31 @@ class Browser:
page = Page(self._cdp, target_id, session_id)
# 注入反检测(必须在 enable domains 之前)
page.inject_stealth()
# UA 覆盖
page._send_session(
"Emulation.setUserAgentOverride",
{"userAgent": REALISTIC_UA},
)
# 随机 viewport(模拟真实屏幕尺寸)
page._send_session(
"Emulation.setDeviceMetricsOverride",
{
"width": random.randint(1366, 1920),
"height": random.randint(768, 1080),
"deviceScaleFactor": 1,
"mobile": False,
},
)
# 启用必要的 domain
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
# 注入反检测
page.inject_stealth()
return page
def get_existing_page(self) -> Page | None:
... ...
... ... @@ -3,10 +3,10 @@
from __future__ import annotations
import logging
import time
from .cdp import Page
from .feed_detail import _check_end_container, _check_page_accessible, _get_comment_count
from .human import sleep_random
from .selectors import (
COMMENT_INPUT_FIELD,
COMMENT_INPUT_TRIGGER,
... ... @@ -37,7 +37,7 @@ def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> Non
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
sleep_random(800, 1500)
_check_page_accessible(page)
... ... @@ -46,27 +46,16 @@ def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> Non
raise RuntimeError("未找到评论输入框,该帖子可能不支持评论或网页端不可访问")
page.click_element(COMMENT_INPUT_TRIGGER)
time.sleep(0.5)
sleep_random(400, 800)
# 输入评论内容
# 输入评论内容(CDP 逐字输入)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(1)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(1)
sleep_random(800, 1500)
logger.info("评论发送成功: feed=%s", feed_id)
... ... @@ -103,42 +92,31 @@ def reply_comment(
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
sleep_random(800, 1500)
_check_page_accessible(page)
time.sleep(2)
sleep_random(1500, 2500)
# 查找目标评论
comment_found = _find_and_scroll_to_comment(page, comment_id, user_id)
if not comment_found:
raise RuntimeError(f"未找到评论 (commentID: {comment_id}, userID: {user_id})")
time.sleep(1)
sleep_random(800, 1500)
# 点击回复按钮
reply_selector = f"#comment-{comment_id} {REPLY_BUTTON}" if comment_id else REPLY_BUTTON
page.click_element(reply_selector)
time.sleep(1)
sleep_random(800, 1500)
# 输入回复内容
# 输入回复内容(CDP 逐字输入)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(0.5)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(2)
sleep_random(1500, 2500)
logger.info("回复评论成功")
... ... @@ -154,7 +132,7 @@ def _find_and_scroll_to_comment(
# 先滚动到评论区
page.scroll_element_into_view(".comments-container")
time.sleep(1)
sleep_random(800, 1500)
last_count = 0
stagnant = 0
... ... @@ -179,11 +157,11 @@ def _find_and_scroll_to_comment(
# 滚动到最后一条评论
if current_count > 0:
page.scroll_nth_element_into_view(PARENT_COMMENT, current_count - 1)
time.sleep(0.3)
sleep_random(200, 500)
# 继续滚动
page.evaluate("window.scrollBy(0, window.innerHeight * 0.8)")
time.sleep(0.5)
sleep_random(400, 800)
# 通过 commentID 查找
if comment_id:
... ... @@ -215,7 +193,7 @@ def _find_and_scroll_to_comment(
logger.info("通过 userID 找到评论 (尝试 %d 次)", attempt + 1)
return True
time.sleep(0.8)
sleep_random(600, 1200)
return False
... ...
... ... @@ -58,6 +58,15 @@ _INACCESSIBLE_KEYWORDS = [
"仅作者可见",
"因用户设置,你无法查看",
"因违规无法查看",
"Isn't Available",
"isn't available",
]
# 扫码验证关键词(触发反爬机制)
_SCAN_QRCODE_KEYWORDS = [
"扫码查看",
"打开小红书App扫码",
"请使用小红书App扫码",
]
_REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")
... ... @@ -110,10 +119,10 @@ def get_feed_detail(
else:
raise RuntimeError("页面导航失败")
sleep_random(1000, 1000)
sleep_random(800, 1500)
# 检查页面可访问性
_check_page_accessible(page)
# 检查页面可访问性(扫码验证时自动等待重试)
_check_page_accessible(page, url)
# 加载全部评论
if load_all_comments:
... ... @@ -128,8 +137,11 @@ def get_feed_detail(
# ========== 页面检查 ==========
def _check_page_accessible(page: Page) -> None:
"""检查页面是否可访问。"""
def _check_page_accessible(page: Page, url: str = "") -> None:
"""检查页面是否可访问。
扫码验证场景:等待 10 秒后自动重新访问,验证消失则继续,否则报错。
"""
time.sleep(0.5)
text = page.get_element_text(ACCESS_ERROR_WRAPPER)
... ... @@ -137,6 +149,28 @@ def _check_page_accessible(page: Page) -> None:
return
text = text.strip()
# 检测扫码验证(反爬机制触发)→ 等待后重试
if _is_scan_qrcode_verification(text) and url:
logger.warning("触发小红书扫码验证,等待 10 秒后重新访问...")
time.sleep(10)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
retry_text = page.get_element_text(ACCESS_ERROR_WRAPPER)
if retry_text and _is_scan_qrcode_verification(retry_text.strip()):
raise PageNotAccessibleError(
"触发了小红书验证,需要在浏览器中扫码完成验证后重试。"
"这通常是小红书的反爬机制,请稍后再试或在 Chrome 中手动打开该笔记完成验证"
)
if not retry_text or not retry_text.strip():
logger.info("验证已消失,继续加载笔记")
return
# 重试后仍有其他错误,继续走下面的关键词检测
text = retry_text.strip()
for kw in _INACCESSIBLE_KEYWORDS:
if kw in text:
raise PageNotAccessibleError(kw)
... ... @@ -145,6 +179,11 @@ def _check_page_accessible(page: Page) -> None:
raise PageNotAccessibleError(text)
def _is_scan_qrcode_verification(text: str) -> bool:
"""判断页面文本是否为扫码验证。"""
return any(kw in text for kw in _SCAN_QRCODE_KEYWORDS)
# ========== 数据提取 ==========
... ...
... ... @@ -32,6 +32,11 @@ def sleep_random(min_ms: int, max_ms: int) -> None:
time.sleep(delay)
def navigation_delay() -> None:
"""页面导航后的随机等待,模拟人类阅读。"""
sleep_random(1000, 2500)
def get_scroll_interval(speed: str) -> float:
"""根据速度获取滚动间隔(秒)。"""
if speed == "slow":
... ...
... ... @@ -9,6 +9,7 @@ import tempfile
import time
from .cdp import Page
from .human import sleep_random
from .selectors import LOGIN_STATUS, QRCODE_IMG
from .urls import EXPLORE_URL
... ... @@ -23,7 +24,7 @@ def check_login_status(page: Page) -> bool:
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(1)
sleep_random(800, 1500)
return page.has_element(LOGIN_STATUS)
... ... @@ -38,7 +39,7 @@ def fetch_qrcode(page: Page) -> tuple[str, bool]:
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(2)
sleep_random(1500, 2500)
# 检查是否已登录
if page.has_element(LOGIN_STATUS):
... ...
... ... @@ -5,6 +5,7 @@ from __future__ import annotations
import json
import logging
import random
import re
import time
from .cdp import Page
... ... @@ -127,27 +128,31 @@ def _navigate_to_publish_page(page: Page) -> None:
"""导航到发布页面。"""
page.navigate(PUBLISH_URL)
page.wait_for_load(timeout=300)
time.sleep(2)
time.sleep(3)
page.wait_dom_stable()
time.sleep(1)
time.sleep(2)
def _click_publish_tab(page: Page, tab_name: str) -> None:
"""点击发布页 TAB(上传图文/上传视频)。"""
page.wait_for_element(UPLOAD_CONTENT, timeout=15)
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
# 查找匹配的 TAB
# 查找匹配的 TAB(支持多种结构)
found = page.evaluate(
f"""
(() => {{
const tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
// 策略1: 查找 div.creator-tab(过滤隐藏元素)
let tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
for (const tab of tabs) {{
if (tab.textContent.trim() === {json.dumps(tab_name)}) {{
// 检查是否被遮挡
const titleSpan = tab.querySelector('span.title');
const tabText = titleSpan ? titleSpan.textContent.trim() : tab.textContent.trim();
if (tabText === {json.dumps(tab_name)}) {{
const rect = tab.getBoundingClientRect();
const style = window.getComputedStyle(tab);
// 跳过隐藏或被移出视口的元素
if (rect.width === 0 || rect.height === 0) continue;
if (rect.left < 0 || rect.top < 0) continue;
if (style.display === 'none' || style.visibility === 'hidden') continue;
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const target = document.elementFromPoint(x, y);
... ... @@ -158,6 +163,21 @@ def _click_publish_tab(page: Page, tab_name: str) -> None:
return 'blocked';
}}
}}
// 策略2: 查找任意包含目标文本的元素
const allElements = document.querySelectorAll('*');
for (const el of allElements) {{
if (el.children.length === 0 && el.textContent.trim() === {json.dumps(tab_name)}) {{
const rect = el.getBoundingClientRect();
const style = window.getComputedStyle(el);
if (rect.width === 0 || rect.height === 0) continue;
if (rect.left < 0 || rect.top < 0) continue;
if (style.display === 'none' || style.visibility === 'hidden') continue;
el.click();
return 'clicked';
}}
}}
return 'not_found';
}})()
"""
... ... @@ -172,6 +192,19 @@ def _click_publish_tab(page: Page, tab_name: str) -> None:
time.sleep(0.2)
# 调试:输出页面信息
debug_info = page.evaluate("""
(() => {
const creatorTabs = document.querySelectorAll('div.creator-tab');
const tabTexts = Array.from(creatorTabs).map(t => ({
text: t.textContent.trim(),
html: t.outerHTML.substring(0, 200)
}));
const url = window.location.href;
return JSON.stringify({url, tabCount: creatorTabs.length, tabs: tabTexts});
})()
""")
logger.error("调试信息: %s", debug_info)
raise PublishError(f"没有找到发布 TAB - {tab_name}")
... ... @@ -223,6 +256,34 @@ def _wait_for_upload_complete(page: Page, expected_count: int) -> None:
# ========== 表单提交 ==========
def _extract_hashtags_from_content(content: str, tags: list[str]) -> tuple[str, list[str]]:
"""从正文末尾提取 hashtag 行,合并到 tags 列表。
Returns:
(cleaned_content, merged_tags)
"""
lines = content.rstrip().split("\n")
# 检查最后一行是否全是 #tag 格式
if lines:
last_line = lines[-1].strip()
hashtag_pattern = re.compile(r"^(#\S+\s*)+$")
if hashtag_pattern.match(last_line):
# 提取 hashtag
extracted = re.findall(r"#(\S+)", last_line)
# 合并到 tags(去重)
existing = {t.lstrip("#") for t in tags}
merged = list(tags)
for t in extracted:
if t not in existing:
merged.append(t)
existing.add(t)
# 去掉最后一行
cleaned = "\n".join(lines[:-1]).rstrip()
logger.info("从正文末尾提取 %d 个标签,合并后共 %d 个", len(extracted), len(merged))
return cleaned, merged
return content, list(tags)
def _fill_publish_form(
page: Page,
title: str,
... ... @@ -233,6 +294,9 @@ def _fill_publish_form(
visibility: str,
) -> None:
"""填写表单(不点击发布)。"""
# 从正文末尾提取 hashtag 并合并到 tags
content, tags = _extract_hashtags_from_content(content, tags)
# 标题
page.input_text(TITLE_INPUT, title)
time.sleep(0.5)
... ... @@ -334,6 +398,10 @@ def _input_tags(page: Page, content_selector: str, tags: list[str]) -> None:
"""输入标签。"""
time.sleep(1)
# 先点击正文编辑器,确保焦点在正文而非标题
page.click_element(content_selector)
time.sleep(0.3)
# 移动光标到正文末尾(20次 ArrowDown)
for _ in range(20):
page.press_key("ArrowDown")
... ... @@ -353,27 +421,32 @@ def _input_single_tag(page: Page, content_selector: str, tag: str) -> None:
"""输入单个标签。"""
# 输入 #
page.type_text("#", delay_ms=0)
time.sleep(0.2)
time.sleep(0.3)
# 逐字输入标签
# 逐字输入标签(随机间隔模拟真实输入)
for char in tag:
page.type_text(char, delay_ms=50)
page.type_text(char, delay_ms=0)
time.sleep(random.uniform(0.05, 0.12))
time.sleep(1)
# 等待标签联想出现(最多 3 秒)
deadline = time.monotonic() + 3.0
clicked = False
while time.monotonic() < deadline:
time.sleep(0.5)
if page.has_element(TAG_TOPIC_CONTAINER):
item_selector = f"{TAG_TOPIC_CONTAINER} {TAG_FIRST_ITEM}"
if page.has_element(item_selector):
page.click_element(item_selector)
logger.info("点击标签联想: %s", tag)
clicked = True
break
# 尝试点击标签联想
if page.has_element(TAG_TOPIC_CONTAINER):
item_selector = f"{TAG_TOPIC_CONTAINER} {TAG_FIRST_ITEM}"
if page.has_element(item_selector):
page.click_element(item_selector)
logger.info("点击标签联想: %s", tag)
time.sleep(0.5)
return
if not clicked:
# 没有联想,直接空格
logger.warning("未找到标签联想,直接输入空格: %s", tag)
page.type_text(" ", delay_ms=0)
# 没有联想,直接空格
logger.warning("未找到标签联想,直接输入空格: %s", tag)
page.type_text(" ", delay_ms=0)
time.sleep(0.5)
time.sleep(0.8)
# ========== 定时发布 ==========
... ...
... ... @@ -5,6 +5,7 @@ from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from .cdp import Page
from .errors import PublishError
... ... @@ -217,14 +218,14 @@ def _fill_long_content(page: Page, content: str) -> None:
def _insert_images_to_editor(page: Page, image_paths: list[str]) -> None:
"""将图片插入到编辑器中。"""
for img_path in image_paths:
normalized = img_path.replace("\\", "/")
file_uri = Path(img_path).resolve().as_uri()
page.evaluate(
f"""
(() => {{
const editor = document.querySelector({json.dumps(CONTENT_EDITOR)});
if (!editor) return false;
const img = document.createElement('img');
img.src = 'file:///' + {json.dumps(normalized)};
img.src = {json.dumps(file_uri)};
editor.appendChild(img);
editor.dispatchEvent(new Event('input', {{ bubbles: true }}));
return true;
... ...
... ... @@ -8,6 +8,7 @@ import time
from .cdp import Page
from .errors import NoFeedsError
from .human import sleep_random
from .selectors import FILTER_BUTTON, FILTER_PANEL
from .types import Feed, FilterOption
from .urls import make_search_url
... ... @@ -139,7 +140,7 @@ def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
while time.monotonic() < deadline:
if page.has_element(FILTER_PANEL):
break
time.sleep(0.3)
sleep_random(300, 600)
# 点击各筛选项
for filters_index, tags_index in filters:
... ... @@ -148,7 +149,7 @@ def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
f"div.tags:nth-child({tags_index})"
)
page.click_element(selector)
time.sleep(0.3)
sleep_random(300, 600)
# 等待页面更新
page.wait_dom_stable()
... ...
"""反检测 JS 注入 + Chrome 启动参数,对应 go-rod/stealth。"""
# 真实 Chrome UA(固定版本,避免每次随机导致指纹不一致)
REALISTIC_UA = (
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/131.0.0.0 Safari/537.36"
)
# 反检测 JS 脚本:在页面加载时注入
STEALTH_JS = """
(() => {
... ... @@ -72,6 +79,45 @@ STEALTH_JS = """
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
// 7. hardwareConcurrency — 随机 4 或 8
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => [4, 8][Math.floor(Math.random() * 2)],
configurable: true,
});
// 8. deviceMemory — 随机 4 或 8
Object.defineProperty(navigator, 'deviceMemory', {
get: () => [4, 8][Math.floor(Math.random() * 2)],
configurable: true,
});
// 9. navigator.connection — 伪造网络信息
Object.defineProperty(navigator, 'connection', {
get: () => ({
effectiveType: '4g',
downlink: 10,
rtt: 50,
saveData: false,
}),
configurable: true,
});
// 10. chrome.csi / chrome.loadTimes — 空函数伪装
if (window.chrome) {
window.chrome.csi = function() { return {}; };
window.chrome.loadTimes = function() { return {}; };
}
// 11. outerWidth/outerHeight — 与 innerWidth/innerHeight 对齐
Object.defineProperty(window, 'outerWidth', {
get: () => window.innerWidth,
configurable: true,
});
Object.defineProperty(window, 'outerHeight', {
get: () => window.innerHeight,
configurable: true,
});
})();
"""
... ... @@ -85,4 +131,6 @@ STEALTH_ARGS = [
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-component-update",
"--disable-extensions",
"--disable-sync",
]
... ...
... ... @@ -159,6 +159,9 @@ class Feed:
"sharedCount": self.note_card.interact_info.shared_count,
},
}
cover = self.note_card.cover
if cover.url or cover.url_default:
result["cover"] = cover.url or cover.url_default
if self.note_card.video:
result["video"] = {"duration": self.note_card.video.capa.duration}
return result
... ...