publish_pipeline.py
4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""发布编排器:下载 → 登录检查 → 发布 → 报告。"""
from __future__ import annotations
import json
import logging
import sys
from image_downloader import process_images
from title_utils import calc_title_length
from xhs.cdp import Browser
from xhs.login import check_login_status
from xhs.publish import publish_image_content
from xhs.publish_video import publish_video_content
from xhs.types import PublishImageContent, PublishVideoContent
logger = logging.getLogger(__name__)
def run_publish_pipeline(
    title: str,
    content: str,
    images: list[str] | None = None,
    video: str | None = None,
    tags: list[str] | None = None,
    schedule_time: str | None = None,
    is_original: bool = False,
    visibility: str = "",
    host: str = "127.0.0.1",
    port: int = 9222,
    account: str = "",
) -> dict:
    """Run the full publish pipeline and report the outcome as a dict.

    Steps, in order:
      1. Reject the title when ``calc_title_length`` reports more than 20.
      2. Run ``images`` (when given) through ``process_images``; abort if
         nothing usable comes back.
      3. Connect to the browser at ``host``/``port``, open a page and check
         the login status.
      4. Publish a video note when ``video`` is truthy, otherwise an image
         note.

    Once opened, the page and the browser connection are closed on every
    exit path, including early returns and exceptions.

    Args:
        title: Note title.
        content: Note body text.
        images: Image paths or URLs handed to ``process_images``.
        video: Video path; when truthy the video publish path is taken.
        tags: Tags for the note; ``None`` is treated as an empty list.
        schedule_time: Scheduled publish time, forwarded unchanged.
        is_original: Originality flag, forwarded only for image notes.
        visibility: Visibility setting, forwarded unchanged.
        host: Host passed to ``Browser``.
        port: Port passed to ``Browser``.
        account: Accepted for interface compatibility; not used here.

    Returns:
        On failure ``{"success": False, "error": ...}`` (plus
        ``"exit_code": 1`` when not logged in); on success a summary dict
        with ``success``, ``title``, ``content_length``, ``images``,
        ``video`` and ``status``.
    """
    # Validate the title length before doing any expensive work.
    measured = calc_title_length(title)
    if measured > 20:
        return {"success": False, "error": f"标题长度超限: {measured}/20"}

    # Prepare images (the helper downloads URLs / validates local paths).
    prepared_images: list[str] = process_images(images) if images else []
    if images and not prepared_images:
        return {"success": False, "error": "没有有效的图片"}

    tag_list = tags or []

    # Connect to the browser; both cleanup steps live in ``finally`` blocks.
    browser = Browser(host=host, port=port)
    browser.connect()
    try:
        page = browser.new_page()
        try:
            # Publishing requires a logged-in session.
            if not check_login_status(page):
                return {"success": False, "error": "未登录", "exit_code": 1}

            if not video:
                image_note = PublishImageContent(
                    title=title,
                    content=content,
                    tags=tag_list,
                    image_paths=prepared_images,
                    schedule_time=schedule_time,
                    is_original=is_original,
                    visibility=visibility,
                )
                publish_image_content(page, image_note)
            else:
                video_note = PublishVideoContent(
                    title=title,
                    content=content,
                    tags=tag_list,
                    video_path=video,
                    schedule_time=schedule_time,
                    visibility=visibility,
                )
                publish_video_content(page, video_note)

            summary = {
                "success": True,
                "title": title,
                "content_length": len(content),
                "images": len(prepared_images),
                "video": video or "",
                "status": "发布完成",
            }
            return summary
        finally:
            browser.close_page(page)
    finally:
        browser.close()
def main() -> None:
    """CLI entry point (used when invoked by cli.py's publish/publish-video subcommands).

    Parses the command line, reads the title and body from the files named
    by ``--title-file`` and ``--content-file`` (UTF-8, surrounding whitespace
    stripped), runs ``run_publish_pipeline`` and prints its result to stdout
    as indented JSON.

    Exit status:
        0 when the result reports success. On failure, the result's
        ``"exit_code"`` when the pipeline supplies one (it reports 1 for
        "not logged in"), otherwise 2.
    """
    import argparse

    parser = argparse.ArgumentParser(description="小红书发布流水线")
    parser.add_argument("--title-file", required=True, help="标题文件路径")
    parser.add_argument("--content-file", required=True, help="正文文件路径")
    parser.add_argument("--images", nargs="*", help="图片路径或 URL 列表")
    parser.add_argument("--video", help="视频文件路径")
    parser.add_argument("--tags", nargs="*", help="标签列表")
    parser.add_argument("--schedule-at", help="定时发布时间 (ISO8601)")
    parser.add_argument("--original", action="store_true", help="声明原创")
    parser.add_argument("--visibility", default="", help="可见范围")
    parser.add_argument("--host", default="127.0.0.1")
    parser.add_argument("--port", type=int, default=9222)
    parser.add_argument("--account", default="")
    args = parser.parse_args()

    # Read the title and the body text from their files.
    with open(args.title_file, encoding="utf-8") as f:
        title = f.read().strip()
    with open(args.content_file, encoding="utf-8") as f:
        content = f.read().strip()

    result = run_publish_pipeline(
        title=title,
        content=content,
        images=args.images,
        video=args.video,
        tags=args.tags,
        schedule_time=args.schedule_at,
        is_original=args.original,
        visibility=args.visibility,
        host=args.host,
        port=args.port,
        account=args.account,
    )
    print(json.dumps(result, ensure_ascii=False, indent=2))

    if result["success"]:
        sys.exit(0)
    # Honor the pipeline's explicit exit code (e.g. 1 for "not logged in")
    # instead of collapsing every failure into the generic code 2.
    sys.exit(result.get("exit_code", 2))
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()