Toggle navigation
Toggle navigation
This project
Loading...
Sign in
万朱浩
/
Venue-Ops
Go to a project
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
马一丁
2025-12-16 11:42:05 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
6d98e9359c883b657edf6c3a122fb5ea63aabc53
6d98e935
1 parent
a371cdf7
Improve chart repair logic
Show whitespace changes
Inline
Side-by-side
Showing
6 changed files
with
1580 additions
and
0 deletions
ReportEngine/nodes/chapter_generation_node.py
ReportEngine/renderers/markdown_renderer.py
ReportEngine/scripts/validate_ir.py
ReportEngine/utils/__init__.py
ReportEngine/utils/chart_repair_api.py
ReportEngine/utils/table_validator.py
ReportEngine/nodes/chapter_generation_node.py
View file @
6d98e93
...
...
@@ -1270,18 +1270,73 @@ class ChapterGenerationNode(BaseNode):
normalized_cells
:
List
[
Dict
[
str
,
Any
]]
=
[]
for
cell
in
cell_entries
:
#
检测错误嵌套的
cells
结构:有
cells
但没有
blocks
#
需要展平成多个独立的
cells
if
isinstance
(
cell
,
dict
)
and
"
cells
"
in
cell
and
"
blocks
"
not
in
cell
:
flattened
=
self._flatten_all_nested_cells
(
cell
)
normalized_cells.extend
(
flattened
)
else
:
sanitized
=
self._normalize_table_cell
(
cell
)
if
sanitized
:
normalized_cells.append
(
sanitized
)
return
normalized_cells
def
_flatten_all_nested_cells
(
self
,
cell
:
Dict
[
str
,
Any
])
->
List
[
Dict
[
str
,
Any
]]:
"""
展平错误嵌套的
cells
结构,返回所有展平后的
cells
。
LLM
有时会生成类似这样的错误结构:
{
"
cells
":
[
{
"
blocks
":
[
...
]
},
{
"
cells
":
[
{
"
blocks
":
[
...
]
},
{
"
cells
":
[
...
]
}
]
}
]
}
应该展平为独立的
cells
列表。
"""
nested_cells
=
cell.get
("
cells
")
if
not
isinstance
(
nested_cells
,
list
)
or
not
nested_cells
:
return
[{"
blocks
":
[
self._as_paragraph_block
("")]}]
result
:
List
[
Dict
[
str
,
Any
]]
=
[]
for
nested
in
nested_cells
:
if
isinstance
(
nested
,
dict
):
if
"
blocks
"
in
nested
and
"
cells
"
not
in
nested
:
#
正常的
cell
,直接规范化添加
sanitized
=
self._normalize_table_cell
(
nested
)
if
sanitized
:
result.append
(
sanitized
)
elif
"
cells
"
in
nested
and
"
blocks
"
not
in
nested
:
#
继续递归展平嵌套的
cells
result.extend
(
self._flatten_all_nested_cells
(
nested
))
else
:
#
其他情况,尝试规范化
sanitized
=
self._normalize_table_cell
(
nested
)
if
sanitized
:
result.append
(
sanitized
)
elif
isinstance
(
nested
,
(
str
,
int
,
float
)):
result.append
({"
blocks
":
[
self._as_paragraph_block
(
str
(
nested
))]})
return
result
if
result
else
[{"
blocks
":
[
self._as_paragraph_block
("")]}]
def
_normalize_table_cell
(
self
,
cell
:
Any
)
->
Dict
[
str
,
Any
]
|
None
:
"""把各种单元格写法规整为
schema
认可的形式"""
if
cell
is
None
:
return
{"
blocks
":
[
self._as_paragraph_block
("")]}
if
isinstance
(
cell
,
dict
):
#
检测错误嵌套的
cells
结构:有
cells
但没有
blocks
#
这是
LLM
常见的错误,把同级
cell
嵌套进了
cells
数组
if
"
cells
"
in
cell
and
"
blocks
"
not
in
cell
:
#
展平嵌套的
cells
并返回第一个有效
cell
#
注意:其余嵌套的
cells
会在
_normalize_table_cells
中被处理
return
self._flatten_nested_cell
(
cell
)
normalized
=
dict
(
cell
)
blocks
=
self._coerce_cell_blocks
(
normalized.get
("
blocks
"),
normalized
)
elif
isinstance
(
cell
,
list
):
...
...
@@ -1297,6 +1352,40 @@ class ChapterGenerationNode(BaseNode):
normalized
["
blocks
"]
=
blocks
or
[
self._as_paragraph_block
("")]
return
normalized
def
_flatten_nested_cell
(
self
,
cell
:
Dict
[
str
,
Any
])
->
Dict
[
str
,
Any
]:
"""
展平错误嵌套的
cell
结构。
LLM
有时会生成类似这样的错误结构:
{
"
cells
":
[
{
"
blocks
":
[
...
]
},
{
"
cells
":
[
...
]
}
]
}
应该返回第一个有效的
cell
内容。
"""
nested_cells
=
cell.get
("
cells
")
if
not
isinstance
(
nested_cells
,
list
)
or
not
nested_cells
:
#
没有有效的嵌套内容,返回空
cell
return
{"
blocks
":
[
self._as_paragraph_block
("")]}
#
递归查找第一个包含
blocks
的有效
cell
for
nested
in
nested_cells
:
if
isinstance
(
nested
,
dict
):
if
"
blocks
"
in
nested
:
#
找到有效
cell
,递归规范化
return
self._normalize_table_cell
(
nested
)
elif
"
cells
"
in
nested
:
#
继续递归展平
result
=
self._flatten_nested_cell
(
nested
)
if
result
:
return
result
#
没有找到有效内容,尝试从第一个嵌套元素提取文本
first_nested
=
nested_cells
[0]
if
isinstance
(
first_nested
,
dict
):
text
=
self._extract_block_text
(
first_nested
)
return
{"
blocks
":
[
self._as_paragraph_block
(
text
or
"")]}
return
{"
blocks
":
[
self._as_paragraph_block
("")]}
def
_coerce_cell_blocks
(
self
,
blocks
:
Any
,
source
:
Dict
[
str
,
Any
]
|
None
)
->
List
[
Dict
[
str
,
Any
]]:
...
...
ReportEngine/renderers/markdown_renderer.py
View file @
6d98e93
...
...
@@ -605,6 +605,8 @@ class MarkdownRenderer:
elif
isinstance
(
data_field
,
dict
):
if
isinstance
(
data_field
.
get
(
"items"
),
list
):
candidates
.
append
(
data_field
.
get
(
"items"
))
if
isinstance
(
data_field
.
get
(
"words"
),
list
):
candidates
.
append
(
data_field
.
get
(
"words"
))
items
:
List
[
Dict
[
str
,
Any
]]
=
[]
seen
:
set
[
str
]
=
set
()
...
...
ReportEngine/scripts/validate_ir.py
0 → 100644
View file @
6d98e93
#!/usr/bin/env python3
"""
IR 文档验证工具。
命令行工具,用于:
- 扫描指定 JSON 文件中的所有图表和表格
- 报告结构问题和数据缺失
- 支持自动修复常见问题
- 支持批量处理
使用方法:
python -m ReportEngine.scripts.validate_ir chapter-030-section-3-0.json
python -m ReportEngine.scripts.validate_ir *.json --fix
python -m ReportEngine.scripts.validate_ir ./output/ --recursive --fix --verbose
"""
from
__future__
import
annotations
import
argparse
import
json
import
sys
from
pathlib
import
Path
from
typing
import
Any
,
Dict
,
List
,
Optional
,
Tuple
from
dataclasses
import
dataclass
,
field
# 添加项目根目录到路径
project_root
=
Path
(
__file__
)
.
parent
.
parent
.
parent
if
str
(
project_root
)
not
in
sys
.
path
:
sys
.
path
.
insert
(
0
,
str
(
project_root
))
from
loguru
import
logger
from
ReportEngine.utils.chart_validator
import
(
ChartValidator
,
ChartRepairer
,
ValidationResult
,
)
from
ReportEngine.utils.table_validator
import
(
TableValidator
,
TableRepairer
,
TableValidationResult
,
)
@dataclass
class BlockIssue:
    """Problems recorded for a single block (or for the document as a whole)."""
    # Kind of block the issue belongs to; this file uses "chart", "table",
    # "wordcloud" and "document".
    block_type: str
    # Identifier used in reports: widgetId, id, a positional "block-N"
    # fallback, or "root" for document-level failures.
    block_id: str
    # Location of the block inside the document, e.g. "chapters[0].blocks[3]".
    path: str
    # Error messages collected for the block.
    errors: List[str] = field(default_factory=list)
    # Warning messages collected for the block.
    warnings: List[str] = field(default_factory=list)
    # True when the issue is flagged as a candidate for automatic repair.
    is_fixable: bool = False
@dataclass
class DocumentReport:
    """Validation report for one IR document.

    Holds per-type block counters, the list of recorded issues and the
    number of blocks that were repaired.
    """

    file_path: str
    total_blocks: int = 0
    chart_count: int = 0
    table_count: int = 0
    wordcloud_count: int = 0
    issues: List[BlockIssue] = field(default_factory=list)
    fixed_count: int = 0

    @property
    def has_issues(self) -> bool:
        """True when at least one issue has been recorded."""
        return len(self.issues) != 0

    @property
    def error_count(self) -> int:
        """Total number of error messages across all issues."""
        total = 0
        for entry in self.issues:
            total += len(entry.errors)
        return total

    @property
    def warning_count(self) -> int:
        """Total number of warning messages across all issues."""
        total = 0
        for entry in self.issues:
            total += len(entry.warnings)
        return total
class IRValidator:
    """Validate and repair chart, table and word-cloud blocks of an IR document.

    The validator walks ``document["chapters"][*]["blocks"]`` recursively,
    including blocks nested in table cells and list items.  Chart and table
    checks are delegated to the injected validators/repairers; word-cloud
    checks are done locally.  Containers with an unexpected type (for example
    ``rows`` set to ``null``) are skipped instead of raising, because
    malformed input is exactly what this tool has to inspect.
    """

    def __init__(
        self,
        chart_validator: Optional[ChartValidator] = None,
        table_validator: Optional[TableValidator] = None,
        chart_repairer: Optional[ChartRepairer] = None,
        table_repairer: Optional[TableRepairer] = None,
    ):
        """Store the collaborators, creating defaults for any that are omitted.

        Default repairers are constructed around the validators chosen here.
        """
        self.chart_validator = chart_validator or ChartValidator()
        self.table_validator = table_validator or TableValidator()
        self.chart_repairer = chart_repairer or ChartRepairer(self.chart_validator)
        self.table_repairer = table_repairer or TableRepairer(self.table_validator)

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return *value* when it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _widget_type(block: Dict[str, Any]) -> str:
        """Return the lower-cased ``widgetType`` of *block* ("" if not a string)."""
        raw = block.get("widgetType")
        return raw.lower() if isinstance(raw, str) else ""

    def validate_document(
        self,
        document: Dict[str, Any],
        file_path: str = "<unknown>",
    ) -> DocumentReport:
        """Validate a whole document.

        Args:
            document: The IR document data.
            file_path: File path, used only for reporting.

        Returns:
            DocumentReport: The validation report.  A document whose root is
            not a JSON object yields a report with one document-level error.
        """
        report = DocumentReport(file_path=file_path)

        if not isinstance(document, dict):
            report.issues.append(BlockIssue(
                block_type="document",
                block_id="root",
                path="",
                errors=[f"文档根节点必须是 JSON 对象,实际为 {type(document).__name__}"],
            ))
            return report

        for chapter_idx, chapter in enumerate(self._as_list(document.get("chapters"))):
            if not isinstance(chapter, dict):
                continue
            chapter_id = chapter.get("chapterId", f"chapter-{chapter_idx}")
            self._validate_blocks(
                chapter.get("blocks", []),
                f"chapters[{chapter_idx}].blocks",
                chapter_id,
                report,
            )

        return report

    def _validate_blocks(
        self,
        blocks: List[Any],
        path: str,
        chapter_id: str,
        report: DocumentReport,
    ):
        """Recursively validate a list of blocks, updating *report* in place."""
        if not isinstance(blocks, list):
            return

        for idx, block in enumerate(blocks):
            if not isinstance(block, dict):
                continue

            report.total_blocks += 1
            block_path = f"{path}[{idx}]"
            block_type = block.get("type", "")
            block_id = block.get("widgetId") or block.get("id") or f"block-{idx}"

            # Dispatch on block type.
            if block_type == "widget":
                widget_type = self._widget_type(block)
                if "chart.js" in widget_type:
                    report.chart_count += 1
                    self._validate_chart(block, block_path, block_id, report)
                elif "wordcloud" in widget_type:
                    report.wordcloud_count += 1
                    self._validate_wordcloud(block, block_path, block_id, report)
            elif block_type == "table":
                report.table_count += 1
                self._validate_table(block, block_path, block_id, report)

            # Blocks nested directly under this block.
            nested_blocks = block.get("blocks")
            if isinstance(nested_blocks, list):
                self._validate_blocks(nested_blocks, f"{block_path}.blocks", chapter_id, report)

            # Blocks inside table cells.  rows/cells may be null or mistyped.
            if block_type == "table":
                for row_idx, row in enumerate(self._as_list(block.get("rows"))):
                    if not isinstance(row, dict):
                        continue
                    for cell_idx, cell in enumerate(self._as_list(row.get("cells"))):
                        if isinstance(cell, dict):
                            self._validate_blocks(
                                cell.get("blocks", []),
                                f"{block_path}.rows[{row_idx}].cells[{cell_idx}].blocks",
                                chapter_id,
                                report,
                            )

            # Blocks inside list items (each item is itself a list of blocks).
            if block_type == "list":
                for item_idx, item in enumerate(self._as_list(block.get("items"))):
                    if isinstance(item, list):
                        self._validate_blocks(
                            item,
                            f"{block_path}.items[{item_idx}]",
                            chapter_id,
                            report,
                        )

    def _validate_chart(
        self,
        block: Dict[str, Any],
        path: str,
        block_id: str,
        report: DocumentReport,
    ):
        """Validate a chart widget and record an issue if anything is reported."""
        result = self.chart_validator.validate(block)

        if not result.is_valid or result.warnings:
            issue = BlockIssue(
                block_type="chart",
                block_id=block_id,
                path=path,
                errors=result.errors,
                warnings=result.warnings,
                is_fixable=result.has_critical_errors(),
            )
            report.issues.append(issue)

    def _validate_table(
        self,
        block: Dict[str, Any],
        path: str,
        block_id: str,
        report: DocumentReport,
    ):
        """Validate a table block and record an issue if anything is reported."""
        result = self.table_validator.validate(block)

        if not result.is_valid or result.warnings or result.nested_cells_detected:
            issue = BlockIssue(
                block_type="table",
                block_id=block_id,
                path=path,
                # Copy the lists: extra warnings are added below and must not
                # leak back into the validator's own result object.
                errors=list(result.errors),
                warnings=list(result.warnings),
                is_fixable=result.nested_cells_detected or result.has_critical_errors(),
            )

            # Put the nested-cells notice first.
            if result.nested_cells_detected:
                issue.warnings.insert(0, "检测到嵌套 cells 结构(LLM 常见错误)")

            # Report how many cells are empty.
            if result.empty_cells_count > 0:
                issue.warnings.append(
                    f"空单元格数量: {result.empty_cells_count}/{result.total_cells_count}"
                )

            report.issues.append(issue)

    def _validate_wordcloud(
        self,
        block: Dict[str, Any],
        path: str,
        block_id: str,
        report: DocumentReport,
    ):
        """Validate a word-cloud widget.

        The first non-empty list found under the known data paths is used and
        its first five entries are checked.  A weight of ``0`` counts as
        present; only a missing or ``None`` weight/value produces a warning.
        """
        errors: List[str] = []
        warnings: List[str] = []

        data = block.get("data")
        props = block.get("props", {})
        data_map = data if isinstance(data, dict) else {}
        props_map = props if isinstance(props, dict) else {}

        # Candidate locations of the word list, in lookup order.
        data_paths = [
            ("data.words", data_map.get("words")),
            ("data.items", data_map.get("items")),
            ("data", data if isinstance(data, list) else None),
            ("props.words", props_map.get("words")),
            ("props.items", props_map.get("items")),
            ("props.data", props_map.get("data")),
        ]

        words_found = False
        for path_name, value in data_paths:
            if not isinstance(value, list) or not value:
                continue
            words_found = True

            # Only the first five entries are inspected.
            for idx, item in enumerate(value[:5]):
                if isinstance(item, dict):
                    word = item.get("word") or item.get("text") or item.get("label")
                    weight = item.get("weight")
                    if weight is None:
                        weight = item.get("value")
                    if not word:
                        warnings.append(f"{path_name}[{idx}] 缺少 word/text/label 字段")
                    if weight is None:
                        warnings.append(f"{path_name}[{idx}] 缺少 weight/value 字段")
                elif not isinstance(item, (str, list, tuple)):
                    warnings.append(f"{path_name}[{idx}] 格式不正确")
            break

        if not words_found:
            errors.append("词云数据缺失:未在 data.words, data.items, props.words 等路径找到有效数据")

        if errors or warnings:
            issue = BlockIssue(
                block_type="wordcloud",
                block_id=block_id,
                path=path,
                errors=errors,
                warnings=warnings,
                is_fixable=False,  # missing word-cloud data cannot be repaired locally
            )
            report.issues.append(issue)

    def repair_document(
        self,
        document: Dict[str, Any],
        report: DocumentReport,
    ) -> Tuple[Dict[str, Any], int]:
        """Repair the problems found in a document (modifies it in place).

        Args:
            document: The IR document data.
            report: The validation report (currently not consulted).

        Returns:
            Tuple[Dict[str, Any], int]: (the repaired document, number of
            repaired blocks).
        """
        fixed_count = 0

        if not isinstance(document, dict):
            return document, fixed_count

        for chapter in self._as_list(document.get("chapters")):
            if not isinstance(chapter, dict):
                continue
            blocks = chapter.get("blocks")
            # Only touch chapters that really carry a block list, so no
            # "blocks" key is invented for chapters that lack one.
            if isinstance(blocks, list):
                chapter["blocks"], chapter_fixed = self._repair_blocks(blocks)
                fixed_count += chapter_fixed

        return document, fixed_count

    @staticmethod
    def _apply_repair(repairer: Any, block: Dict[str, Any], label: str) -> Tuple[Dict[str, Any], bool]:
        """Run *repairer* on *block*; return (block to keep, whether it changed).

        A result that reports changes but carries no dict as ``repaired_block``
        is ignored and the original block is kept.
        """
        result = repairer.repair(block)
        if not result.has_changes():
            return block, False
        repaired = result.repaired_block
        if not isinstance(repaired, dict):
            return block, False
        logger.info(f"{label}: {result.changes}")
        return repaired, True

    def _repair_blocks(
        self,
        blocks: List[Any],
    ) -> Tuple[List[Any], int]:
        """Recursively repair a list of blocks.

        Returns:
            (new list of blocks, number of repaired blocks).  A non-list
            argument is returned unchanged with a count of 0.
        """
        if not isinstance(blocks, list):
            return blocks, 0

        fixed_count = 0
        repaired_blocks: List[Any] = []

        for block in blocks:
            if not isinstance(block, dict):
                repaired_blocks.append(block)
                continue

            block_type = block.get("type", "")

            if block_type == "table":
                block, changed = self._apply_repair(self.table_repairer, block, "修复表格")
                if changed:
                    fixed_count += 1
            elif block_type == "widget":
                if "chart.js" in self._widget_type(block):
                    block, changed = self._apply_repair(self.chart_repairer, block, "修复图表")
                    if changed:
                        fixed_count += 1

            # Blocks nested directly under this block.
            nested_blocks = block.get("blocks")
            if isinstance(nested_blocks, list):
                block["blocks"], nested_fixed = self._repair_blocks(nested_blocks)
                fixed_count += nested_fixed

            # Blocks inside table cells.
            if block_type == "table":
                for row in self._as_list(block.get("rows")):
                    if not isinstance(row, dict):
                        continue
                    for cell in self._as_list(row.get("cells")):
                        if not isinstance(cell, dict):
                            continue
                        cell_blocks = cell.get("blocks")
                        if isinstance(cell_blocks, list):
                            cell["blocks"], cell_fixed = self._repair_blocks(cell_blocks)
                            fixed_count += cell_fixed

            # Blocks inside list items; items is updated in place.
            if block_type == "list":
                items = self._as_list(block.get("items"))
                for i, item in enumerate(items):
                    if isinstance(item, list):
                        items[i], item_fixed = self._repair_blocks(item)
                        fixed_count += item_fixed

            repaired_blocks.append(block)

        return repaired_blocks, fixed_count
def print_report(report: DocumentReport, verbose: bool = False):
    """Print a human-readable summary of *report* to standard output.

    Args:
        report: The report to print.
        verbose: When true, every issue is listed with its path, errors,
            warnings and a marker for issues flagged as auto-fixable.
    """
    rule = "=" * 60
    for line in (
        f"\n{rule}",
        f"文件: {report.file_path}",
        rule,
        "\n📊 统计:",
        f" - 总 blocks: {report.total_blocks}",
        f" - 图表数量: {report.chart_count}",
        f" - 表格数量: {report.table_count}",
        f" - 词云数量: {report.wordcloud_count}",
    ):
        print(line)

    if not report.has_issues:
        print("\n✅ 未发现问题")
    else:
        print(f"\n⚠️ 发现 {len(report.issues)} 个问题:")
        print(f" - 错误: {report.error_count}")
        print(f" - 警告: {report.warning_count}")

        # Per-issue details are shown only in verbose mode.
        for issue in (report.issues if verbose else ()):
            print(f"\n[{issue.block_type}] {issue.block_id}")
            print(f" 路径: {issue.path}")
            for error in (issue.errors or ()):
                print(f" ❌ {error}")
            for warning in (issue.warnings or ()):
                print(f" ⚠️ {warning}")
            if issue.is_fixable:
                print(" 🔧 可自动修复")

    if report.fixed_count > 0:
        print(f"\n🔧 已修复 {report.fixed_count} 个问题")
def validate_file(
    file_path: Path,
    validator: IRValidator,
    fix: bool = False,
    verbose: bool = False,
) -> DocumentReport:
    """Validate a single JSON file and optionally repair it in place.

    Args:
        file_path: Path of the JSON file.
        validator: Validator used for checking and repairing.
        fix: When true and fixable issues exist, repair the document, write a
            ``<name>.bak<suffix>`` backup and overwrite the original file.
        verbose: Accepted for interface compatibility; not used here.

    Returns:
        DocumentReport: The validation report.  Read/parse failures and a
        non-object JSON root are reported as a single document-level issue.
        ``fixed_count`` is reset to 0 when the repaired file could not be
        saved, so the report never claims fixes that are not on disk.
    """

    def failure(message: str) -> DocumentReport:
        # Build a report that carries one document-level error.
        failed = DocumentReport(file_path=str(file_path))
        failed.issues.append(BlockIssue(
            block_type="document",
            block_id="root",
            path="",
            errors=[message],
        ))
        return failed

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            document = json.load(f)
    except json.JSONDecodeError as e:
        logger.error(f"JSON 解析错误: {file_path}: {e}")
        return failure(f"JSON 解析错误: {e}")
    except Exception as e:
        # Deliberately broad: one unreadable file must not abort a batch run.
        logger.error(f"读取文件错误: {file_path}: {e}")
        return failure(f"读取文件错误: {e}")

    # Valid JSON whose root is a list/string/number cannot be walked as a document.
    if not isinstance(document, dict):
        message = f"文档根节点必须是 JSON 对象,实际为 {type(document).__name__}"
        logger.error(f"{message}: {file_path}")
        return failure(message)

    report = validator.validate_document(document, str(file_path))

    if not (fix and report.has_issues):
        return report

    fixable_issues = [i for i in report.issues if i.is_fixable]
    if not fixable_issues:
        return report

    logger.info(f"尝试修复 {len(fixable_issues)} 个问题...")
    document, fixed_count = validator.repair_document(document, report)
    report.fixed_count = fixed_count

    if fixed_count > 0:
        backup_path = file_path.with_suffix(f".bak{file_path.suffix}")
        try:
            import shutil

            # Serialise before touching the disk so a serialisation error
            # cannot leave a truncated file behind.
            payload = json.dumps(document, ensure_ascii=False, indent=2)

            shutil.copy(file_path, backup_path)
            logger.info(f"已创建备份: {backup_path}")

            with open(file_path, "w", encoding="utf-8") as f:
                f.write(payload)
            logger.info(f"已保存修复后的文件: {file_path}")
        except Exception as e:
            logger.error(f"保存文件失败: {e}")
            # Nothing was persisted, so do not report the fixes as done.
            report.fixed_count = 0

    return report
def main():
    """Command-line entry point.

    Collects JSON files from the given paths (files, directories or glob
    patterns), validates each one, optionally repairs it, prints a summary and
    exits with status 1 when no file was found or when unresolved issues
    remain, otherwise 0.

    Each file is processed at most once even if it is matched by several
    arguments, and directory scans skip the ``*.bak.json`` backups written by
    ``--fix`` so that a second run does not validate (and re-backup) them.
    Backups named explicitly on the command line are still processed.
    """
    import glob

    parser = argparse.ArgumentParser(
        description="IR 文档验证工具",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
示例:
%(prog)s chapter-030-section-3-0.json
%(prog)s *.json --fix
%(prog)s ./output/ --recursive --fix --verbose
""",
    )

    parser.add_argument(
        "paths",
        nargs="+",
        help="要验证的 JSON 文件或目录",
    )
    parser.add_argument(
        "-r", "--recursive",
        action="store_true",
        help="递归处理目录",
    )
    parser.add_argument(
        "-f", "--fix",
        action="store_true",
        help="自动修复常见问题",
    )
    parser.add_argument(
        "-v", "--verbose",
        action="store_true",
        help="显示详细信息",
    )
    # NOTE(review): --no-color is parsed but nothing below reads it.
    parser.add_argument(
        "--no-color",
        action="store_true",
        help="禁用彩色输出",
    )

    args = parser.parse_args()

    # Configure logging.
    logger.remove()
    if args.verbose:
        logger.add(sys.stderr, level="DEBUG")
    else:
        logger.add(sys.stderr, level="INFO")

    # Collect files, de-duplicated by resolved path, in first-seen order.
    files: List[Path] = []
    seen = set()

    def add(candidate: Path) -> None:
        key = candidate.resolve()
        if key not in seen:
            seen.add(key)
            files.append(candidate)

    def is_backup(candidate: Path) -> bool:
        # validate_file names backups "<stem>.bak<suffix>", i.e. "*.bak.json".
        return candidate.name.lower().endswith(".bak.json")

    for path_str in args.paths:
        path = Path(path_str)
        if path.is_file():
            if path.suffix.lower() == ".json":
                add(path)
        elif path.is_dir():
            found = path.rglob("*.json") if args.recursive else path.glob("*.json")
            for candidate in sorted(found):
                if candidate.is_file() and not is_backup(candidate):
                    add(candidate)
        else:
            # Possibly a glob pattern that the shell did not expand.
            for m in glob.glob(path_str):
                mp = Path(m)
                if mp.is_file() and mp.suffix.lower() == ".json":
                    add(mp)

    if not files:
        print("未找到 JSON 文件")
        sys.exit(1)

    print(f"找到 {len(files)} 个文件")

    validator = IRValidator()

    total_issues = 0
    total_fixed = 0
    reports: List[DocumentReport] = []

    for file_path in files:
        report = validate_file(file_path, validator, args.fix, args.verbose)
        reports.append(report)
        total_issues += len(report.issues)
        total_fixed += report.fixed_count

        if args.verbose or report.has_issues:
            print_report(report, args.verbose)

    # Summary.
    print(f"\n{'=' * 60}")
    print("总结")
    print(f"{'=' * 60}")
    print(f" - 文件数: {len(files)}")
    print(f" - 问题总数: {total_issues}")
    if args.fix:
        print(f" - 已修复: {total_fixed}")

    # Non-zero exit status while issues remain that were not repaired.
    if total_issues > 0 and total_fixed < total_issues:
        sys.exit(1)
    sys.exit(0)
if
__name__
==
"__main__"
:
main
()
...
...
ReportEngine/utils/__init__.py
View file @
6d98e93
...
...
@@ -10,8 +10,23 @@ from ReportEngine.utils.chart_review_service import (
review_document_charts
,
)
from
ReportEngine.utils.table_validator
import
(
TableValidator
,
TableRepairer
,
TableValidationResult
,
TableRepairResult
,
create_table_validator
,
create_table_repairer
,
)
__all__
=
[
"ChartReviewService"
,
"get_chart_review_service"
,
"review_document_charts"
,
"TableValidator"
,
"TableRepairer"
,
"TableValidationResult"
,
"TableRepairResult"
,
"create_table_validator"
,
"create_table_repairer"
,
]
...
...
ReportEngine/utils/chart_repair_api.py
View file @
6d98e93
...
...
@@ -83,6 +83,243 @@ CHART_REPAIR_SYSTEM_PROMPT = """你是一个专业的图表数据修复助手。
"""
# 表格修复提示词
TABLE_REPAIR_SYSTEM_PROMPT
=
"""你是一个专业的表格数据修复助手。你的任务是修复IR表格数据中的格式错误,确保表格能够正常渲染。
**标准表格数据格式:**
```json
{
"type": "table",
"rows": [
{
"cells": [
{
"header": true,
"blocks": [
{
"type": "paragraph",
"inlines": [{"text": "列标题", "marks": []}]
}
]
},
{
"header": true,
"blocks": [
{
"type": "paragraph",
"inlines": [{"text": "另一列", "marks": []}]
}
]
}
]
},
{
"cells": [
{
"blocks": [
{
"type": "paragraph",
"inlines": [{"text": "数据内容", "marks": []}]
}
]
},
{
"blocks": [
{
"type": "paragraph",
"inlines": [{"text": "另一数据", "marks": []}]
}
]
}
]
}
]
}
```
**⚠️ 常见错误:嵌套 cells 结构**
这是一个非常常见的错误,LLM 经常把同级的 cells 错误地嵌套起来:
❌ **错误示例:**
```json
{
"cells": [
{ "blocks": [...], "colspan": 1 },
{ "cells": [
{ "blocks": [...] },
{ "cells": [...] }
]
}
]
}
```
✅ **正确格式:**
```json
{
"cells": [
{ "blocks": [...], "colspan": 1 },
{ "blocks": [...] },
{ "blocks": [...] }
]
}
```
**修复原则:**
1. **展平嵌套 cells** - 将错误嵌套的 cells 展平为同级
2. **确保每个 cell 有 blocks** - 每个单元格必须有 blocks 数组
3. **blocks 内使用 paragraph** - 文本内容应放在 paragraph block 内
4. **保持数据完整性** - 不要丢失原始内容
**修复方法:**
1. 嵌套 cells 结构 → 展平为同级 cells 数组
2. 缺少 blocks 字段 → 添加包含 paragraph 的 blocks
3. 空 cells 数组 → 添加默认空单元格
4. 非法 cell 类型 → 转换为标准格式
请根据错误信息修复表格数据,并返回修复后的完整 table block(JSON格式)。
"""
# 词云修复提示词
WORDCLOUD_REPAIR_SYSTEM_PROMPT
=
"""你是一个专业的词云数据修复助手。你的任务是修复词云 widget 数据中的格式错误,确保词云能够正常渲染。
**标准词云数据格式:**
```json
{
"type": "widget",
"widgetType": "wordcloud",
"widgetId": "wordcloud-001",
"title": "词云标题",
"data": {
"words": [
{"text": "关键词1", "weight": 10},
{"text": "关键词2", "weight": 8},
{"text": "关键词3", "weight": 6}
]
}
}
```
**⚠️ 数据路径说明:**
词云数据可以位于以下路径(按优先级):
1. `data.words` - 推荐路径
2. `data.items` - 备选路径
3. `props.words` - 备选路径
4. `props.items` - 备选路径
5. `props.data` - 备选路径
**词云项目格式:**
每个词云项目应该是一个对象,包含:
- `text` 或 `word` 或 `label`: 词语文本(必需)
- `weight` 或 `value`: 权重/频率(必需)
- `category`: 类别(可选)
**修复原则:**
1. **规范化数据路径** - 优先使用 `data.words`
2. **确保必需字段** - 每个词项必须有文本和权重
3. **转换兼容格式** - 将其他格式转换为标准格式
4. **保持数据完整性** - 不要丢失原始词语
**常见错误及修复方法:**
1. 数据位于错误路径 → 移动到 `data.words`
2. 缺少 weight 字段 → 根据位置生成默认权重
3. 使用 word 而非 text → 统一为 text 字段
4. 数组元素是字符串 → 转换为对象格式
请根据错误信息修复词云数据,并返回修复后的完整 widget block(JSON格式)。
"""
def build_table_repair_prompt(
    table_block: Dict[str, Any],
    validation_errors: List[str]
) -> str:
    """Build the user prompt asking the LLM to repair a table block.

    Args:
        table_block: The original table block.
        validation_errors: Validation error messages, one bullet each.

    Returns:
        str: The prompt text containing the pretty-printed block and the
        bulleted error list.
    """
    bullet_lines = []
    for error in validation_errors:
        bullet_lines.append(f"- {error}")
    errors_text = "\n".join(bullet_lines)

    # Keep non-ASCII text readable in the prompt instead of \u escapes.
    block_json = json.dumps(table_block, ensure_ascii=False, indent=2)

    return f"""请修复以下表格数据中的错误:
**原始数据:**
```json
{block_json}
```
**检测到的错误:**
{errors_text}
**要求:**
1. 返回修复后的完整 table block(JSON格式)
2. 特别注意展平嵌套的 cells 结构
3. 确保每个 cell 都有 blocks 数组
4. 如果无法确定如何修复,保持原始数据
**重要的输出格式要求:**
1. 只返回纯JSON对象,不要添加任何说明文字
2. 不要使用```json```标记包裹
3. 确保JSON语法完全正确
4. 所有字符串使用双引号
"""
def build_wordcloud_repair_prompt(
    widget_block: Dict[str, Any],
    validation_errors: List[str]
) -> str:
    """Build the user prompt asking the LLM to repair a word-cloud widget.

    Args:
        widget_block: The original word-cloud widget block.
        validation_errors: Validation error messages, one bullet each.

    Returns:
        str: The prompt text containing the pretty-printed block and the
        bulleted error list.
    """
    bullet_lines = []
    for error in validation_errors:
        bullet_lines.append(f"- {error}")
    errors_text = "\n".join(bullet_lines)

    # Keep non-ASCII text readable in the prompt instead of \u escapes.
    block_json = json.dumps(widget_block, ensure_ascii=False, indent=2)

    return f"""请修复以下词云数据中的错误:
**原始数据:**
```json
{block_json}
```
**检测到的错误:**
{errors_text}
**要求:**
1. 返回修复后的完整 widget block(JSON格式)
2. 确保词云数据位于 data.words 路径
3. 每个词项必须有 text 和 weight 字段
4. 如果无法确定如何修复,保持原始数据
**重要的输出格式要求:**
1. 只返回纯JSON对象,不要添加任何说明文字
2. 不要使用```json```标记包裹
3. 确保JSON语法完全正确
4. 所有字符串使用双引号
"""
def
build_chart_repair_prompt
(
widget_block
:
Dict
[
str
,
Any
],
validation_errors
:
List
[
str
]
...
...
@@ -283,3 +520,111 @@ def create_llm_repair_functions() -> List:
logger
.
info
(
f
"图表API修复功能已启用,共 {len(repair_functions)} 个Engine可用"
)
return
repair_functions
def create_table_repair_functions() -> List:
    """Create the list of LLM-backed table repair functions.

    Uses the same Engine configuration as chart repair.  A repair function is
    added only when both the ReportEngine API key and base URL are set.

    Returns:
        List[Callable]: Repair functions taking ``(table_block, errors)`` and
        returning the repaired block as a dict, or ``None`` on any failure.
    """
    repair_functions = []

    # Repair tables with the ReportEngine LLM.
    if settings.REPORT_ENGINE_API_KEY and settings.REPORT_ENGINE_BASE_URL:
        def repair_table_with_report_engine(table_block: Dict[str, Any], errors: List[str]) -> Optional[Dict[str, Any]]:
            """Repair a table block with the ReportEngine LLM.

            Returns ``None`` when the call fails, the response is empty or
            unparsable, or the parsed JSON is not an object.
            """
            try:
                from ReportEngine.llms import LLMClient

                client = LLMClient(
                    api_key=settings.REPORT_ENGINE_API_KEY,
                    base_url=settings.REPORT_ENGINE_BASE_URL,
                    model_name=settings.REPORT_ENGINE_MODEL_NAME or "gpt-4",
                )

                prompt = build_table_repair_prompt(table_block, errors)
                response = client.invoke(
                    TABLE_REPAIR_SYSTEM_PROMPT,
                    prompt,
                    temperature=0.0,
                    top_p=0.05
                )

                if not response:
                    return None

                # Models sometimes wrap the answer in a Markdown code fence
                # despite the instructions; unwrap it before parsing.
                payload = response.strip() if isinstance(response, str) else response
                if isinstance(payload, str) and payload.startswith("```"):
                    payload = payload.split("\n", 1)[1] if "\n" in payload else ""
                    payload = payload.rsplit("```", 1)[0]

                repaired = json.loads(payload)
                # The contract is "a table block or None": reject arrays,
                # strings and numbers even though they are valid JSON.
                if not isinstance(repaired, dict):
                    logger.warning("ReportEngine 表格修复返回的不是 JSON 对象,已忽略")
                    return None
                return repaired
            except Exception as e:
                logger.exception(f"ReportEngine 表格修复失败: {e}")
                return None

        repair_functions.append(repair_table_with_report_engine)
        logger.debug("已添加 ReportEngine 表格修复函数")

    if not repair_functions:
        logger.warning("未配置任何 Engine API,表格 API 修复功能将不可用")
    else:
        logger.info(f"表格 API 修复功能已启用,共 {len(repair_functions)} 个 Engine 可用")

    return repair_functions
def create_wordcloud_repair_functions() -> List:
    """Create the list of LLM-backed word-cloud repair functions.

    Uses the same Engine configuration as chart repair.  A repair function is
    added only when both the ReportEngine API key and base URL are set.

    Returns:
        List[Callable]: Repair functions taking ``(widget_block, errors)`` and
        returning the repaired block as a dict, or ``None`` on any failure.
    """
    repair_functions = []

    # Repair word clouds with the ReportEngine LLM.
    if settings.REPORT_ENGINE_API_KEY and settings.REPORT_ENGINE_BASE_URL:
        def repair_wordcloud_with_report_engine(widget_block: Dict[str, Any], errors: List[str]) -> Optional[Dict[str, Any]]:
            """Repair a word-cloud widget with the ReportEngine LLM.

            Returns ``None`` when the call fails, the response is empty or
            unparsable, or the parsed JSON is not an object.
            """
            try:
                from ReportEngine.llms import LLMClient

                client = LLMClient(
                    api_key=settings.REPORT_ENGINE_API_KEY,
                    base_url=settings.REPORT_ENGINE_BASE_URL,
                    model_name=settings.REPORT_ENGINE_MODEL_NAME or "gpt-4",
                )

                prompt = build_wordcloud_repair_prompt(widget_block, errors)
                response = client.invoke(
                    WORDCLOUD_REPAIR_SYSTEM_PROMPT,
                    prompt,
                    temperature=0.0,
                    top_p=0.05
                )

                if not response:
                    return None

                # Models sometimes wrap the answer in a Markdown code fence
                # despite the instructions; unwrap it before parsing.
                payload = response.strip() if isinstance(response, str) else response
                if isinstance(payload, str) and payload.startswith("```"):
                    payload = payload.split("\n", 1)[1] if "\n" in payload else ""
                    payload = payload.rsplit("```", 1)[0]

                repaired = json.loads(payload)
                # The contract is "a widget block or None": reject arrays,
                # strings and numbers even though they are valid JSON.
                if not isinstance(repaired, dict):
                    logger.warning("ReportEngine 词云修复返回的不是 JSON 对象,已忽略")
                    return None
                return repaired
            except Exception as e:
                logger.exception(f"ReportEngine 词云修复失败: {e}")
                return None

        repair_functions.append(repair_wordcloud_with_report_engine)
        logger.debug("已添加 ReportEngine 词云修复函数")

    if not repair_functions:
        logger.warning("未配置任何 Engine API,词云 API 修复功能将不可用")
    else:
        logger.info(f"词云 API 修复功能已启用,共 {len(repair_functions)} 个 Engine 可用")

    return repair_functions
...
...
ReportEngine/utils/table_validator.py
0 → 100644
View file @
6d98e93
"""
表格验证和修复工具。
提供对 IR 表格数据的验证和修复能力:
1. 验证表格数据格式是否符合 IR schema 要求
2. 检测嵌套 cells 结构问题
3. 验证 rows/cells 基本格式
4. 检查数据完整性
5. 本地规则修复常见问题
"""
from
__future__
import
annotations
import
copy
from
typing
import
Any
,
Dict
,
List
,
Optional
,
Tuple
from
dataclasses
import
dataclass
from
loguru
import
logger
@dataclass
class TableValidationResult:
    """Outcome of validating one table block."""

    is_valid: bool
    errors: List[str]
    warnings: List[str]
    nested_cells_detected: bool = False
    empty_cells_count: int = 0
    total_cells_count: int = 0

    def has_critical_errors(self) -> bool:
        """Return True when the table is invalid and at least one error exists."""
        if self.is_valid:
            return False
        return len(self.errors) > 0
@dataclass
class TableRepairResult:
    """Outcome of a table repair attempt."""

    # Whether the repair was successful.
    success: bool
    # The resulting table block, or None.
    repaired_block: Optional[Dict[str, Any]]
    # One human-readable entry per modification that was applied.
    changes: List[str]

    def has_changes(self) -> bool:
        """Return True when at least one modification was recorded."""
        return len(self.changes) >= 1
class TableValidator:
    """
    Validate that an IR ``table`` block is well-formed.

    Checks performed:
    1. Basic structure: the ``type`` and ``rows`` fields
    2. Row structure: every row must carry a ``cells`` list
    3. Cell structure: every cell must carry a ``blocks`` list
    4. Nested cells: a cell holding ``cells`` instead of ``blocks`` is an error
    5. Data completeness: empty cells and inconsistent row widths are warned about

    Validation never raises on malformed input; every problem is reported
    through the returned result instead.
    """

    def __init__(self):
        """Create a validator; it keeps no state between calls."""
        pass

    def validate(self, table_block: Dict[str, Any]) -> TableValidationResult:
        """
        Validate a table block.

        Args:
            table_block: A ``table``-type block, expected to contain
                ``type`` and ``rows``.

        Returns:
            TableValidationResult: ``is_valid`` is True only when no error
            was recorded; warnings do not affect validity.
        """
        errors: List[str] = []
        warnings: List[str] = []
        nested_cells_detected = False
        empty_cells_count = 0
        total_cells_count = 0

        # 1. Basic structure: nothing else can be inspected on a non-dict.
        if not isinstance(table_block, dict):
            errors.append("table_block 必须是字典类型")
            return TableValidationResult(
                False, errors, warnings,
                nested_cells_detected, empty_cells_count, total_cells_count
            )

        # 2. Block type (recorded as an error, but validation continues).
        block_type = table_block.get('type')
        if block_type != 'table':
            errors.append(f"block type 应为 'table',实际为 '{block_type}'")

        # 3. The rows field must exist and be a list.
        rows = table_block.get('rows')
        if rows is None:
            errors.append("缺少 rows 字段")
            return TableValidationResult(
                False, errors, warnings,
                nested_cells_detected, empty_cells_count, total_cells_count
            )

        if not isinstance(rows, list):
            errors.append("rows 必须是数组类型")
            return TableValidationResult(
                False, errors, warnings,
                nested_cells_detected, empty_cells_count, total_cells_count
            )

        if len(rows) == 0:
            warnings.append("rows 数组为空,表格可能无法正常显示")

        # 4. Validate every row and aggregate its findings.
        for row_idx, row in enumerate(rows):
            row_result = self._validate_row(row, row_idx)
            errors.extend(row_result['errors'])
            warnings.extend(row_result['warnings'])
            if row_result['nested_cells_detected']:
                nested_cells_detected = True
            empty_cells_count += row_result['empty_cells_count']
            total_cells_count += row_result['total_cells_count']

        # 5. Column-count consistency across rows. Invalid colspan values
        #    are tolerated here; they get their own warning per cell.
        column_counts = self._column_counts(rows)
        if column_counts and len(set(column_counts)) > 1:
            warnings.append(
                f"各行列数不一致: {column_counts},可能导致渲染问题"
            )

        # 6. Warn when more than half of the cells are empty.
        if total_cells_count > 0 and empty_cells_count > total_cells_count * 0.5:
            warnings.append(
                f"超过50%的单元格为空 ({empty_cells_count}/{total_cells_count}),"
                "表格可能缺少数据"
            )

        is_valid = len(errors) == 0
        return TableValidationResult(
            is_valid, errors, warnings,
            nested_cells_detected, empty_cells_count, total_cells_count
        )

    @staticmethod
    def _effective_colspan(cell: Any) -> int:
        """Return the number of columns a cell occupies, never raising.

        Non-dict cells count as one column. A ``colspan`` that cannot be
        converted to an integer, or that is below 1, also counts as one.
        """
        if not isinstance(cell, dict):
            return 1
        try:
            span = int(cell.get('colspan', 1))
        except (TypeError, ValueError, OverflowError):
            # None, non-numeric strings, containers, NaN/inf all land here.
            return 1
        return span if span >= 1 else 1

    def _column_counts(self, rows: List[Any]) -> List[int]:
        """Return the effective column count of each inspectable row.

        Rows that are not dicts, or whose ``cells`` is not a list, are
        skipped; a dict row without ``cells`` contributes a count of 0.
        """
        counts: List[int] = []
        for row in rows:
            if not isinstance(row, dict):
                continue
            cells = row.get('cells', [])
            if not isinstance(cells, list):
                continue
            counts.append(sum(self._effective_colspan(cell) for cell in cells))
        return counts

    def _validate_row(self, row: Any, row_idx: int) -> Dict[str, Any]:
        """Validate one row and return its errors, warnings and cell counters."""
        result = {
            'errors': [],
            'warnings': [],
            'nested_cells_detected': False,
            'empty_cells_count': 0,
            'total_cells_count': 0,
        }

        if not isinstance(row, dict):
            result['errors'].append(f"rows[{row_idx}] 必须是对象类型")
            return result

        cells = row.get('cells')
        if cells is None:
            result['errors'].append(f"rows[{row_idx}] 缺少 cells 字段")
            return result

        if not isinstance(cells, list):
            result['errors'].append(f"rows[{row_idx}].cells 必须是数组类型")
            return result

        if len(cells) == 0:
            result['warnings'].append(f"rows[{row_idx}].cells 数组为空")

        # Validate every cell; each one counts towards the total even when
        # it is itself erroneous.
        for cell_idx, cell in enumerate(cells):
            cell_result = self._validate_cell(cell, row_idx, cell_idx)
            result['errors'].extend(cell_result['errors'])
            result['warnings'].extend(cell_result['warnings'])
            if cell_result['nested_cells_detected']:
                result['nested_cells_detected'] = True
            if cell_result['is_empty']:
                result['empty_cells_count'] += 1
            result['total_cells_count'] += 1

        return result

    @staticmethod
    def _block_has_content(block: Any) -> bool:
        """Return True when a cell block carries visible content.

        A ``paragraph`` block has content when any inline dict holds
        non-blank text; any other dict block has content when its ``text``
        or ``content`` field is truthy. Malformed shapes count as empty.
        """
        if not isinstance(block, dict):
            return False
        if block.get('type') == 'paragraph':
            inlines = block.get('inlines', [])
            if not isinstance(inlines, (list, tuple)):
                # e.g. 'inlines': None — nothing to inspect.
                return False
            for inline in inlines:
                if isinstance(inline, dict):
                    text = inline.get('text', '')
                    # str() keeps non-string text (e.g. a number) from
                    # crashing the check.
                    if text and str(text).strip():
                        return True
            return False
        return bool(block.get('text') or block.get('content'))

    def _validate_cell(self, cell: Any, row_idx: int, cell_idx: int) -> Dict[str, Any]:
        """Validate one cell and return its errors, warnings and flags."""
        result = {
            'errors': [],
            'warnings': [],
            'nested_cells_detected': False,
            'is_empty': False,
        }

        if not isinstance(cell, dict):
            result['errors'].append(
                f"rows[{row_idx}].cells[{cell_idx}] 必须是对象类型"
            )
            return result

        # Nested cells structure (a common LLM mistake).
        if 'cells' in cell and 'blocks' not in cell:
            result['nested_cells_detected'] = True
            result['errors'].append(
                f"rows[{row_idx}].cells[{cell_idx}] 检测到错误的嵌套 cells 结构,"
                "应该是 blocks 而不是 cells"
            )
            return result

        # The blocks field must exist and be a list.
        blocks = cell.get('blocks')
        if blocks is None:
            result['errors'].append(
                f"rows[{row_idx}].cells[{cell_idx}] 缺少 blocks 字段"
            )
            return result

        if not isinstance(blocks, list):
            result['errors'].append(
                f"rows[{row_idx}].cells[{cell_idx}].blocks 必须是数组类型"
            )
            return result

        # A cell is empty when it has no blocks or none carries content.
        result['is_empty'] = not any(
            self._block_has_content(block) for block in blocks
        )

        # colspan / rowspan must be integers >= 1 when present.
        colspan = cell.get('colspan')
        if colspan is not None:
            if not isinstance(colspan, int) or colspan < 1:
                result['warnings'].append(
                    f"rows[{row_idx}].cells[{cell_idx}].colspan 值无效: {colspan}"
                )

        rowspan = cell.get('rowspan')
        if rowspan is not None:
            if not isinstance(rowspan, int) or rowspan < 1:
                result['warnings'].append(
                    f"rows[{row_idx}].cells[{cell_idx}].rowspan 值无效: {rowspan}"
                )

        return result

    def can_render(self, table_block: Dict[str, Any]) -> bool:
        """
        Quick check of whether the table validates without errors.

        Args:
            table_block: A ``table``-type block.

        Returns:
            bool: The ``is_valid`` flag of a full validation run.
        """
        result = self.validate(table_block)
        return result.is_valid

    def has_nested_cells(self, table_block: Dict[str, Any]) -> bool:
        """
        Report whether the table contains a nested ``cells`` structure.

        Args:
            table_block: A ``table``-type block.

        Returns:
            bool: The ``nested_cells_detected`` flag of a full validation run.
        """
        result = self.validate(table_block)
        return result.nested_cells_detected
class TableRepairer:
    """
    Repair malformed IR ``table`` blocks using local, rule-based fixes.

    Strategy:
    1. Flatten wrongly nested ``cells`` structures
    2. Fill in missing ``blocks`` fields
    3. Normalise every cell to the ``{'blocks': [...]}`` shape
    4. Re-validate the repaired block
    """

    def __init__(self, validator: Optional[TableValidator] = None):
        """
        Initialise the repairer.

        Args:
            validator: Validator instance to use; a new ``TableValidator``
                is created when omitted.
        """
        self.validator = validator or TableValidator()

    def repair(
        self,
        table_block: Dict[str, Any],
        validation_result: Optional[TableValidationResult] = None
    ) -> TableRepairResult:
        """
        Try to repair a table block.

        The input is never mutated: repairs are applied to a deep copy.

        Args:
            table_block: A ``table``-type block. A non-dict value is
                tolerated and rebuilt as an empty table.
            validation_result: Existing validation result; when omitted the
                block is validated first.

        Returns:
            TableRepairResult: ``success`` reflects re-validation of the
            repaired block; ``changes`` lists every fix that was applied.
        """
        # 1. Validate first when no result was supplied.
        if validation_result is None:
            validation_result = self.validator.validate(table_block)

        # 2. Already valid: hand back the original object untouched.
        if validation_result.is_valid and not validation_result.nested_cells_detected:
            return TableRepairResult(True, table_block, [])

        # 3. Attempt the repair on a copy.
        changes: List[str] = []
        if isinstance(table_block, dict):
            repaired = copy.deepcopy(table_block)
        else:
            # The validator reports a non-dict block as an error; rebuild
            # it here instead of failing on the membership tests below.
            repaired = {}
            changes.append("table_block 类型错误,已重建")

        # Ensure the basic structure exists.
        if 'type' not in repaired:
            repaired['type'] = 'table'
            changes.append("添加缺失的 type 字段")

        if 'rows' not in repaired or not isinstance(repaired.get('rows'), list):
            repaired['rows'] = []
            changes.append("添加缺失的 rows 字段")

        # Repair every row.
        repaired_rows: List[Dict[str, Any]] = []
        for row_idx, row in enumerate(repaired.get('rows', [])):
            repaired_row, row_changes = self._repair_row(row, row_idx)
            repaired_rows.append(repaired_row)
            changes.extend(row_changes)
        repaired['rows'] = repaired_rows

        # 4. Re-validate the result.
        repaired_validation = self.validator.validate(repaired)
        success = repaired_validation.is_valid

        if not success:
            logger.warning(f"表格修复后仍有问题: {repaired_validation.errors}")

        return TableRepairResult(success, repaired, changes)

    def _repair_row(self, row: Any, row_idx: int) -> Tuple[Dict[str, Any], List[str]]:
        """Repair one row; return the repaired row and the changes made."""
        changes: List[str] = []

        if not isinstance(row, dict):
            return {'cells': [self._default_cell()]}, [f"rows[{row_idx}] 类型错误,已重建"]

        repaired_row = dict(row)

        # Ensure a cells list exists.
        if 'cells' not in repaired_row or not isinstance(repaired_row.get('cells'), list):
            repaired_row['cells'] = [self._default_cell()]
            changes.append(f"rows[{row_idx}] 添加缺失的 cells 字段")
            return repaired_row, changes

        # Repair every cell; a nested-cells wrapper expands into several cells.
        repaired_cells: List[Dict[str, Any]] = []
        for cell_idx, cell in enumerate(repaired_row.get('cells', [])):
            if isinstance(cell, dict) and 'cells' in cell and 'blocks' not in cell:
                flattened = self._flatten_nested_cells(cell)
                repaired_cells.extend(flattened)
                changes.append(f"rows[{row_idx}].cells[{cell_idx}] 展平嵌套 cells 结构")
            else:
                repaired_cell, cell_changes = self._repair_cell(cell, row_idx, cell_idx)
                repaired_cells.append(repaired_cell)
                changes.extend(cell_changes)

        repaired_row['cells'] = repaired_cells
        return repaired_row, changes

    def _repair_cell(self, cell: Any, row_idx: int, cell_idx: int) -> Tuple[Dict[str, Any], List[str]]:
        """Repair one cell; return the repaired cell and the changes made."""
        changes: List[str] = []

        if not isinstance(cell, dict):
            # A bare scalar becomes a one-paragraph cell holding its text.
            if isinstance(cell, (str, int, float)):
                return {
                    'blocks': [self._text_to_paragraph(str(cell))]
                }, [f"rows[{row_idx}].cells[{cell_idx}] 转换为标准格式"]
            return self._default_cell(), [f"rows[{row_idx}].cells[{cell_idx}] 类型错误,已重建"]

        repaired_cell = dict(cell)

        if 'blocks' not in repaired_cell:
            # Try to recover the content from commonly used alternative keys.
            text = ''
            for key in ('text', 'content', 'value'):
                if key in repaired_cell and repaired_cell[key]:
                    text = str(repaired_cell[key])
                    break
            repaired_cell['blocks'] = [self._text_to_paragraph(text or '')]
            changes.append(f"rows[{row_idx}].cells[{cell_idx}] 添加缺失的 blocks 字段")
        elif not isinstance(repaired_cell['blocks'], list):
            raw_blocks = repaired_cell['blocks']
            # Keep the text when blocks is a bare scalar instead of
            # discarding it; any other non-list value is replaced by an
            # empty paragraph.
            if isinstance(raw_blocks, (str, int, float)):
                salvaged = str(raw_blocks)
            else:
                salvaged = ''
            repaired_cell['blocks'] = [self._text_to_paragraph(salvaged)]
            changes.append(f"rows[{row_idx}].cells[{cell_idx}].blocks 类型错误,已重建")
        elif len(repaired_cell['blocks']) == 0:
            repaired_cell['blocks'] = [self._text_to_paragraph('')]
            changes.append(f"rows[{row_idx}].cells[{cell_idx}].blocks 为空,添加默认内容")

        return repaired_cell, changes

    def _flatten_nested_cells(self, cell: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Flatten a nested ``cells`` wrapper into a flat list of cells.

        Wrappers are flattened recursively. Every other dict entry is
        normalised through ``_repair_cell`` so a malformed ``blocks`` value
        cannot survive the flattening. Scalars become one-paragraph cells.
        Entries of any other type are dropped. At least one cell is always
        returned.
        """
        nested_cells = cell.get('cells', [])
        if not isinstance(nested_cells, list):
            return [self._default_cell()]

        result: List[Dict[str, Any]] = []
        for nested in nested_cells:
            if isinstance(nested, dict):
                if 'cells' in nested and 'blocks' not in nested:
                    # Another wrapper level: keep flattening.
                    result.extend(self._flatten_nested_cells(nested))
                else:
                    # The indices only feed change messages, which are
                    # discarded here, so placeholders are sufficient.
                    repaired, _ = self._repair_cell(nested, 0, 0)
                    result.append(repaired)
            elif isinstance(nested, (str, int, float)):
                result.append({
                    'blocks': [self._text_to_paragraph(str(nested))]
                })

        return result if result else [self._default_cell()]

    def _default_cell(self) -> Dict[str, Any]:
        """Create a cell holding a single empty paragraph."""
        return {
            'blocks': [self._text_to_paragraph('')]
        }

    def _text_to_paragraph(self, text: str) -> Dict[str, Any]:
        """Wrap text in a paragraph block with one unmarked inline."""
        return {
            'type': 'paragraph',
            'inlines': [{'text': text, 'marks': []}]
        }
def create_table_validator() -> TableValidator:
    """Build and return a fresh ``TableValidator`` instance."""
    validator = TableValidator()
    return validator
def create_table_repairer(
    validator: Optional[TableValidator] = None
) -> TableRepairer:
    """Build a ``TableRepairer``, forwarding the optional validator to it.

    Args:
        validator: Validator passed to the repairer's constructor;
            ``None`` is forwarded unchanged.

    Returns:
        TableRepairer: The newly created repairer.
    """
    repairer = TableRepairer(validator)
    return repairer
# Names exported by ``from <module> import *``: the validator and repairer
# classes, their result dataclasses, and the two factory helpers.
__all__ = [
    'TableValidator',
    'TableRepairer',
    'TableValidationResult',
    'TableRepairResult',
    'create_table_validator',
    'create_table_repairer',
]
...
...
Please
register
or
login
to post a comment