Angiin

chore: Chrome 启动器增强、运行锁修复、长文发布和类型优化

... ... @@ -2,14 +2,17 @@
from __future__ import annotations
import contextlib
import json
import logging
import os
import platform
import shutil
import signal
import socket
import subprocess
import sys
import time
from pathlib import Path
from xhs.stealth import STEALTH_ARGS
... ... @@ -18,6 +21,9 @@ logger = logging.getLogger(__name__)
# 默认远程调试端口
DEFAULT_PORT = 9222
# 全局进程追踪
_chrome_process: subprocess.Popen | None = None
# 各平台 Chrome 默认路径
_CHROME_PATHS: dict[str, list[str]] = {
"Darwin": [
... ... @@ -38,6 +44,22 @@ _CHROME_PATHS: dict[str, list[str]] = {
}
def _get_default_data_dir() -> str:
"""返回默认 Chrome Profile 目录路径。"""
return str(Path.home() / ".xhs" / "chrome-profile")
def is_port_open(port: int, host: str = "127.0.0.1") -> bool:
"""TCP socket 级端口检测(秒级响应)。"""
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(1)
try:
s.connect((host, port))
return True
except (ConnectionRefusedError, TimeoutError, OSError):
return False
def find_chrome() -> str | None:
"""查找 Chrome 可执行文件路径。"""
# 环境变量优先
... ... @@ -45,13 +67,28 @@ def find_chrome() -> str | None:
if env_path and os.path.isfile(env_path):
return env_path
# which/where 查找
chrome = shutil.which("google-chrome") or shutil.which("chromium")
# which/where 查找(含 Windows chrome.exe)
chrome = (
shutil.which("google-chrome")
or shutil.which("chromium")
or shutil.which("chrome")
or shutil.which("chrome.exe")
)
if chrome:
return chrome
# 平台默认路径
system = platform.system()
# Windows: 额外检查环境变量路径
if system == "Windows":
for env_var in ("PROGRAMFILES", "PROGRAMFILES(X86)", "LOCALAPPDATA"):
base = os.environ.get(env_var, "")
if base:
candidate = os.path.join(base, "Google", "Chrome", "Application", "chrome.exe")
if os.path.isfile(candidate):
return candidate
for path in _CHROME_PATHS.get(system, []):
if os.path.isfile(path):
return path
... ... @@ -59,55 +96,70 @@ def find_chrome() -> str | None:
return None
def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
"""检查指定端口的 Chrome 是否在运行(TCP 级检测)。"""
return is_port_open(port)
def launch_chrome(
port: int = DEFAULT_PORT,
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> subprocess.Popen:
) -> subprocess.Popen | None:
"""启动 Chrome 进程(带远程调试端口)。
Args:
port: 远程调试端口。
headless: 是否无头模式。
user_data_dir: 用户数据目录(Profile 隔离)。
user_data_dir: 用户数据目录(Profile 隔离),默认 ~/.xhs/chrome-profile
chrome_bin: Chrome 可执行文件路径。
Returns:
Chrome 子进程。
Chrome 子进程,若已在运行则返回 None
Raises:
FileNotFoundError: 未找到 Chrome。
"""
global _chrome_process
# 已在运行则跳过
if is_port_open(port):
logger.info("Chrome 已在运行 (port=%d),跳过启动", port)
return None
if not chrome_bin:
chrome_bin = find_chrome()
if not chrome_bin:
raise FileNotFoundError("未找到 Chrome,请设置 CHROME_BIN 环境变量或安装 Chrome")
# 默认 user-data-dir
if not user_data_dir:
user_data_dir = _get_default_data_dir()
args = [
chrome_bin,
f"--remote-debugging-port={port}",
f"--user-data-dir={user_data_dir}",
*STEALTH_ARGS,
]
if headless:
args.append("--headless=new")
if user_data_dir:
args.append(f"--user-data-dir={user_data_dir}")
# 代理
proxy = os.getenv("XHS_PROXY")
if proxy:
args.append(f"--proxy-server={proxy}")
logger.info("使用代理: %s", _mask_proxy(proxy))
logger.info("启动 Chrome: port=%d, headless=%s", port, headless)
logger.info("启动 Chrome: port=%d, headless=%s, profile=%s", port, headless, user_data_dir)
process = subprocess.Popen(
args,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
_chrome_process = process
# 等待 Chrome 准备就绪
_wait_for_chrome(port)
... ... @@ -120,7 +172,7 @@ def close_chrome(process: subprocess.Popen) -> None:
return
try:
process.send_signal(signal.SIGTERM)
process.terminate()
process.wait(timeout=5)
except (subprocess.TimeoutExpired, OSError):
process.kill()
... ... @@ -129,29 +181,20 @@ def close_chrome(process: subprocess.Popen) -> None:
logger.info("Chrome 进程已关闭")
def is_chrome_running(port: int = DEFAULT_PORT) -> bool:
"""检查指定端口的 Chrome 是否在运行。"""
import requests
try:
resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
return resp.status_code == 200
except (requests.ConnectionError, requests.Timeout):
return False
def kill_chrome(port: int = DEFAULT_PORT) -> None:
"""关闭指定端口的 Chrome 实例。
尝试通过 CDP Browser.close 命令关闭,失败则使用进程信号
策略: CDP Browser.close → terminate 追踪进程 → 端口查找终止进程
Args:
port: Chrome 调试端口。
"""
import requests
global _chrome_process
# 策略1: 通过 CDP 关闭
try:
import requests
resp = requests.get(f"http://127.0.0.1:{port}/json/version", timeout=2)
if resp.status_code == 200:
ws_url = resp.json().get("webSocketDebuggerUrl")
... ... @@ -163,32 +206,70 @@ def kill_chrome(port: int = DEFAULT_PORT) -> None:
ws.close()
logger.info("通过 CDP Browser.close 关闭 Chrome (port=%d)", port)
time.sleep(1)
return
except Exception:
pass
# 策略2: 通过 lsof 查找并 kill 进程
# 策略2: terminate 追踪的子进程
if _chrome_process and _chrome_process.poll() is None:
try:
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0 and result.stdout.strip():
import contextlib
pids = result.stdout.strip().split("\n")
_chrome_process.terminate()
_chrome_process.wait(timeout=5)
logger.info("通过 terminate 关闭追踪的 Chrome 进程")
except Exception:
with contextlib.suppress(Exception):
_chrome_process.kill()
_chrome_process = None
# 策略3: 通过端口查找并终止进程(跨平台)
if is_port_open(port):
pids = _find_pids_by_port(port)
if pids:
for pid in pids:
with contextlib.suppress(OSError, ValueError):
os.kill(int(pid), signal.SIGTERM)
logger.info("通过 SIGTERM 关闭 Chrome 进程 (port=%d)", port)
time.sleep(1)
_kill_pid(pid)
logger.info("通过进程终止关闭 Chrome (port=%d)", port)
# 等待端口释放(最多 5s)
deadline = time.monotonic() + 5
while time.monotonic() < deadline:
if not is_port_open(port):
return
except Exception:
pass
time.sleep(0.5)
if is_port_open(port):
logger.warning("端口 %d 仍被占用,kill 可能未完全生效", port)
def ensure_chrome(
port: int = DEFAULT_PORT,
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> bool:
"""确保 Chrome 在指定端口可用(一站式入口)。
如果 Chrome 已在运行,直接返回 True。
否则尝试启动 Chrome 并等待端口就绪。
Args:
port: 远程调试端口。
headless: 是否无头模式(仅新启动时生效)。
user_data_dir: 用户数据目录。
chrome_bin: Chrome 可执行文件路径。
logger.warning("未能关闭 Chrome (port=%d)", port)
Returns:
True 表示 Chrome 可用,False 表示启动失败。
"""
if is_port_open(port):
return True
try:
launch_chrome(
port=port, headless=headless, user_data_dir=user_data_dir, chrome_bin=chrome_bin,
)
return is_port_open(port)
except FileNotFoundError as e:
logger.error("启动 Chrome 失败: %s", e)
return False
def restart_chrome(
... ... @@ -196,7 +277,7 @@ def restart_chrome(
headless: bool = False,
user_data_dir: str | None = None,
chrome_bin: str | None = None,
) -> subprocess.Popen:
) -> subprocess.Popen | None:
"""重启 Chrome:关闭当前实例后以新模式重新启动。
Args:
... ... @@ -206,7 +287,7 @@ def restart_chrome(
chrome_bin: Chrome 可执行文件路径。
Returns:
新的 Chrome 子进程。
新的 Chrome 子进程,或 None
"""
logger.info("重启 Chrome: port=%d, headless=%s", port, headless)
kill_chrome(port)
... ... @@ -220,16 +301,70 @@ def restart_chrome(
def _wait_for_chrome(port: int, timeout: float = 15.0) -> None:
"""等待 Chrome 调试端口就绪。"""
"""等待 Chrome 调试端口就绪(TCP 级检测)。"""
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if is_chrome_running(port):
if is_port_open(port):
logger.info("Chrome 已就绪 (port=%d)", port)
return
time.sleep(0.5)
logger.warning("等待 Chrome 就绪超时 (port=%d)", port)
def _find_pids_by_port(port: int) -> list[int]:
"""查找占用指定端口的进程 PID(跨平台)。"""
try:
if sys.platform == "win32":
result = subprocess.run(
["netstat", "-ano", "-p", "TCP"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0:
return []
pids: list[int] = []
for line in result.stdout.splitlines():
if f":{port}" in line and "LISTENING" in line:
parts = line.split()
with contextlib.suppress(ValueError, IndexError):
pids.append(int(parts[-1]))
return list(set(pids))
else:
result = subprocess.run(
["lsof", "-ti", f":{port}"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode != 0 or not result.stdout.strip():
return []
pids = []
for p in result.stdout.strip().split("\n"):
with contextlib.suppress(ValueError):
pids.append(int(p))
return pids
except Exception:
return []
def _kill_pid(pid: int) -> None:
"""终止指定 PID 的进程(跨平台)。"""
try:
if sys.platform == "win32":
subprocess.run(
["taskkill", "/PID", str(pid), "/F"],
capture_output=True,
timeout=5,
)
else:
import signal
os.kill(pid, signal.SIGTERM)
except Exception:
logger.debug("终止进程 %d 失败", pid)
def _mask_proxy(proxy_url: str) -> str:
"""隐藏代理 URL 中的敏感信息。"""
from urllib.parse import urlparse
... ...
... ... @@ -71,7 +71,7 @@ class RunLock:
# 检查进程是否存在
os.kill(pid, 0)
return False
except (FileNotFoundError, ValueError, ProcessLookupError, PermissionError):
except (ValueError, OSError):
return True
def _force_release(self) -> None:
... ...
... ... @@ -5,6 +5,7 @@ from __future__ import annotations
import json
import logging
import time
from pathlib import Path
from .cdp import Page
from .errors import PublishError
... ... @@ -217,14 +218,14 @@ def _fill_long_content(page: Page, content: str) -> None:
def _insert_images_to_editor(page: Page, image_paths: list[str]) -> None:
"""将图片插入到编辑器中。"""
for img_path in image_paths:
normalized = img_path.replace("\\", "/")
file_uri = Path(img_path).resolve().as_uri()
page.evaluate(
f"""
(() => {{
const editor = document.querySelector({json.dumps(CONTENT_EDITOR)});
if (!editor) return false;
const img = document.createElement('img');
img.src = 'file:///' + {json.dumps(normalized)};
img.src = {json.dumps(file_uri)};
editor.appendChild(img);
editor.dispatchEvent(new Event('input', {{ bubbles: true }}));
return true;
... ...
... ... @@ -159,6 +159,9 @@ class Feed:
"sharedCount": self.note_card.interact_info.shared_count,
},
}
cover = self.note_card.cover
if cover.url or cover.url_default:
result["cover"] = cover.url or cover.url_default
if self.note_card.video:
result["video"] = {"duration": self.note_card.video.capa.duration}
return result
... ...