export_excel.py 14.8 KB
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429
"""导出小红书搜索结果和评论到 Excel。"""

from __future__ import annotations

import logging
import time
from datetime import datetime
from pathlib import Path

from openpyxl import load_workbook
from openpyxl.styles import Alignment, Font, PatternFill

from .errors import NoFeedDetailError, PageNotAccessibleError, XHSError
from .feed_detail import get_feed_detail
from .search import search_feeds
from .types import Comment, Feed, FilterOption

logger = logging.getLogger(__name__)

# Sheet 名称
SHEET_FEEDS = "文章列表"
SHEET_COMMENTS = "评论列表"

# 评论查询状态
STATUS_PENDING = ""  # 未查询
STATUS_SUCCESS = "成功"
STATUS_FAILED = "失败"

# 标题行样式
HEADER_FONT = Font(bold=True, color="FFFFFF")
HEADER_FILL = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
HEADER_ALIGNMENT = Alignment(horizontal="center", vertical="center", wrap_text=True)


class ExportExcel:
    """导出小红书数据到 Excel。"""

    def __init__(self, file_path: str, keyword: str) -> None:
        self.file_path = Path(file_path)
        self.keyword = keyword
        self.wb = None
        self.feeds_sheet = None
        self.comments_sheet = None

        # 列定义(从 1 开始)
        self.feeds_headers = [
            "序号", "标题", "作者", "类型", "点赞数", "评论数", "收藏数",
            "评论查询状态", "文章id", "xsec_token", "链接"
        ]
        self.comments_headers = [
            "文章id", "评论内容", "评论时间", "点赞数", "用户", "IP属地", "回复数"
        ]

    def create(self) -> None:
        """创建新的 Excel 文件并初始化表头。"""
        from openpyxl import Workbook

        self.wb = Workbook()
        self.wb.remove(self.wb.active)  # 删除默认 sheet

        # 创建文章列表 sheet
        self.feeds_sheet = self.wb.create_sheet(SHEET_FEEDS)
        self._write_feeds_headers()

        # 创建评论列表 sheet
        self.comments_sheet = self.wb.create_sheet(SHEET_COMMENTS)
        self._write_comments_headers()

        self.wb.save(self.file_path)
        logger.info("创建 Excel 文件: %s", self.file_path)

    def load(self) -> bool:
        """加载已存在的 Excel 文件。"""
        if not self.file_path.exists():
            return False

        self.wb = load_workbook(self.file_path)
        self.feeds_sheet = self.wb[SHEET_FEEDS]
        self.comments_sheet = self.wb[SHEET_COMMENTS]
        logger.info("加载已存在的 Excel 文件: %s", self.file_path)
        return True

    def _write_feeds_headers(self) -> None:
        """写入文章列表表头。"""
        for col, header in enumerate(self.feeds_headers, 1):
            cell = self.feeds_sheet.cell(row=1, column=col, value=header)
            cell.font = HEADER_FONT
            cell.fill = HEADER_FILL
            cell.alignment = HEADER_ALIGNMENT

        self.feeds_sheet.freeze_panes = "A2"

        # 设置列宽 (A=序号, B=标题, C=作者, D=类型, E=点赞数, F=评论数, G=收藏数, H=状态, I=文章id, J=xsec_token, K=链接)
        col_widths = {
            "A": 6,    # 序号
            "B": 40,   # 标题
            "C": 12,   # 作者
            "D": 6,    # 类型
            "E": 8,    # 点赞数
            "F": 8,    # 评论数
            "G": 8,    # 收藏数
            "H": 12,   # 评论查询状态
            "I": 24,   # 文章id
            "J": 60,   # xsec_token
            "K": 50,   # 链接
        }
        for col_letter, width in col_widths.items():
            self.feeds_sheet.column_dimensions[col_letter].width = width

        # 设置行高
        self.feeds_sheet.row_dimensions[1].height = 25

    def _write_comments_headers(self) -> None:
        """写入评论列表表头。"""
        for col, header in enumerate(self.comments_headers, 1):
            cell = self.comments_sheet.cell(row=1, column=col, value=header)
            cell.font = HEADER_FONT
            cell.fill = HEADER_FILL
            cell.alignment = HEADER_ALIGNMENT

        self.comments_sheet.freeze_panes = "A2"

        # 设置列宽 (A=文章id, B=评论内容, C=评论时间, D=点赞数, E=用户, F=IP属地, G=回复数)
        col_widths = {
            "A": 24,   # 文章id
            "B": 60,   # 评论内容
            "C": 18,   # 评论时间
            "D": 8,    # 点赞数
            "E": 12,   # 用户
            "F": 12,   # IP属地
            "G": 8,    # 回复数
        }
        for col_letter, width in col_widths.items():
            self.comments_sheet.column_dimensions[col_letter].width = width

        # 设置行高
        self.comments_sheet.row_dimensions[1].height = 25

    def _get_feeds_next_row(self) -> int:
        """获取文章列表下一个空行号。"""
        return self.feeds_sheet.max_row + 1

    def _get_comments_next_row(self) -> int:
        """获取评论列表下一个空行号。"""
        return self.comments_sheet.max_row + 1

    def append_feed(self, feed: Feed, row: int | None = None) -> int:
        """追加文章数据到 Excel。

        Returns:
            实际写入的行号
        """
        if row is None:
            row = self._get_feeds_next_row()

        interact = feed.note_card.interact_info
        user = feed.note_card.user
        title = feed.note_card.display_title

        data = [
            row - 1,  # 序号(从 1 开始,但Excel行从2开始)
            title,
            user.nickname or user.nick_name,
            "视频" if feed.note_card.type == "video" else "图文",
            interact.liked_count,
            interact.comment_count,
            interact.collected_count,
            STATUS_PENDING,  # 评论查询状态
            feed.id,
            feed.xsec_token,
            f"https://www.xiaohongshu.com/explore/{feed.id}?xsec_token={feed.xsec_token}&xsec_source=pc_feed",
        ]

        for col, value in enumerate(data, 1):
            cell = self.feeds_sheet.cell(row=row, column=col, value=value)
            if col == 2:  # 标题列
                cell.alignment = Alignment(wrap_text=True)
            elif col == 11:  # 链接列
                cell.alignment = Alignment(wrap_text=True)

        # 设置数据行行高
        self.feeds_sheet.row_dimensions[row].height = 18

        self.wb.save(self.file_path)
        return row

    def append_comment(self, feed_id: str, comment: Comment) -> int:
        """追加评论数据到 Excel。

        Args:
            feed_id: 文章的 feed_id,用于跳转链接
        Returns:
            实际写入的行号
        """
        row = self._get_comments_next_row()

        # 格式化评论时间
        comment_time = ""
        if comment.create_time:
            try:
                dt = datetime.fromtimestamp(comment.create_time)
                comment_time = dt.strftime("%Y-%m-%d %H:%M:%S")
            except (ValueError, OSError):
                comment_time = str(comment.create_time)

        data = [
            feed_id,
            comment.content,
            comment_time,
            comment.like_count,
            comment.user_info.nickname or comment.user_info.nick_name,
            comment.ip_location,
            comment.sub_comment_count,
        ]

        for col, value in enumerate(data, 1):
            cell = self.comments_sheet.cell(row=row, column=col, value=value)
            if col == 2:  # 评论内容列
                cell.alignment = Alignment(wrap_text=True)
            elif col == 1:  # feed_id 列,添加超链接跳转到文章 sheet
                # 查找 feed_id 对应的行号
                target_row = self._find_feed_row(feed_id)
                if target_row:
                    cell.hyperlink = f"#'{SHEET_FEEDS}'!A{target_row}"
                    cell.font = Font(color="0563C1", underline="single")

        # 设置数据行行高
        self.comments_sheet.row_dimensions[row].height = 18

        self.wb.save(self.file_path)
        return row

    def _find_feed_row(self, feed_id: str) -> int | None:
        """根据 feed_id 查找对应的行号。"""
        for row in range(2, self.feeds_sheet.max_row + 1):
            cell_feed_id = self.feeds_sheet.cell(row=row, column=9).value  # feed_id 列
            if cell_feed_id == feed_id:
                return row
        return None

    def update_feed_status(self, row: int, status: str) -> None:
        """更新文章的评论查询状态。"""
        status_col = 8  # 评论查询状态列
        self.feeds_sheet.cell(row=row, column=status_col, value=status)

        # 设置状态颜色
        cell = self.feeds_sheet.cell(row=row, column=status_col)
        if status == STATUS_SUCCESS:
            cell.fill = PatternFill(start_color="C6EFCE", end_color="C6EFCE", fill_type="solid")
            cell.font = Font(color="006100")
        elif status == STATUS_FAILED:
            cell.fill = PatternFill(start_color="FFC7CE", end_color="FFC7CE", fill_type="solid")
            cell.font = Font(color="9C0006")

        self.wb.save(self.file_path)

    def get_pending_feeds(self) -> list[dict]:
        """获取所有待查询评论的文章。"""
        pending = []
        for row in range(2, self.feeds_sheet.max_row + 1):
            status = self.feeds_sheet.cell(row=row, column=8).value
            if status != STATUS_SUCCESS:
                pending.append({
                    "row": row,
                    "feed_id": self.feeds_sheet.cell(row=row, column=9).value,
                    "xsec_token": self.feeds_sheet.cell(row=row, column=10).value,
                    "title": self.feeds_sheet.cell(row=row, column=2).value,
                })
        return pending

    def get_saved_feeds_count(self) -> int:
        """获取已保存的文章数量。"""
        return max(0, self.feeds_sheet.max_row - 1)

    def save(self) -> None:
        """保存 Excel 文件。"""
        self.wb.save(self.file_path)


def search_and_export(
    page,
    keyword: str,
    output_file: str,
    filter_option: FilterOption | None = None,
    limit: int = 0,
) -> None:
    """搜索文章并导出到 Excel。

    Args:
        page: CDP 页面对象
        keyword: 搜索关键词
        output_file: 输出 Excel 文件路径
        filter_option: 筛选选项
        limit: 限制搜索文章数量,0 表示不限制
    """
    export = ExportExcel(output_file, keyword)

    if export.load():
        logger.info("文件已存在,将追加新数据")
    else:
        export.create()

    # 搜索文章
    logger.info("开始搜索关键词: %s", keyword)
    feeds = search_feeds(page, keyword, filter_option)
    logger.info("搜索到 %d 篇文章", len(feeds))

    # 限制数量
    if limit > 0 and len(feeds) > limit:
        feeds = feeds[:limit]
        logger.info("限制为前 %d 篇", limit)

    start_row = export._get_feeds_next_row() if export.load() else 2

    # 追加到 Excel
    for i, feed in enumerate(feeds):
        row = start_row + i
        export.append_feed(feed, row)
        logger.info("已保存文章 [%d/%d]: %s", i + 1, len(feeds), feed.note_card.display_title[:30])

    logger.info("文章搜索完成,已保存到: %s", output_file)


def poll_comments(
    page,
    excel_file: str,
    feed_index: int | None = None,
    delay: float = 1.0,
) -> dict:
    """轮询查询评论并保存到 Excel。

    Args:
        page: CDP 页面对象
        excel_file: Excel 文件路径
        feed_index: 指定文章序号(从 1 开始),None 表示查询所有未完成的
        delay: 请求间隔(秒)

    Returns:
        统计信息 {"success": count, "failed": count}
    """
    export = ExportExcel(excel_file, "")

    if not export.load():
        raise FileNotFoundError(f"Excel 文件不存在: {excel_file}")

    pending_feeds = export.get_pending_feeds()

    if feed_index is not None:
        # 只查询指定文章
        feed_index_0 = feed_index - 1  # 转为 0-based
        pending_feeds = [
            f for f in pending_feeds
            if f["row"] - 2 == feed_index_0  # row 2 = index 0
        ]
        if not pending_feeds:
            # 检查是否已完成
            row = feed_index + 1
            status = export.feeds_sheet.cell(row=row, column=8).value
            if status == STATUS_SUCCESS:
                logger.info("文章 %d 的评论已查询完成", feed_index)
                return {"success": 0, "failed": 0, "skipped": 1}
            raise ValueError(f"文章序号 {feed_index} 不存在")

    if not pending_feeds:
        logger.info("所有文章的评论都已查询完成")
        return {"success": 0, "failed": 0, "skipped": 0}

    logger.info("开始查询 %d 篇文章的评论", len(pending_feeds))

    stats = {"success": 0, "failed": 0}

    for i, feed_info in enumerate(pending_feeds):
        row = feed_info["row"]
        feed_id = feed_info["feed_id"]
        xsec_token = feed_info["xsec_token"]
        title = feed_info["title"]

        logger.info("[%d/%d] 查询文章评论: %s (row=%d)",
                    i + 1, len(pending_feeds), str(title)[:30], row)

        try:
            # 获取文章详情(含评论)
            detail = get_feed_detail(page, feed_id, xsec_token, load_all_comments=True)

            # 保存评论
            for comment in detail.comments.list_:
                export.append_comment(feed_id=feed_id, comment=comment)
                _append_sub_comments(export, feed_id, comment.sub_comments)

            # 更新状态为成功
            export.update_feed_status(row, STATUS_SUCCESS)
            stats["success"] += 1
            logger.info("文章评论查询成功: %s", str(title)[:30])

        except PageNotAccessibleError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("文章无法访问 [%s]: %s", title, e)
        except NoFeedDetailError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("获取详情失败 [%s]: %s", title, e)
        except XHSError as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.warning("查询失败 [%s]: %s", title, e)
        except Exception as e:
            export.update_feed_status(row, STATUS_FAILED)
            stats["failed"] += 1
            logger.error("未知错误 [%s]: %s", title, e)

        # 请求间隔
        if i < len(pending_feeds) - 1:
            time.sleep(delay)

    logger.info("评论查询完成: 成功=%d, 失败=%d", stats["success"], stats["failed"])
    return stats


def _append_sub_comments(
    export: ExportExcel,
    feed_id: str,
    sub_comments: list[Comment],
) -> None:
    """递归追加子评论。"""
    for comment in sub_comments:
        export.append_comment(feed_id, comment)
        if comment.sub_comments:
            _append_sub_comments(export, feed_id, comment.sub_comments)