You need to sign in or sign up before continuing.
Angiin

fix: 修复发布流程 4 个 bug — 页面关闭、标签错位、换行丢失、标签重复

- cli.py: 分步发布命令(fill-publish/fill-publish-video/long-article/select-template/next-step)不再关闭页面,新增 _connect_existing 复用已有 tab
- cdp.py: input_content_editable 遇到 \n 时按 Enter 键,正确产生段落换行
- publish.py: _input_tags 先 focus 正文编辑器再输入标签;新增 _extract_hashtags_from_content 从正文末尾提取 hashtag 合并到 tags 去重;标签输入增加重试等待联想下拉
... ... @@ -12,6 +12,12 @@ import json
import logging
import sys
# Windows 控制台默认编码(如 cp1252)不支持中文,强制 UTF-8
if sys.stdout and hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
if sys.stderr and hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8")
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(name)s: %(message)s",
... ... @@ -27,14 +33,43 @@ def _output(data: dict, exit_code: int = 0) -> None:
def _connect(args: argparse.Namespace):
    """Make sure Chrome is available, attach to it, and open a fresh page.

    Args:
        args: Parsed CLI arguments; ``host`` and ``port`` are read from it.

    Returns:
        A ``(browser, page)`` tuple, where ``page`` comes from
        ``browser.new_page()``.
    """
    from chrome_launcher import ensure_chrome
    from xhs.cdp import Browser

    chrome_ready = ensure_chrome(port=args.port)
    if not chrome_ready:
        # NOTE(review): execution continues after this call unless _output
        # terminates the process for a non-zero exit_code -- confirm.
        failure = {"success": False, "error": "无法启动 Chrome,请检查 Chrome 是否已安装"}
        _output(failure, exit_code=2)

    browser = Browser(host=args.host, port=args.port)
    browser.connect()
    return browser, browser.new_page()
def _connect_existing(args: argparse.Namespace):
    """Attach to Chrome and reuse a page that is already open.

    Intended for the later steps of a multi-step publish flow, which must keep
    working on the page opened by an earlier step.

    Args:
        args: Parsed CLI arguments; ``host`` and ``port`` are read from it.

    Returns:
        A ``(browser, page)`` tuple, where ``page`` comes from
        ``browser.get_existing_page()``.
    """
    from chrome_launcher import ensure_chrome
    from xhs.cdp import Browser

    if not ensure_chrome(port=args.port):
        # NOTE(review): presumably _output exits the process here -- confirm.
        _output({"success": False, "error": "无法连接到 Chrome"}, exit_code=2)

    browser = Browser(host=args.host, port=args.port)
    browser.connect()

    existing_page = browser.get_existing_page()
    if existing_page:
        return browser, existing_page

    # No reusable page: report the problem with the same exit code as above.
    _output(
        {"success": False, "error": "未找到已打开的页面,请先执行前置步骤"},
        exit_code=2,
    )
    return browser, existing_page
def _headless_fallback(port: int) -> None:
"""Headless 模式未登录时自动降级到有窗口模式。"""
from chrome_launcher import restart_chrome
... ... @@ -332,7 +367,7 @@ def cmd_fill_publish(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,让用户在浏览器中预览
browser.close()
... ... @@ -368,15 +403,15 @@ def cmd_fill_publish_video(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,让用户在浏览器中预览
browser.close()
def cmd_click_publish(args: argparse.Namespace) -> None:
"""点击发布按钮(在用户确认后调用)。"""
"""点击发布按钮(在用户确认后调用)。复用已有的发布页 tab。"""
from xhs.publish import click_publish_button
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
click_publish_button(page)
_output({"success": True, "status": "发布完成"})
... ... @@ -410,15 +445,15 @@ def cmd_long_article(args: argparse.Namespace) -> None:
}
)
finally:
browser.close_page(page)
# 不关闭页面,后续 select-template / next-step 需要复用
browser.close()
def cmd_select_template(args: argparse.Namespace) -> None:
"""选择排版模板。"""
"""选择排版模板。复用已有的长文编辑页 tab。"""
from xhs.publish_long_article import select_template
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
selected = select_template(page, args.name)
if selected:
... ... @@ -429,23 +464,23 @@ def cmd_select_template(args: argparse.Namespace) -> None:
exit_code=2,
)
finally:
browser.close_page(page)
# 不关闭页面,后续 next-step 需要复用
browser.close()
def cmd_next_step(args: argparse.Namespace) -> None:
"""点击下一步 + 填写发布页描述。"""
"""点击下一步 + 填写发布页描述。复用已有的长文编辑页 tab。"""
from xhs.publish_long_article import click_next_and_fill_description
with open(args.content_file, encoding="utf-8") as f:
description = f.read().strip()
browser, page = _connect(args)
browser, page = _connect_existing(args)
try:
click_next_and_fill_description(page, description)
_output({"success": True, "status": "已进入发布页,等待确认发布"})
finally:
browser.close_page(page)
# 不关闭页面,等待 click-publish
browser.close()
... ...
... ... @@ -287,8 +287,11 @@ class Page:
},
)
time.sleep(0.1)
# 3. 逐字输入(随机 30-80ms 间隔)
# 3. 逐字输入(随机 30-80ms 间隔,换行符转为 Enter 键)
for char in text:
if char == "\n":
self.press_key("Enter")
else:
self._send_session(
"Input.dispatchKeyEvent",
{"type": "keyDown", "text": char},
... ...
... ... @@ -5,6 +5,7 @@ from __future__ import annotations
import json
import logging
import random
import re
import time
from .cdp import Page
... ... @@ -127,27 +128,31 @@ def _navigate_to_publish_page(page: Page) -> None:
"""导航到发布页面。"""
page.navigate(PUBLISH_URL)
page.wait_for_load(timeout=300)
time.sleep(2)
time.sleep(3)
page.wait_dom_stable()
time.sleep(1)
time.sleep(2)
def _click_publish_tab(page: Page, tab_name: str) -> None:
"""点击发布页 TAB(上传图文/上传视频)。"""
page.wait_for_element(UPLOAD_CONTENT, timeout=15)
deadline = time.monotonic() + 15
while time.monotonic() < deadline:
# 查找匹配的 TAB
# 查找匹配的 TAB(支持多种结构)
found = page.evaluate(
f"""
(() => {{
const tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
// 策略1: 查找 div.creator-tab(过滤隐藏元素)
let tabs = document.querySelectorAll({json.dumps(CREATOR_TAB)});
for (const tab of tabs) {{
if (tab.textContent.trim() === {json.dumps(tab_name)}) {{
// 检查是否被遮挡
const titleSpan = tab.querySelector('span.title');
const tabText = titleSpan ? titleSpan.textContent.trim() : tab.textContent.trim();
if (tabText === {json.dumps(tab_name)}) {{
const rect = tab.getBoundingClientRect();
const style = window.getComputedStyle(tab);
// 跳过隐藏或被移出视口的元素
if (rect.width === 0 || rect.height === 0) continue;
if (rect.left < 0 || rect.top < 0) continue;
if (style.display === 'none' || style.visibility === 'hidden') continue;
const x = rect.left + rect.width / 2;
const y = rect.top + rect.height / 2;
const target = document.elementFromPoint(x, y);
... ... @@ -158,6 +163,21 @@ def _click_publish_tab(page: Page, tab_name: str) -> None:
return 'blocked';
}}
}}
// 策略2: 查找任意包含目标文本的元素
const allElements = document.querySelectorAll('*');
for (const el of allElements) {{
if (el.children.length === 0 && el.textContent.trim() === {json.dumps(tab_name)}) {{
const rect = el.getBoundingClientRect();
const style = window.getComputedStyle(el);
if (rect.width === 0 || rect.height === 0) continue;
if (rect.left < 0 || rect.top < 0) continue;
if (style.display === 'none' || style.visibility === 'hidden') continue;
el.click();
return 'clicked';
}}
}}
return 'not_found';
}})()
"""
... ... @@ -172,6 +192,19 @@ def _click_publish_tab(page: Page, tab_name: str) -> None:
time.sleep(0.2)
# 调试:输出页面信息
debug_info = page.evaluate("""
(() => {
const creatorTabs = document.querySelectorAll('div.creator-tab');
const tabTexts = Array.from(creatorTabs).map(t => ({
text: t.textContent.trim(),
html: t.outerHTML.substring(0, 200)
}));
const url = window.location.href;
return JSON.stringify({url, tabCount: creatorTabs.length, tabs: tabTexts});
})()
""")
logger.error("调试信息: %s", debug_info)
raise PublishError(f"没有找到发布 TAB - {tab_name}")
... ... @@ -223,6 +256,34 @@ def _wait_for_upload_complete(page: Page, expected_count: int) -> None:
# ========== 表单提交 ==========
def _extract_hashtags_from_content(content: str, tags: list[str]) -> tuple[str, list[str]]:
"""从正文末尾提取 hashtag 行,合并到 tags 列表。
Returns:
(cleaned_content, merged_tags)
"""
lines = content.rstrip().split("\n")
# 检查最后一行是否全是 #tag 格式
if lines:
last_line = lines[-1].strip()
hashtag_pattern = re.compile(r"^(#\S+\s*)+$")
if hashtag_pattern.match(last_line):
# 提取 hashtag
extracted = re.findall(r"#(\S+)", last_line)
# 合并到 tags(去重)
existing = {t.lstrip("#") for t in tags}
merged = list(tags)
for t in extracted:
if t not in existing:
merged.append(t)
existing.add(t)
# 去掉最后一行
cleaned = "\n".join(lines[:-1]).rstrip()
logger.info("从正文末尾提取 %d 个标签,合并后共 %d 个", len(extracted), len(merged))
return cleaned, merged
return content, list(tags)
def _fill_publish_form(
page: Page,
title: str,
... ... @@ -233,6 +294,9 @@ def _fill_publish_form(
visibility: str,
) -> None:
"""填写表单(不点击发布)。"""
# 从正文末尾提取 hashtag 并合并到 tags
content, tags = _extract_hashtags_from_content(content, tags)
# 标题
page.input_text(TITLE_INPUT, title)
time.sleep(0.5)
... ... @@ -334,6 +398,10 @@ def _input_tags(page: Page, content_selector: str, tags: list[str]) -> None:
"""输入标签。"""
time.sleep(1)
# 先点击正文编辑器,确保焦点在正文而非标题
page.click_element(content_selector)
time.sleep(0.3)
# 移动光标到正文末尾(20次 ArrowDown)
for _ in range(20):
page.press_key("ArrowDown")
... ... @@ -353,27 +421,32 @@ def _input_single_tag(page: Page, content_selector: str, tag: str) -> None:
"""输入单个标签。"""
# 输入 #
page.type_text("#", delay_ms=0)
time.sleep(0.2)
time.sleep(0.3)
# 逐字输入标签
# 逐字输入标签(随机间隔模拟真实输入)
for char in tag:
page.type_text(char, delay_ms=50)
time.sleep(1)
page.type_text(char, delay_ms=0)
time.sleep(random.uniform(0.05, 0.12))
# 尝试点击标签联想
# 等待标签联想出现(最多 3 秒)
deadline = time.monotonic() + 3.0
clicked = False
while time.monotonic() < deadline:
time.sleep(0.5)
if page.has_element(TAG_TOPIC_CONTAINER):
item_selector = f"{TAG_TOPIC_CONTAINER} {TAG_FIRST_ITEM}"
if page.has_element(item_selector):
page.click_element(item_selector)
logger.info("点击标签联想: %s", tag)
time.sleep(0.5)
return
clicked = True
break
if not clicked:
# 没有联想,直接空格
logger.warning("未找到标签联想,直接输入空格: %s", tag)
page.type_text(" ", delay_ms=0)
time.sleep(0.5)
time.sleep(0.8)
# ========== 定时发布 ==========
... ...