Toggle navigation
Toggle navigation
This project
Loading...
Sign in
万朱浩
/
Venue-Ops
Go to a project
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
Doiiars
2025-11-05 11:37:37 +0800
Browse Files
Options
Browse Files
Download
Plain Diff
Committed by
GitHub
2025-11-05 11:37:37 +0800
Commit
f0772bfe6eaaeed2990a935784c1aa6d08f271c2
f0772bfe
2 parents
883a405c
b774e5d1
Merge pull request #116 from DoiiarX/fix-news-id
修复news-id冲突问题
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
55 additions
and
40 deletions
MindSpider/BroadTopicExtraction/database_manager.py
MindSpider/BroadTopicExtraction/database_manager.py
View file @
f0772bf
...
...
@@ -25,14 +25,15 @@ except ImportError:
from
config
import
settings
class
DatabaseManager
:
"""数据库管理器"""
def
__init__
(
self
):
"""初始化数据库管理器"""
self
.
engine
:
Engine
=
None
self
.
connect
()
def
connect
(
self
):
"""连接数据库"""
try
:
...
...
@@ -46,46 +47,47 @@ class DatabaseManager:
except
ModuleNotFoundError
as
e
:
missing
:
str
=
str
(
e
)
if
"psycopg"
in
missing
:
logger
.
error
(
"数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]"
)
logger
.
error
(
"数据库连接失败: 未安装PostgreSQL驱动 psycopg。请安装: psycopg[binary]。参考指令:uv pip install psycopg[binary]"
)
elif
"pymysql"
in
missing
:
logger
.
error
(
"数据库连接失败: 未安装MySQL驱动 pymysql。请安装: pymysql。参考指令:uv pip install pymysql"
)
else
:
logger
.
error
(
f
"数据库连接失败(缺少驱动): {e}"
)
raise
except
Exception
as
e
:
logger
.
e
rror
(
f
"数据库连接失败: {e}"
)
logger
.
e
xception
(
f
"数据库连接失败: {e}"
)
raise
def
close
(
self
):
"""关闭数据库连接"""
if
self
.
engine
:
self
.
engine
.
dispose
()
logger
.
info
(
"数据库连接已关闭"
)
def
__enter__
(
self
):
return
self
def
__exit__
(
self
,
exc_type
,
exc_val
,
exc_tb
):
self
.
close
()
# ==================== 新闻数据操作 ====================
def
save_daily_news
(
self
,
news_data
:
List
[
Dict
],
crawl_date
:
date
=
None
)
->
int
:
"""
保存每日新闻数据,如果当天已有数据则覆盖
Args:
news_data: 新闻数据列表
crawl_date: 爬取日期,默认为今天
Returns:
保存的新闻数量
"""
if
not
crawl_date
:
crawl_date
=
date
.
today
()
current_timestamp
=
int
(
datetime
.
now
()
.
timestamp
())
try
:
saved_count
=
0
# 先独立事务执行删除,防止后续插入失败导致无法清理
...
...
@@ -97,7 +99,13 @@ class DatabaseManager:
# 逐条插入,单条失败不影响后续(每条独立事务)
for
news_item
in
news_data
:
try
:
news_id
=
f
"{news_item.get('source', 'unknown')}_{news_item.get('id', news_item.get('rank', 0))}"
# news_item.get('id') 已经是完整的 news_id(格式:source_item_id)
# 为了支持同一条新闻在不同日期出现,将 crawl_date 加入到 news_id 中
base_news_id
=
news_item
.
get
(
'id'
)
or
f
"{news_item.get('source', 'unknown')}_rank_{news_item.get('rank', 0)}"
# 将日期格式化为字符串并加入到 news_id 中,确保全局唯一性
news_id
=
f
"{base_news_id}_{crawl_date.strftime('
%
Y
%
m
%
d')}"
title_val
=
(
news_item
.
get
(
"title"
,
""
)
or
""
)
if
len
(
title_val
)
>
500
:
title_val
=
title_val
[:
500
]
...
...
@@ -124,27 +132,27 @@ class DatabaseManager:
)
saved_count
+=
1
except
Exception
as
e
:
logger
.
warning
(
f
"保存单条新闻失败: {e}"
)
logger
.
exception
(
f
"保存单条新闻失败: {e}"
)
continue
logger
.
info
(
f
"成功保存 {saved_count} 条新闻记录"
)
return
saved_count
except
Exception
as
e
:
logger
.
exception
(
f
"保存新闻数据失败: {e}"
)
return
0
def
get_daily_news
(
self
,
crawl_date
:
date
=
None
)
->
List
[
Dict
]:
"""
获取每日新闻数据
Args:
crawl_date: 爬取日期,默认为今天
Returns:
新闻列表
"""
if
not
crawl_date
:
crawl_date
=
date
.
today
()
query
=
(
"SELECT * FROM daily_news WHERE crawl_date = :d ORDER BY rank_position ASC"
)
...
...
@@ -152,39 +160,43 @@ class DatabaseManager:
result
=
conn
.
execute
(
text
(
query
),
{
"d"
:
crawl_date
})
rows
=
result
.
mappings
()
.
all
()
return
rows
# ==================== 话题数据操作 ====================
def
save_daily_topics
(
self
,
keywords
:
List
[
str
],
summary
:
str
,
extract_date
:
date
=
None
)
->
bool
:
"""
保存每日话题分析
Args:
keywords: 话题关键词列表
summary: 新闻分析总结
extract_date: 提取日期,默认为今天
Returns:
是否保存成功
"""
if
not
extract_date
:
extract_date
=
date
.
today
()
current_timestamp
=
int
(
datetime
.
now
()
.
timestamp
())
try
:
keywords_json
=
json
.
dumps
(
keywords
,
ensure_ascii
=
False
)
# 为了支持外键引用,topic_id 需要全局唯一,所以将日期加入到 topic_id 中
topic_id
=
f
"summary_{extract_date.strftime('
%
Y
%
m
%
d')}"
with
self
.
engine
.
begin
()
as
conn
:
check
=
conn
.
execute
(
text
(
"SELECT id FROM daily_topics WHERE extract_date = :d AND topic_id = :tid"
),
{
"d"
:
extract_date
,
"tid"
:
"summary"
},
{
"d"
:
extract_date
,
"tid"
:
topic_id
},
)
.
first
()
if
check
:
conn
.
execute
(
text
(
"UPDATE daily_topics SET keywords = :k, topic_description = :s, add_ts = :ts, last_modify_ts = :lmt, topic_name = :tn WHERE extract_date = :d AND topic_id = :tid"
),
{
"k"
:
keywords_json
,
"s"
:
summary
,
"ts"
:
current_timestamp
,
"lmt"
:
current_timestamp
,
"d"
:
extract_date
,
"tid"
:
"summary"
,
"tn"
:
"每日新闻分析"
},
{
"k"
:
keywords_json
,
"s"
:
summary
,
"ts"
:
current_timestamp
,
"lmt"
:
current_timestamp
,
"d"
:
extract_date
,
"tid"
:
topic_id
,
"tn"
:
"每日新闻分析"
},
)
logger
.
info
(
f
"更新了 {extract_date} 的话题分析"
)
else
:
...
...
@@ -192,30 +204,32 @@ class DatabaseManager:
text
(
"INSERT INTO daily_topics (extract_date, topic_id, topic_name, keywords, topic_description, add_ts, last_modify_ts) VALUES (:d, :tid, :tn, :k, :s, :ts, :lmt)"
),
{
"d"
:
extract_date
,
"tid"
:
"summary"
,
"tn"
:
"每日新闻分析"
,
"k"
:
keywords_json
,
"s"
:
summary
,
"ts"
:
current_timestamp
,
"lmt"
:
current_timestamp
},
{
"d"
:
extract_date
,
"tid"
:
topic_id
,
"tn"
:
"每日新闻分析"
,
"k"
:
keywords_json
,
"s"
:
summary
,
"ts"
:
current_timestamp
,
"lmt"
:
current_timestamp
},
)
logger
.
info
(
f
"保存了 {extract_date} 的话题分析"
)
return
True
except
Exception
as
e
:
logger
.
exception
(
f
"保存话题分析失败: {e}"
)
return
False
def
get_daily_topics
(
self
,
extract_date
:
date
=
None
)
->
Optional
[
Dict
]:
"""
获取每日话题分析
Args:
extract_date: 提取日期,默认为今天
Returns:
话题分析数据,如果不存在返回None
"""
if
not
extract_date
:
extract_date
=
date
.
today
()
try
:
with
self
.
engine
.
connect
()
as
conn
:
result
=
conn
.
execute
(
text
(
"SELECT * FROM daily_topics WHERE extract_date = :d"
),
{
"d"
:
extract_date
})
.
mappings
()
.
first
()
result
=
conn
.
execute
(
text
(
"SELECT * FROM daily_topics WHERE extract_date = :d"
),
{
"d"
:
extract_date
})
.
mappings
()
.
first
()
if
result
:
result
=
dict
(
result
)
# 转为可变dict以支持item赋值
result
[
"keywords"
]
=
json
.
loads
(
result
[
"keywords"
])
if
result
.
get
(
"keywords"
)
else
[]
...
...
@@ -224,14 +238,14 @@ class DatabaseManager:
except
Exception
as
e
:
logger
.
exception
(
f
"获取话题分析失败: {e}"
)
return
None
def
get_recent_topics
(
self
,
days
:
int
=
7
)
->
List
[
Dict
]:
"""
获取最近几天的话题分析
Args:
days: 天数
Returns:
话题分析列表
"""
...
...
@@ -254,9 +268,9 @@ class DatabaseManager:
except
Exception
as
e
:
logger
.
exception
(
f
"获取最近话题分析失败: {e}"
)
return
[]
# ==================== 统计查询 ====================
def
get_summary_stats
(
self
,
days
:
int
=
7
)
->
Dict
:
"""获取统计摘要"""
try
:
...
...
@@ -290,18 +304,19 @@ class DatabaseManager:
logger
.
exception
(
f
"获取统计摘要失败: {e}"
)
return
{
"news_stats"
:
[],
"topics_stats"
:
[]}
if
__name__
==
"__main__"
:
# 测试数据库管理器
with
DatabaseManager
()
as
db
:
# 测试获取新闻
news
=
db
.
get_daily_news
()
logger
.
info
(
f
"今日新闻数量: {len(news)}"
)
# 测试获取话题
topics
=
db
.
get_daily_topics
()
if
topics
:
logger
.
info
(
f
"今日话题关键词: {topics['keywords']}"
)
else
:
logger
.
info
(
"今日暂无话题分析"
)
logger
.
info
(
"简化数据库管理器测试完成!"
)
...
...
Please
register
or
login
to post a comment