Angiin

feat: 增强反检测能力 — JS 伪装、CDP 真实交互、随机延迟

- stealth.py: 新增 5 项 JS 覆盖(hardwareConcurrency/deviceMemory/connection/chrome.csi+loadTimes/outerWidth+Height)、REALISTIC_UA 常量、--disable-extensions/--disable-sync 启动参数
- cdp.py: click_element 改用 CDP Input 事件(isTrusted=true)、input_content_editable 改用逐字 CDP 键入、new_page 注入 UA 覆盖和随机 viewport
- human.py: 新增 navigation_delay() 函数
- comment.py: 所有 time.sleep 替换为 sleep_random,评论输入改用 CDP 逐字输入
- search.py/feed_detail.py/login.py: 固定延迟替换为随机区间延迟
... ... @@ -7,6 +7,7 @@ from __future__ import annotations
import json
import logging
import random
import time
from typing import Any
... ... @@ -14,7 +15,7 @@ import requests
import websockets.sync.client as ws_client
from .errors import CDPError, ElementNotFoundError
from .stealth import STEALTH_JS
from .stealth import REALISTIC_UA, STEALTH_JS
logger = logging.getLogger(__name__)
... ... @@ -211,15 +212,25 @@ class Page:
raise ElementNotFoundError(selector)
def click_element(self, selector: str) -> None:
    """Click the first element matching ``selector`` with simulated mouse input.

    The element is scrolled to the centre of the viewport, its bounding box is
    read back from the page, and ``self.mouse_move`` followed by
    ``self.mouse_click`` is called at a slightly randomised point near the
    element's centre. Those helpers presumably dispatch CDP ``Input`` mouse
    events (so the page sees trusted events) -- confirm against their
    definitions elsewhere in this class.

    Does nothing when no element matches the selector.

    Args:
        selector: CSS selector passed to ``document.querySelector``.
    """
    box = self.evaluate(
        f"""
        (() => {{
            const el = document.querySelector({json.dumps(selector)});
            if (!el) return null;
            el.scrollIntoView({{block: 'center'}});
            const rect = el.getBoundingClientRect();
            return {{
                x: rect.left + rect.width / 2,
                y: rect.top + rect.height / 2,
                width: rect.width,
                height: rect.height,
            }};
        }})()
        """
    )
    if not box:
        return

    # Randomise the click point by up to 3px, but never by more than the
    # element can absorb: a fixed +/-3px offset would miss elements that are
    # narrower or shorter than 6px. A 1px margin keeps the point off the edge.
    jitter_x = max(0.0, min(3.0, box.get("width", 8.0) / 2 - 1.0))
    jitter_y = max(0.0, min(3.0, box.get("height", 8.0) / 2 - 1.0))
    x = box["x"] + random.uniform(-jitter_x, jitter_x)
    y = box["y"] + random.uniform(-jitter_y, jitter_y)

    self.mouse_move(x, y)
    # Short human-like pause between arriving at the target and pressing.
    time.sleep(random.uniform(0.03, 0.08))
    self.mouse_click(x, y)
def input_text(self, selector: str, text: str) -> None:
"""向指定选择器的元素输入文本。"""
... ... @@ -237,18 +248,56 @@ class Page:
)
def input_content_editable(self, selector: str, text: str) -> None:
    """Type ``text`` into a contentEditable element (e.g. ``div.ql-editor``).

    The element is focused from page JavaScript, its current content is
    cleared with select-all + Backspace, and ``text`` is then typed one
    character at a time through CDP ``Input.dispatchKeyEvent`` with a random
    30-80 ms pause after each character.

    If no element matches ``selector`` nothing is typed: sending the
    select-all/Backspace/typing sequence without a focused target would edit
    whatever else currently has focus.

    Args:
        selector: CSS selector passed to ``document.querySelector``.
        text: Text to type; each character is sent as its own key event pair.
    """

    def press(key_down: dict) -> None:
        """Send ``key_down`` followed by the matching keyUp event."""
        self._send_session("Input.dispatchKeyEvent", key_down)
        key_up = {k: v for k, v in key_down.items() if k != "commands"}
        key_up["type"] = "keyUp"
        self._send_session("Input.dispatchKeyEvent", key_up)

    # 1. Focus the element and learn whether it exists at all.
    focused = self.evaluate(
        f"""
        (() => {{
            const el = document.querySelector({json.dumps(selector)});
            if (!el) return false;
            el.focus();
            return true;
        }})()
        """
    )
    if not focused:
        logging.getLogger(__name__).warning(
            "input_content_editable: element not found: %s", selector
        )
        return
    time.sleep(0.1)

    # 2. Select all + Backspace to clear existing content. The virtual key
    #    code lets Chrome recognise the shortcut, and the explicit
    #    ``selectAll`` editing command makes it work on platforms where
    #    Ctrl+A is not "select all" (e.g. macOS).
    press(
        {
            "type": "keyDown",
            "key": "a",
            "code": "KeyA",
            "modifiers": 2,  # CDP modifier bit field: 2 = Ctrl
            "windowsVirtualKeyCode": 65,
            "commands": ["selectAll"],
        }
    )
    press(
        {
            "type": "keyDown",
            "key": "Backspace",
            "code": "Backspace",
            "windowsVirtualKeyCode": 8,
        }
    )
    time.sleep(0.1)

    # 3. Type character by character with a random 30-80 ms gap.
    for char in text:
        press({"type": "keyDown", "text": char})
        time.sleep(random.uniform(0.03, 0.08))
def get_element_text(self, selector: str) -> str | None:
"""获取元素文本内容。"""
... ... @@ -500,14 +549,31 @@ class Browser:
page = Page(self._cdp, target_id, session_id)
# 注入反检测(必须在 enable domains 之前)
page.inject_stealth()
# UA 覆盖
page._send_session(
"Emulation.setUserAgentOverride",
{"userAgent": REALISTIC_UA},
)
# 随机 viewport(模拟真实屏幕尺寸)
page._send_session(
"Emulation.setDeviceMetricsOverride",
{
"width": random.randint(1366, 1920),
"height": random.randint(768, 1080),
"deviceScaleFactor": 1,
"mobile": False,
},
)
# 启用必要的 domain
page._send_session("Page.enable")
page._send_session("DOM.enable")
page._send_session("Runtime.enable")
# 注入反检测
page.inject_stealth()
return page
def get_existing_page(self) -> Page | None:
... ...
... ... @@ -3,10 +3,10 @@
from __future__ import annotations
import logging
import time
from .cdp import Page
from .feed_detail import _check_end_container, _check_page_accessible, _get_comment_count
from .human import sleep_random
from .selectors import (
COMMENT_INPUT_FIELD,
COMMENT_INPUT_TRIGGER,
... ... @@ -37,7 +37,7 @@ def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> Non
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
sleep_random(800, 1500)
_check_page_accessible(page)
... ... @@ -46,27 +46,16 @@ def post_comment(page: Page, feed_id: str, xsec_token: str, content: str) -> Non
raise RuntimeError("未找到评论输入框,该帖子可能不支持评论或网页端不可访问")
page.click_element(COMMENT_INPUT_TRIGGER)
time.sleep(0.5)
sleep_random(400, 800)
# 输入评论内容
# 输入评论内容(CDP 逐字输入)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(1)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(1)
sleep_random(800, 1500)
logger.info("评论发送成功: feed=%s", feed_id)
... ... @@ -103,42 +92,31 @@ def reply_comment(
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
sleep_random(800, 1500)
_check_page_accessible(page)
time.sleep(2)
sleep_random(1500, 2500)
# 查找目标评论
comment_found = _find_and_scroll_to_comment(page, comment_id, user_id)
if not comment_found:
raise RuntimeError(f"未找到评论 (commentID: {comment_id}, userID: {user_id})")
time.sleep(1)
sleep_random(800, 1500)
# 点击回复按钮
reply_selector = f"#comment-{comment_id} {REPLY_BUTTON}" if comment_id else REPLY_BUTTON
page.click_element(reply_selector)
time.sleep(1)
sleep_random(800, 1500)
# 输入回复内容
# 输入回复内容(CDP 逐字输入)
page.wait_for_element(COMMENT_INPUT_FIELD, timeout=5)
page.evaluate(
f"""
(() => {{
const el = document.querySelector({_js_str(COMMENT_INPUT_FIELD)});
if (el) {{
el.focus();
el.textContent = {_js_str(content)};
el.dispatchEvent(new Event('input', {{bubbles: true}}));
}}
}})()
"""
)
time.sleep(0.5)
page.input_content_editable(COMMENT_INPUT_FIELD, content)
sleep_random(600, 1200)
# 点击提交
page.click_element(COMMENT_SUBMIT_BUTTON)
time.sleep(2)
sleep_random(1500, 2500)
logger.info("回复评论成功")
... ... @@ -154,7 +132,7 @@ def _find_and_scroll_to_comment(
# 先滚动到评论区
page.scroll_element_into_view(".comments-container")
time.sleep(1)
sleep_random(800, 1500)
last_count = 0
stagnant = 0
... ... @@ -179,11 +157,11 @@ def _find_and_scroll_to_comment(
# 滚动到最后一条评论
if current_count > 0:
page.scroll_nth_element_into_view(PARENT_COMMENT, current_count - 1)
time.sleep(0.3)
sleep_random(200, 500)
# 继续滚动
page.evaluate("window.scrollBy(0, window.innerHeight * 0.8)")
time.sleep(0.5)
sleep_random(400, 800)
# 通过 commentID 查找
if comment_id:
... ... @@ -215,7 +193,7 @@ def _find_and_scroll_to_comment(
logger.info("通过 userID 找到评论 (尝试 %d 次)", attempt + 1)
return True
time.sleep(0.8)
sleep_random(600, 1200)
return False
... ...
... ... @@ -58,6 +58,15 @@ _INACCESSIBLE_KEYWORDS = [
"仅作者可见",
"因用户设置,你无法查看",
"因违规无法查看",
"Isn't Available",
"isn't available",
]
# Keywords that identify a "scan the QR code to continue" verification page
# (shown when the site's anti-crawling mechanism has been triggered).
# Matched as plain substrings by _is_scan_qrcode_verification.
_SCAN_QRCODE_KEYWORDS = [
"扫码查看",
"打开小红书App扫码",
"请使用小红书App扫码",
]
_REPLY_COUNT_RE = re.compile(r"展开\s*(\d+)\s*条回复")
... ... @@ -110,10 +119,10 @@ def get_feed_detail(
else:
raise RuntimeError("页面导航失败")
sleep_random(1000, 1000)
sleep_random(800, 1500)
# 检查页面可访问性
_check_page_accessible(page)
# 检查页面可访问性(扫码验证时自动等待重试)
_check_page_accessible(page, url)
# 加载全部评论
if load_all_comments:
... ... @@ -128,8 +137,11 @@ def get_feed_detail(
# ========== 页面检查 ==========
def _check_page_accessible(page: Page) -> None:
"""检查页面是否可访问。"""
def _check_page_accessible(page: Page, url: str = "") -> None:
"""检查页面是否可访问。
扫码验证场景:等待 10 秒后自动重新访问,验证消失则继续,否则报错。
"""
time.sleep(0.5)
text = page.get_element_text(ACCESS_ERROR_WRAPPER)
... ... @@ -137,6 +149,28 @@ def _check_page_accessible(page: Page) -> None:
return
text = text.strip()
# 检测扫码验证(反爬机制触发)→ 等待后重试
if _is_scan_qrcode_verification(text) and url:
logger.warning("触发小红书扫码验证,等待 10 秒后重新访问...")
time.sleep(10)
page.navigate(url)
page.wait_for_load()
page.wait_dom_stable()
time.sleep(1)
retry_text = page.get_element_text(ACCESS_ERROR_WRAPPER)
if retry_text and _is_scan_qrcode_verification(retry_text.strip()):
raise PageNotAccessibleError(
"触发了小红书验证,需要在浏览器中扫码完成验证后重试。"
"这通常是小红书的反爬机制,请稍后再试或在 Chrome 中手动打开该笔记完成验证"
)
if not retry_text or not retry_text.strip():
logger.info("验证已消失,继续加载笔记")
return
# 重试后仍有其他错误,继续走下面的关键词检测
text = retry_text.strip()
for kw in _INACCESSIBLE_KEYWORDS:
if kw in text:
raise PageNotAccessibleError(kw)
... ... @@ -145,6 +179,11 @@ def _check_page_accessible(page: Page) -> None:
raise PageNotAccessibleError(text)
def _is_scan_qrcode_verification(text: str) -> bool:
    """Return True when ``text`` contains any QR-code verification keyword.

    Each entry of ``_SCAN_QRCODE_KEYWORDS`` is tested as a plain substring.
    """
    for keyword in _SCAN_QRCODE_KEYWORDS:
        if keyword in text:
            return True
    return False
# ========== 数据提取 ==========
... ...
... ... @@ -32,6 +32,11 @@ def sleep_random(min_ms: int, max_ms: int) -> None:
time.sleep(delay)
def navigation_delay() -> None:
    """Pause for a random 1000-2500 ms after a page navigation.

    Delegates to ``sleep_random`` to imitate a person reading the page.
    """
    min_wait_ms, max_wait_ms = 1000, 2500
    sleep_random(min_wait_ms, max_wait_ms)
def get_scroll_interval(speed: str) -> float:
"""根据速度获取滚动间隔(秒)。"""
if speed == "slow":
... ...
... ... @@ -9,6 +9,7 @@ import tempfile
import time
from .cdp import Page
from .human import sleep_random
from .selectors import LOGIN_STATUS, QRCODE_IMG
from .urls import EXPLORE_URL
... ... @@ -23,7 +24,7 @@ def check_login_status(page: Page) -> bool:
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(1)
sleep_random(800, 1500)
return page.has_element(LOGIN_STATUS)
... ... @@ -38,7 +39,7 @@ def fetch_qrcode(page: Page) -> tuple[str, bool]:
"""
page.navigate(EXPLORE_URL)
page.wait_for_load()
time.sleep(2)
sleep_random(1500, 2500)
# 检查是否已登录
if page.has_element(LOGIN_STATUS):
... ...
... ... @@ -8,6 +8,7 @@ import time
from .cdp import Page
from .errors import NoFeedsError
from .human import sleep_random
from .selectors import FILTER_BUTTON, FILTER_PANEL
from .types import Feed, FilterOption
from .urls import make_search_url
... ... @@ -139,7 +140,7 @@ def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
while time.monotonic() < deadline:
if page.has_element(FILTER_PANEL):
break
time.sleep(0.3)
sleep_random(300, 600)
# 点击各筛选项
for filters_index, tags_index in filters:
... ... @@ -148,7 +149,7 @@ def _apply_filters(page: Page, filters: list[tuple[int, int]]) -> None:
f"div.tags:nth-child({tags_index})"
)
page.click_element(selector)
time.sleep(0.3)
sleep_random(300, 600)
# 等待页面更新
page.wait_dom_stable()
... ...
"""反检测 JS 注入 + Chrome 启动参数,对应 go-rod/stealth。"""
# Realistic Chrome user-agent string. The version is pinned (not randomised)
# so the browser fingerprint stays consistent from run to run.
REALISTIC_UA = " ".join(
    (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64)",
        "AppleWebKit/537.36 (KHTML, like Gecko)",
        "Chrome/131.0.0.0 Safari/537.36",
    )
)
# 反检测 JS 脚本:在页面加载时注入
STEALTH_JS = """
(() => {
... ... @@ -72,6 +79,45 @@ STEALTH_JS = """
if (parameter === 37446) return 'Intel Iris OpenGL Engine';
return getParameter.call(this, parameter);
};
// 7. hardwareConcurrency — 4 or 8, chosen ONCE per injection.
// Re-rolling inside the getter would make two reads in the same page
// disagree, which is itself a detectable fingerprint inconsistency.
const __stealthHardwareConcurrency = [4, 8][Math.floor(Math.random() * 2)];
Object.defineProperty(navigator, 'hardwareConcurrency', {
get: () => __stealthHardwareConcurrency,
configurable: true,
});
// 8. deviceMemory — 4 or 8, likewise chosen once so every read agrees.
const __stealthDeviceMemory = [4, 8][Math.floor(Math.random() * 2)];
Object.defineProperty(navigator, 'deviceMemory', {
get: () => __stealthDeviceMemory,
configurable: true,
});
// 9. navigator.connection — 伪造网络信息
Object.defineProperty(navigator, 'connection', {
get: () => ({
effectiveType: '4g',
downlink: 10,
rtt: 50,
saveData: false,
}),
configurable: true,
});
// 10. chrome.csi / chrome.loadTimes — 空函数伪装
if (window.chrome) {
window.chrome.csi = function() { return {}; };
window.chrome.loadTimes = function() { return {}; };
}
// 11. outerWidth/outerHeight — 与 innerWidth/innerHeight 对齐
Object.defineProperty(window, 'outerWidth', {
get: () => window.innerWidth,
configurable: true,
});
Object.defineProperty(window, 'outerHeight', {
get: () => window.innerHeight,
configurable: true,
});
})();
"""
... ... @@ -85,4 +131,6 @@ STEALTH_ARGS = [
"--disable-backgrounding-occluded-windows",
"--disable-renderer-backgrounding",
"--disable-component-update",
"--disable-extensions",
"--disable-sync",
]
... ...