马一丁

Further fixes to table rendering logic

... ... @@ -1168,9 +1168,192 @@ class ChapterGenerationNode(BaseNode):
def _sanitize_table_block(self, block: Dict[str, Any]):
"""保证表格的rows/cells结构合法且每个单元格包含至少一个block"""
rows = self._normalize_table_rows(block.get("rows"))
raw_rows = block.get("rows")
# 先检测是否存在嵌套行结构问题(只有1行但cells中有嵌套)
if isinstance(raw_rows, list) and len(raw_rows) == 1:
first_row = raw_rows[0]
if isinstance(first_row, dict):
cells = first_row.get("cells", [])
# 检测是否存在嵌套结构
has_nested = any(
isinstance(cell, dict) and "cells" in cell and "blocks" not in cell
for cell in cells
if isinstance(cell, dict)
)
if has_nested:
# 修复嵌套行结构
fixed_rows = self._fix_nested_rows_structure(raw_rows)
block["rows"] = fixed_rows
return
# 正常情况下,使用标准规范化
rows = self._normalize_table_rows(raw_rows)
block["rows"] = rows
def _fix_nested_rows_structure(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
修复嵌套错误的表格行结构。
LLM生成的表格只有1行但所有数据被嵌套在cells中时,
本方法会展平所有单元格并重新组织成正确的多行结构。
参数:
rows: 原始的表格行数组(应该只有1行)。
返回:
List[Dict]: 修复后的多行表格结构。
"""
if not rows or len(rows) != 1:
return self._normalize_table_rows(rows)
first_row = rows[0]
original_cells = first_row.get("cells", [])
# 递归展平所有嵌套的单元格
all_cells = self._flatten_all_cells_recursive(original_cells)
if len(all_cells) <= 1:
return self._normalize_table_rows(rows)
# 辅助函数:获取单元格文本
def _get_cell_text(cell: Dict[str, Any]) -> str:
blocks = cell.get("blocks", [])
for block in blocks:
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
if isinstance(inline, dict):
text = inline.get("text", "")
if text:
return str(text).strip()
return ""
def _is_placeholder_cell(cell: Dict[str, Any]) -> bool:
"""判断单元格是否是占位符"""
text = _get_cell_text(cell)
return text in ("--", "-", "—", "——", "", "N/A", "n/a")
def _is_header_cell(cell: Dict[str, Any]) -> bool:
"""判断单元格是否像表头(通常有加粗标记或是典型表头词)"""
blocks = cell.get("blocks", [])
for block in blocks:
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
if isinstance(inline, dict):
marks = inline.get("marks", [])
if any(isinstance(m, dict) and m.get("type") == "bold" for m in marks):
return True
# 也检查典型的表头词
text = _get_cell_text(cell)
header_keywords = {
"时间", "日期", "名称", "类型", "状态", "数量", "金额", "比例", "指标",
"平台", "渠道", "来源", "描述", "说明", "备注", "序号", "编号",
"事件", "关键", "数据", "支撑", "反应", "市场", "情感", "节点",
"维度", "要点", "详情", "标签", "影响", "趋势", "权重", "类别",
"信息", "内容", "风格", "偏好", "主要", "用户", "核心", "特征",
"分类", "范围", "对象", "项目", "阶段", "周期", "频率", "等级",
}
return any(kw in text for kw in header_keywords) and len(text) <= 20
# 过滤掉占位符单元格
valid_cells = [c for c in all_cells if not _is_placeholder_cell(c)]
if len(valid_cells) <= 1:
return self._normalize_table_rows(rows)
# 检测表头列数:统计连续的表头单元格数量
header_count = 0
for cell in valid_cells:
if _is_header_cell(cell):
header_count += 1
else:
break
# 如果没有检测到表头,使用启发式方法
if header_count == 0:
total = len(valid_cells)
for possible_cols in [4, 5, 3, 6, 2]:
if total % possible_cols == 0:
header_count = possible_cols
break
else:
# 尝试找到最接近的能整除的列数
for possible_cols in [4, 5, 3, 6, 2]:
remainder = total % possible_cols
if remainder <= 3:
header_count = possible_cols
break
else:
# 无法确定列数,使用原始数据
return self._normalize_table_rows(rows)
# 计算有效的单元格数量
total = len(valid_cells)
remainder = total % header_count
if remainder > 0 and remainder <= 3:
# 截断尾部多余的单元格
valid_cells = valid_cells[:total - remainder]
elif remainder > 3:
# 余数太大,可能列数检测错误
return self._normalize_table_rows(rows)
# 重新组织成多行
fixed_rows: List[Dict[str, Any]] = []
for i in range(0, len(valid_cells), header_count):
row_cells = valid_cells[i:i + header_count]
# 标记第一行为表头
if i == 0:
for cell in row_cells:
cell["header"] = True
fixed_rows.append({"cells": row_cells})
return fixed_rows if fixed_rows else self._normalize_table_rows(rows)
def _flatten_all_cells_recursive(self, cells: List[Any]) -> List[Dict[str, Any]]:
"""
递归展平所有嵌套的单元格结构。
参数:
cells: 可能包含嵌套结构的单元格数组。
返回:
List[Dict]: 展平后的单元格数组,每个单元格都有blocks
"""
if not cells:
return []
flattened: List[Dict[str, Any]] = []
def _extract_cells(cell_or_list: Any) -> None:
if not isinstance(cell_or_list, dict):
if isinstance(cell_or_list, (str, int, float)):
flattened.append({"blocks": [self._as_paragraph_block(str(cell_or_list))]})
return
# 如果当前对象有 blocks,说明它是一个有效的单元格
if "blocks" in cell_or_list:
# 创建单元格副本,移除嵌套的 cells
clean_cell = {
k: v for k, v in cell_or_list.items()
if k != "cells"
}
# 确保blocks有效
blocks = clean_cell.get("blocks")
if not isinstance(blocks, list) or not blocks:
clean_cell["blocks"] = [self._as_paragraph_block("")]
flattened.append(clean_cell)
# 如果当前对象有嵌套的 cells,递归处理
nested_cells = cell_or_list.get("cells")
if isinstance(nested_cells, list):
for nested_cell in nested_cells:
_extract_cells(nested_cell)
for cell in cells:
_extract_cells(cell)
return flattened
def _sanitize_engine_quote_block(self, block: Dict[str, Any]):
"""engineQuote仅用于单Agent发言,内部仅允许paragraphtitle需锁定Agent名称"""
engine_raw = block.get("engine")
... ...
... ... @@ -1318,12 +1318,13 @@ class HTMLRenderer:
"""获取单元格的文本内容"""
blocks = cell.get("blocks", [])
for block in blocks:
if block.get("type") == "paragraph":
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
text = inline.get("text", "")
if text:
return text.strip()
if isinstance(inline, dict):
text = inline.get("text", "")
if text:
return str(text).strip()
return ""
def _is_placeholder_cell(cell: Dict[str, Any]) -> bool:
... ... @@ -1337,21 +1338,31 @@ class HTMLRenderer:
if len(all_cells) <= 2:
return rows
# 检测表头列数:查找带有 bold 标记的单元格
# 检测表头列数:查找带有 bold 标记或典型表头词的单元格
def _is_header_cell(cell: Dict[str, Any]) -> bool:
"""判断单元格是否像表头(通常有加粗标记)"""
"""判断单元格是否像表头(有加粗标记或是典型表头词)"""
blocks = cell.get("blocks", [])
for block in blocks:
if block.get("type") == "paragraph":
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
marks = inline.get("marks", [])
if any(m.get("type") == "bold" for m in marks):
return True
return False
# 计算表头列数:统计连续的加粗单元格数量
# 占位符已经在前面被过滤掉了
if isinstance(inline, dict):
marks = inline.get("marks", [])
if any(isinstance(m, dict) and m.get("type") == "bold" for m in marks):
return True
# 也检查典型的表头词
text = _get_cell_text(cell)
header_keywords = {
"时间", "日期", "名称", "类型", "状态", "数量", "金额", "比例", "指标",
"平台", "渠道", "来源", "描述", "说明", "备注", "序号", "编号",
"事件", "关键", "数据", "支撑", "反应", "市场", "情感", "节点",
"维度", "要点", "详情", "标签", "影响", "趋势", "权重", "类别",
"信息", "内容", "风格", "偏好", "主要", "用户", "核心", "特征",
"分类", "范围", "对象", "项目", "阶段", "周期", "频率", "等级",
}
return any(kw in text for kw in header_keywords) and len(text) <= 20
# 计算表头列数:统计连续的表头单元格数量
header_count = 0
for cell in all_cells:
if _is_header_cell(cell):
... ... @@ -1364,13 +1375,13 @@ class HTMLRenderer:
if header_count == 0:
# 假设列数为 4 或 5(常见的表格列数)
total = len(all_cells)
for possible_cols in [4, 5, 3, 6]:
for possible_cols in [4, 5, 3, 6, 2]:
if total % possible_cols == 0:
header_count = possible_cols
break
else:
# 尝试找到最接近的能整除的列数
for possible_cols in [4, 5, 3, 6]:
for possible_cols in [4, 5, 3, 6, 2]:
remainder = total % possible_cols
# 允许最多3个多余的单元格(可能是尾部的总结或注释)
if remainder <= 3:
... ...
... ... @@ -254,12 +254,13 @@ class MarkdownRenderer:
"""获取单元格的文本内容"""
blocks = cell.get("blocks", [])
for block in blocks:
if block.get("type") == "paragraph":
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
text = inline.get("text", "")
if text:
return text.strip()
if isinstance(inline, dict):
text = inline.get("text", "")
if text:
return str(text).strip()
return ""
def _is_placeholder_cell(cell: Dict[str, Any]) -> bool:
... ... @@ -273,21 +274,31 @@ class MarkdownRenderer:
if len(all_cells) <= 2:
return rows
# 检测表头列数:查找带有 bold 标记的单元格
# 检测表头列数:查找带有 bold 标记或典型表头词的单元格
def _is_header_cell(cell: Dict[str, Any]) -> bool:
"""判断单元格是否像表头(通常有加粗标记)"""
"""判断单元格是否像表头(有加粗标记或是典型表头词)"""
blocks = cell.get("blocks", [])
for block in blocks:
if block.get("type") == "paragraph":
if isinstance(block, dict) and block.get("type") == "paragraph":
inlines = block.get("inlines", [])
for inline in inlines:
marks = inline.get("marks", [])
if any(m.get("type") == "bold" for m in marks):
return True
return False
# 计算表头列数:统计连续的加粗单元格数量
# 占位符已经在前面被过滤掉了
if isinstance(inline, dict):
marks = inline.get("marks", [])
if any(isinstance(m, dict) and m.get("type") == "bold" for m in marks):
return True
# 也检查典型的表头词
text = _get_cell_text(cell)
header_keywords = {
"时间", "日期", "名称", "类型", "状态", "数量", "金额", "比例", "指标",
"平台", "渠道", "来源", "描述", "说明", "备注", "序号", "编号",
"事件", "关键", "数据", "支撑", "反应", "市场", "情感", "节点",
"维度", "要点", "详情", "标签", "影响", "趋势", "权重", "类别",
"信息", "内容", "风格", "偏好", "主要", "用户", "核心", "特征",
"分类", "范围", "对象", "项目", "阶段", "周期", "频率", "等级",
}
return any(kw in text for kw in header_keywords) and len(text) <= 20
# 计算表头列数:统计连续的表头单元格数量
header_count = 0
for cell in all_cells:
if _is_header_cell(cell):
... ... @@ -300,13 +311,13 @@ class MarkdownRenderer:
if header_count == 0:
# 假设列数为 4 或 5(常见的表格列数)
total = len(all_cells)
for possible_cols in [4, 5, 3, 6]:
for possible_cols in [4, 5, 3, 6, 2]:
if total % possible_cols == 0:
header_count = possible_cols
break
else:
# 尝试找到最接近的能整除的列数
for possible_cols in [4, 5, 3, 6]:
for possible_cols in [4, 5, 3, 6, 2]:
remainder = total % possible_cols
# 允许最多3个多余的单元格(可能是尾部的总结或注释)
if remainder <= 3:
... ...