马一丁

Race condition in singleton ChartReviewService with concurrent renderers

... ... @@ -294,16 +294,17 @@ class HTMLRenderer:
# 使用统一的 ChartReviewService 进行图表审查与修复
# 修复结果会直接回写到 document_ir,避免多次渲染重复修复
# review_document 返回本次会话的统计信息(线程安全)
chart_service = get_chart_review_service()
chart_service.review_document(
review_stats = chart_service.review_document(
self.document,
ir_file_path=ir_file_path,
reset_stats=True,
save_on_repair=bool(ir_file_path)
)
# 同步统计信息到本地(用于兼容旧的 _log_chart_validation_stats)
service_stats = chart_service.stats
self.chart_validation_stats.update(service_stats)
# 使用返回的 ReviewStats 对象,而非共享的 chart_service.stats
self.chart_validation_stats.update(review_stats.to_dict())
self.widget_scripts = []
self.chart_counter = 0
... ...
... ... @@ -40,8 +40,9 @@ class MarkdownRenderer:
# 使用统一的 ChartReviewService 进行图表审查与修复
# 虽然 Markdown 渲染时图表会降级为表格,但仍需确保数据有效
# review_document 返回本次会话的统计信息(线程安全,此处不使用)
chart_service = get_chart_review_service()
chart_service.review_document(
_ = chart_service.review_document(
self.document,
ir_file_path=ir_file_path,
reset_stats=True,
... ...
... ... @@ -173,22 +173,22 @@ class PDFRenderer:
Dict[str, Any]: 修复后的Document IR(深拷贝)
"""
# 使用统一的 ChartReviewService
# review_document 返回本次会话的统计信息(线程安全)
chart_service = get_chart_review_service()
chart_service.review_document(
review_stats = chart_service.review_document(
document_ir,
ir_file_path=ir_file_path,
reset_stats=True,
save_on_repair=bool(ir_file_path)
)
stats = chart_service.stats
if stats.get('total', 0) > 0:
repaired_count = stats.get('repaired_locally', 0) + stats.get('repaired_api', 0)
# 使用返回的 ReviewStats 对象,而非共享的 chart_service.stats
if review_stats.total > 0:
logger.info(
f"PDF图表预处理完成: "
f"总计 {stats.get('total', 0)} 个图表, "
f"修复 {repaired_count} 个, "
f"失败 {stats.get('failed', 0)} 个"
f"总计 {review_stats.total} 个图表, "
f"修复 {review_stats.repaired_total} 个, "
f"失败 {review_stats.failed} 个"
)
# 返回深拷贝,避免后续 SVG 转换过程影响回写后的原始 IR
... ...
... ... @@ -6,6 +6,7 @@ Report Engine工具模块。
from ReportEngine.utils.chart_review_service import (
ChartReviewService,
ReviewStats,
get_chart_review_service,
review_document_charts,
)
... ... @@ -21,6 +22,7 @@ from ReportEngine.utils.table_validator import (
__all__ = [
"ChartReviewService",
"ReviewStats",
"get_chart_review_service",
"review_document_charts",
"TableValidator",
... ...
... ... @@ -3,6 +3,11 @@
提供单例服务,确保所有渲染器共享修复状态,避免重复修复。
修复成功后可自动持久化到 IR 文件。
线程安全说明:
- 验证器和修复器实例是无状态的,可安全共享
- 每次 review_document 调用会创建独立的 ReviewSession
- 统计信息通过 ReviewSession 返回,避免并发竞争
"""
from __future__ import annotations
... ... @@ -10,6 +15,7 @@ from __future__ import annotations
import copy
import json
import threading
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional
... ... @@ -25,6 +31,36 @@ from ReportEngine.utils.chart_validator import (
from ReportEngine.utils.chart_repair_api import create_llm_repair_functions
@dataclass
class ReviewStats:
"""
图表审查统计信息 - 每次审查会话独立的统计数据。
通过为每次 review_document 调用创建独立的 ReviewStats 实例,
避免多线程并发时的统计数据竞争问题。
"""
total: int = 0
valid: int = 0
repaired_locally: int = 0
repaired_api: int = 0
failed: int = 0
def to_dict(self) -> Dict[str, int]:
"""转换为字典格式"""
return {
'total': self.total,
'valid': self.valid,
'repaired_locally': self.repaired_locally,
'repaired_api': self.repaired_api,
'failed': self.failed
}
@property
def repaired_total(self) -> int:
"""修复总数"""
return self.repaired_locally + self.repaired_api
class ChartReviewService:
"""
图表审查服务 - 单例模式。
... ... @@ -33,7 +69,12 @@ class ChartReviewService:
1. 统一管理图表验证和修复
2. 维护修复缓存,避免重复修复
3. 支持修复后自动持久化到 IR 文件
4. 提供统计信息
4. 提供统计信息(通过 ReviewStats 返回,线程安全)
线程安全说明:
- validator 和 repairer 是无状态的,可安全共享
- 每次 review_document 调用创建独立的 ReviewStats
- 不再使用全局 _stats,避免并发竞争
"""
_instance: Optional["ChartReviewService"] = None
... ... @@ -55,7 +96,7 @@ class ChartReviewService:
self._initialized = True
# 初始化验证器和修复器
# 初始化验证器和修复器(无状态,可安全共享)
self.validator = create_chart_validator()
self.llm_repair_fns = create_llm_repair_functions()
self.repairer = create_chart_repairer(
... ... @@ -69,31 +110,44 @@ class ChartReviewService:
else:
logger.info(f"ChartReviewService: 已配置 {len(self.llm_repair_fns)} 个 LLM 修复函数")
# 统计信息
self._stats = {
'total': 0,
'valid': 0,
'repaired_locally': 0,
'repaired_api': 0,
'failed': 0
}
# 最后一次审查的统计信息(仅用于向后兼容,不推荐在并发场景使用)
# 新代码应使用 review_document 返回的 ReviewStats
self._last_stats: Optional[ReviewStats] = None
self._last_stats_lock = threading.Lock()
logger.info("ChartReviewService 初始化完成")
def reset_stats(self) -> None:
"""重置统计信息"""
self._stats = {
'total': 0,
'valid': 0,
'repaired_locally': 0,
'repaired_api': 0,
'failed': 0
}
"""
重置统计信息(向后兼容,不推荐使用)。
注意:此方法仅用于向后兼容。在并发场景下,
应使用 review_document 返回的 ReviewStats 对象。
"""
with self._last_stats_lock:
self._last_stats = None
@property
def stats(self) -> Dict[str, int]:
"""获取统计信息副本"""
return self._stats.copy()
"""
获取最后一次审查的统计信息副本(向后兼容)。
警告:在并发场景下,此属性可能返回其他线程的统计结果。
推荐使用 review_document 返回的 ReviewStats 对象。
返回:
Dict[str, int]: 统计信息字典副本
"""
with self._last_stats_lock:
if self._last_stats is None:
return {
'total': 0,
'valid': 0,
'repaired_locally': 0,
'repaired_api': 0,
'failed': 0
}
return self._last_stats.to_dict()
def review_document(
self,
... ... @@ -102,28 +156,33 @@ class ChartReviewService:
*,
reset_stats: bool = True,
save_on_repair: bool = True
) -> Dict[str, Any]:
) -> ReviewStats:
"""
审查并修复文档中的所有图表。
遍历所有章节的 blocks,检测图表类型的 widget,
对未审查过的图表进行验证和修复。
线程安全:每次调用创建独立的 ReviewStats,避免并发竞争。
参数:
document_ir: Document IR 数据
ir_file_path: IR 文件路径,如果提供且有修复,会自动保存
reset_stats: 是否重置统计信息
reset_stats: 保留参数以保持向后兼容,不再有实际作用
save_on_repair: 修复后是否自动保存到文件
返回:
Dict[str, Any]: 审查后的 Document IR(原对象,已修改
ReviewStats: 本次审查的统计信息(线程安全
"""
if reset_stats:
self.reset_stats()
# 每次调用创建独立的统计对象,避免并发竞争
session_stats = ReviewStats()
if not document_ir:
logger.warning("ChartReviewService: document_ir 为空,跳过审查")
return document_ir
# 更新 _last_stats 以保持向后兼容
with self._last_stats_lock:
self._last_stats = session_stats
return session_stats
has_repairs = False
... ... @@ -133,27 +192,37 @@ class ChartReviewService:
continue
blocks = chapter.get("blocks", [])
if isinstance(blocks, list):
chapter_repairs = self._walk_and_review_blocks(blocks, chapter)
chapter_repairs = self._walk_and_review_blocks(blocks, chapter, session_stats)
if chapter_repairs:
has_repairs = True
# 输出统计信息
self._log_stats()
self._log_stats(session_stats)
# 更新 _last_stats 以保持向后兼容
with self._last_stats_lock:
self._last_stats = session_stats
# 如果有修复且提供了文件路径,保存到文件
if has_repairs and ir_file_path and save_on_repair:
self._save_ir_to_file(document_ir, ir_file_path)
return document_ir
return session_stats
def _walk_and_review_blocks(
self,
blocks: List[Any],
chapter_context: Dict[str, Any] | None = None
chapter_context: Dict[str, Any] | None,
session_stats: ReviewStats
) -> bool:
"""
递归遍历 blocks 并审查图表。
参数:
blocks: 要遍历的 block 列表
chapter_context: 章节上下文
session_stats: 本次审查会话的统计对象
返回:
bool: 是否有修复发生
"""
... ... @@ -165,21 +234,21 @@ class ChartReviewService:
# 检查是否是图表 widget
if block.get("type") == "widget":
repaired = self._review_chart_block(block, chapter_context)
repaired = self._review_chart_block(block, chapter_context, session_stats)
if repaired:
has_repairs = True
# 递归处理嵌套的 blocks
nested_blocks = block.get("blocks")
if isinstance(nested_blocks, list):
if self._walk_and_review_blocks(nested_blocks, chapter_context):
if self._walk_and_review_blocks(nested_blocks, chapter_context, session_stats):
has_repairs = True
# 处理 list 类型的 items
if block.get("type") == "list":
for item in block.get("items", []):
if isinstance(item, list):
if self._walk_and_review_blocks(item, chapter_context):
if self._walk_and_review_blocks(item, chapter_context, session_stats):
has_repairs = True
# 处理 table 类型的 cells
... ... @@ -191,7 +260,7 @@ class ChartReviewService:
if isinstance(cell, dict):
cell_blocks = cell.get("blocks", [])
if isinstance(cell_blocks, list):
if self._walk_and_review_blocks(cell_blocks, chapter_context):
if self._walk_and_review_blocks(cell_blocks, chapter_context, session_stats):
has_repairs = True
return has_repairs
... ... @@ -199,11 +268,17 @@ class ChartReviewService:
def _review_chart_block(
self,
block: Dict[str, Any],
chapter_context: Dict[str, Any] | None = None
chapter_context: Dict[str, Any] | None,
session_stats: ReviewStats
) -> bool:
"""
审查单个图表 block。
参数:
block: 要审查的 block
chapter_context: 章节上下文
session_stats: 本次审查会话的统计对象
返回:
bool: 是否进行了修复
"""
... ... @@ -225,11 +300,11 @@ class ChartReviewService:
logger.debug(f"图表 {widget_id} 已审查过,跳过")
return False
self._stats['total'] += 1
session_stats.total += 1
# 词云直接标记为有效
if is_wordcloud:
self._stats['valid'] += 1
session_stats.valid += 1
block["_chart_reviewed"] = True
block["_chart_review_status"] = "valid"
block["_chart_review_method"] = "none"
... ... @@ -243,7 +318,7 @@ class ChartReviewService:
if validation_result.is_valid:
# 验证通过
self._stats['valid'] += 1
session_stats.valid += 1
block["_chart_reviewed"] = True
block["_chart_review_status"] = "valid"
block["_chart_review_method"] = "none"
... ... @@ -269,9 +344,9 @@ class ChartReviewService:
method = repair_result.method or "local"
if method == "local":
self._stats['repaired_locally'] += 1
session_stats.repaired_locally += 1
elif method == "api":
self._stats['repaired_api'] += 1
session_stats.repaired_api += 1
block["_chart_reviewed"] = True
block["_chart_review_status"] = "repaired"
... ... @@ -281,7 +356,7 @@ class ChartReviewService:
return True
# 修复失败
self._stats['failed'] += 1
session_stats.failed += 1
block["_chart_reviewed"] = True
block["_chart_renderable"] = False
block["_chart_review_status"] = "failed"
... ... @@ -412,19 +487,18 @@ class ChartReviewService:
return "验证失败但无具体错误信息"
return "; ".join(errors[:3])
def _log_stats(self) -> None:
def _log_stats(self, stats: ReviewStats) -> None:
"""输出统计信息"""
if self._stats['total'] == 0:
if stats.total == 0:
logger.debug("ChartReviewService: 没有图表需要审查")
return
repaired = self._stats['repaired_locally'] + self._stats['repaired_api']
logger.info(
f"ChartReviewService 图表审查完成: "
f"总计 {self._stats['total']} 个, "
f"有效 {self._stats['valid']} 个, "
f"修复 {repaired} 个 (本地 {self._stats['repaired_locally']}, API {self._stats['repaired_api']}), "
f"失败 {self._stats['failed']} 个"
f"总计 {stats.total} 个, "
f"有效 {stats.valid} 个, "
f"修复 {stats.repaired_total} 个 (本地 {stats.repaired_locally}, API {stats.repaired_api}), "
f"失败 {stats.failed} 个"
)
# 内部元数据键,不应保存到 IR 文件
... ... @@ -526,18 +600,18 @@ def review_document_charts(
*,
reset_stats: bool = True,
save_on_repair: bool = True
) -> Dict[str, Any]:
) -> ReviewStats:
"""
便捷函数:审查并修复文档中的所有图表。
参数:
document_ir: Document IR 数据
ir_file_path: IR 文件路径,如果提供且有修复,会自动保存
reset_stats: 是否重置统计信息
reset_stats: 保留参数以保持向后兼容,不再有实际作用
save_on_repair: 修复后是否自动保存到文件
返回:
Dict[str, Any]: 审查后的 Document IR
ReviewStats: 本次审查的统计信息
"""
service = get_chart_review_service()
return service.review_document(
... ... @@ -550,6 +624,7 @@ def review_document_charts(
__all__ = [
"ChartReviewService",
"ReviewStats",
"get_chart_review_service",
"review_document_charts",
]
... ...