Toggle navigation
Toggle navigation
This project
Loading...
Sign in
万朱浩
/
Venue-Ops
Go to a project
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
Doiiars
2025-11-05 14:40:07 +0800
Browse Files
Options
Browse Files
Download
Email Patches
Plain Diff
Commit
22ed16e00ef7c2c0b235b9b3f4a7b4b5f4a584d4
22ed16e0
1 parent
a8249c46
修复数据库初始化问题、删除过时的数据库格式方法、修复空停用词错误。
Show whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
111 additions
and
275 deletions
MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py
MindSpider/schema/db_manager.py
MindSpider/schema/init_database.py
MindSpider/schema/init_database_sa.py
requirements.txt
MindSpider/DeepSentimentCrawling/MediaCrawler/tools/words.py
View file @
22ed16e
...
...
@@ -35,6 +35,8 @@ class AsyncWordCloudGenerator:
jieba
.
add_word
(
word
)
def
load_stop_words
(
self
):
if
not
os
.
path
.
exists
(
self
.
stop_words_file
):
return
set
()
with
open
(
self
.
stop_words_file
,
'r'
,
encoding
=
'utf-8'
)
as
f
:
return
set
(
f
.
read
()
.
strip
()
.
split
(
'
\n
'
))
...
...
MindSpider/schema/db_manager.py
View file @
22ed16e
...
...
@@ -24,7 +24,7 @@ except ImportError:
logger
.
error
(
"错误: 无法导入config.py配置文件"
)
sys
.
exit
(
1
)
from
MindSpider.
config
import
settings
from
config
import
settings
class
DatabaseManager
:
def
__init__
(
self
):
...
...
MindSpider/schema/init_database.py
View file @
22ed16e
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
MindSpider AI爬虫项目 - 数据库初始化脚本
用于创建项目所需的所有数据库表
MindSpider 数据库初始化(SQLAlchemy 2.x 异步引擎)
此脚本创建 MindSpider 扩展表(与 MediaCrawler 原始表分离)。
支持 MySQL 与 PostgreSQL,需已有可连接的数据库实例。
数据模型定义位置:
- MindSpider/schema/models_sa.py
"""
from
__future__
import
annotations
import
asyncio
import
os
from
typing
import
Optional
from
loguru
import
logger
from
sqlalchemy.ext.asyncio
import
create_async_engine
from
sqlalchemy
import
text
from
models_sa
import
Base
# 导入 models_bigdata 以确保所有表类被注册到 Base.metadata
# models_bigdata 现在也使用 models_sa 的 Base,所以所有表都在同一个 metadata 中
import
models_bigdata
# noqa: F401 # 导入以注册所有表类
import
sys
import
pymysql
from
pathlib
import
Path
from
MindSpider.config
import
settings
# 添加项目根目录到路径
project_root
=
Path
(
__file__
)
.
parent
.
parent
sys
.
path
.
append
(
str
(
project_root
))
# 导入配置
try
:
import
config
except
ImportError
:
print
(
"错误: 无法导入config.py配置文件"
)
print
(
"请确保config.py文件存在于项目根目录"
)
sys
.
exit
(
1
)
def
create_database_connection
():
"""创建数据库连接"""
try
:
connection
=
pymysql
.
connect
(
host
=
settings
.
db_host
,
port
=
settings
.
db_port
,
user
=
settings
.
db_user
,
password
=
settings
.
db_password
,
charset
=
settings
.
db_charset
,
autocommit
=
True
from
config
import
settings
def
_env
(
key
:
str
,
default
:
Optional
[
str
]
=
None
)
->
Optional
[
str
]:
v
=
os
.
getenv
(
key
)
return
v
if
v
not
in
(
None
,
""
)
else
default
def
_build_database_url
()
->
str
:
# 优先 DATABASE_URL
database_url
=
settings
.
DATABASE_URL
if
hasattr
(
settings
,
"DATABASE_URL"
)
else
None
if
database_url
:
return
database_url
dialect
=
(
settings
.
DB_DIALECT
or
"mysql"
)
.
lower
()
host
=
settings
.
DB_HOST
or
"localhost"
port
=
str
(
settings
.
DB_PORT
or
(
"3306"
if
dialect
==
"mysql"
else
"5432"
))
user
=
settings
.
DB_USER
or
"root"
password
=
settings
.
DB_PASSWORD
or
""
db_name
=
settings
.
DB_NAME
or
"mindspider"
if
dialect
in
(
"postgresql"
,
"postgres"
):
return
f
"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"
return
f
"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}"
async
def
_create_views_if_needed
(
engine_dialect
:
str
):
# 视图为可选;仅当业务需要时创建。两端使用通用 SQL 聚合避免方言函数。
# 如不需要视图,可跳过。
engine_dialect
=
engine_dialect
.
lower
()
v_topic_crawling_stats
=
(
"CREATE OR REPLACE VIEW v_topic_crawling_stats AS
\n
"
"SELECT dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status,
\n
"
" COUNT(DISTINCT ct.task_id) AS total_tasks,
\n
"
" SUM(CASE WHEN ct.task_status = 'completed' THEN 1 ELSE 0 END) AS completed_tasks,
\n
"
" SUM(CASE WHEN ct.task_status = 'failed' THEN 1 ELSE 0 END) AS failed_tasks,
\n
"
" SUM(COALESCE(ct.total_crawled,0)) AS total_content_crawled,
\n
"
" SUM(COALESCE(ct.success_count,0)) AS total_success_count,
\n
"
" SUM(COALESCE(ct.error_count,0)) AS total_error_count
\n
"
"FROM daily_topics dt
\n
"
"LEFT JOIN crawling_tasks ct ON dt.topic_id = ct.topic_id
\n
"
"GROUP BY dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status"
)
v_daily_summary
=
(
"CREATE OR REPLACE VIEW v_daily_summary AS
\n
"
"SELECT dn.crawl_date AS crawl_date,
\n
"
" COUNT(DISTINCT dn.news_id) AS total_news,
\n
"
" COUNT(DISTINCT dn.source_platform) AS platforms_covered,
\n
"
" (SELECT COUNT(*) FROM daily_topics WHERE extract_date = dn.crawl_date) AS topics_extracted,
\n
"
" (SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date = dn.crawl_date) AS tasks_created
\n
"
"FROM daily_news dn
\n
"
"GROUP BY dn.crawl_date
\n
"
"ORDER BY dn.crawl_date DESC"
)
print
(
f
"成功连接到MySQL服务器: {settings.db_host}:{settings.db_port}"
)
return
connection
except
Exception
as
e
:
print
(
f
"连接数据库失败: {e}"
)
return
None
def
create_database
(
connection
):
"""创建数据库"""
try
:
cursor
=
connection
.
cursor
()
cursor
.
execute
(
f
"CREATE DATABASE IF NOT EXISTS `{settings.db_name}` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci"
)
cursor
.
execute
(
f
"USE `{settings.db_name}`"
)
print
(
f
"数据库 '{settings.db_name}' 创建/选择成功"
)
return
True
except
Exception
as
e
:
print
(
f
"创建数据库失败: {e}"
)
return
False
def
execute_sql_file
(
connection
,
sql_file_path
,
description
=
""
):
"""执行SQL文件"""
if
not
os
.
path
.
exists
(
sql_file_path
):
print
(
f
"警告: SQL文件不存在: {sql_file_path}"
)
return
False
try
:
cursor
=
connection
.
cursor
()
with
open
(
sql_file_path
,
'r'
,
encoding
=
'utf-8'
)
as
f
:
sql_content
=
f
.
read
()
# 分割SQL语句(简单实现,按分号分割)
sql_statements
=
[
stmt
.
strip
()
for
stmt
in
sql_content
.
split
(
';'
)
if
stmt
.
strip
()]
success_count
=
0
error_count
=
0
for
stmt
in
sql_statements
:
if
not
stmt
or
stmt
.
startswith
(
'--'
):
continue
try
:
cursor
.
execute
(
stmt
)
success_count
+=
1
except
Exception
as
e
:
error_count
+=
1
print
(
f
"执行SQL语句失败: {str(e)[:100]}..."
)
print
(
f
"{description} - 成功执行: {success_count} 条语句, 失败: {error_count} 条语句"
)
return
error_count
==
0
except
Exception
as
e
:
print
(
f
"执行SQL文件失败 {sql_file_path}: {e}"
)
return
False
def
main
():
"""主函数"""
print
(
"="
*
60
)
print
(
"MindSpider AI爬虫项目 - 数据库初始化"
)
print
(
"="
*
60
)
# 检查配置
print
(
"检查数据库配置..."
)
print
(
f
"数据库主机: {settings.db_host}"
)
print
(
f
"数据库端口: {settings.db_port}"
)
print
(
f
"数据库名称: {settings.db_name}"
)
print
(
f
"数据库用户: {settings.db_user}"
)
print
(
f
"字符集: {settings.db_charset}"
)
print
()
# 创建数据库连接
print
(
"正在连接数据库..."
)
connection
=
create_database_connection
()
if
not
connection
:
print
(
"数据库初始化失败!"
)
return
False
try
:
# 创建数据库
print
(
"正在创建/选择数据库..."
)
if
not
create_database
(
connection
):
return
False
# 获取SQL文件路径
schema_dir
=
Path
(
__file__
)
.
parent
mediacrawler_sql
=
schema_dir
.
parent
/
"DeepSentimentCrawling"
/
"MediaCrawler"
/
"schema"
/
"tables.sql"
mindspider_sql
=
schema_dir
/
"mindspider_tables.sql"
print
()
print
(
"开始执行SQL脚本..."
)
# 1. 执行MediaCrawler的原始表结构
if
mediacrawler_sql
.
exists
():
print
(
"1. 创建MediaCrawler基础表..."
)
execute_sql_file
(
connection
,
str
(
mediacrawler_sql
),
"MediaCrawler基础表"
)
else
:
print
(
"警告: MediaCrawler SQL文件不存在,跳过基础表创建"
)
# 2. 执行MindSpider扩展表结构
print
(
"2. 创建MindSpider扩展表..."
)
if
mindspider_sql
.
exists
():
execute_sql_file
(
connection
,
str
(
mindspider_sql
),
"MindSpider扩展表"
)
else
:
print
(
"错误: MindSpider SQL文件不存在"
)
return
False
print
()
print
(
"="
*
60
)
print
(
"数据库初始化完成!"
)
print
(
"="
*
60
)
# 显示创建的表
cursor
=
connection
.
cursor
()
cursor
.
execute
(
"SHOW TABLES"
)
tables
=
cursor
.
fetchall
()
print
(
f
"数据库 '{settings.db_name}' 中共创建了 {len(tables)} 个表:"
)
for
table
in
tables
:
print
(
f
" - {table[0]}"
)
print
()
print
(
"数据库初始化成功完成!您现在可以开始使用MindSpider了。"
)
return
True
except
Exception
as
e
:
print
(
f
"数据库初始化过程中发生错误: {e}"
)
return
False
finally
:
if
connection
:
connection
.
close
()
print
(
"数据库连接已关闭"
)
# PostgreSQL 的 CREATE OR REPLACE VIEW 也可用;两端均执行
from
sqlalchemy.ext.asyncio
import
AsyncEngine
engine
:
AsyncEngine
=
create_async_engine
(
_build_database_url
())
async
with
engine
.
begin
()
as
conn
:
await
conn
.
execute
(
text
(
v_topic_crawling_stats
))
await
conn
.
execute
(
text
(
v_daily_summary
))
await
engine
.
dispose
()
async
def
main
()
->
None
:
database_url
=
_build_database_url
()
engine
=
create_async_engine
(
database_url
,
pool_pre_ping
=
True
,
pool_recycle
=
1800
)
# 由于 models_bigdata 和 models_sa 现在共享同一个 Base,所有表都在同一个 metadata 中
# 只需创建一次,SQLAlchemy 会自动处理表之间的依赖关系
async
with
engine
.
begin
()
as
conn
:
await
conn
.
run_sync
(
Base
.
metadata
.
create_all
)
# 保持原有视图创建和释放逻辑
dialect_name
=
engine
.
url
.
get_backend_name
()
await
_create_views_if_needed
(
dialect_name
)
await
engine
.
dispose
()
logger
.
info
(
"[init_database_sa] 数据表与视图创建完成"
)
if
__name__
==
"__main__"
:
success
=
main
()
sys
.
exit
(
0
if
success
else
1
)
asyncio
.
run
(
main
())
...
...
MindSpider/schema/init_database_sa.py
deleted
100644 → 0
View file @
a8249c4
"""
MindSpider 数据库初始化(SQLAlchemy 2.x 异步引擎)
此脚本创建 MindSpider 扩展表(与 MediaCrawler 原始表分离)。
支持 MySQL 与 PostgreSQL,需已有可连接的数据库实例。
数据模型定义位置:
- MindSpider/schema/models_sa.py
"""
from
__future__
import
annotations
import
asyncio
import
os
from
typing
import
Optional
from
loguru
import
logger
from
sqlalchemy.ext.asyncio
import
create_async_engine
from
sqlalchemy
import
text
from
models_sa
import
Base
# 导入 models_bigdata 以确保所有表类被注册到 Base.metadata
# models_bigdata 现在也使用 models_sa 的 Base,所以所有表都在同一个 metadata 中
import
models_bigdata
# noqa: F401 # 导入以注册所有表类
import
sys
from
pathlib
import
Path
# 添加项目根目录到路径
project_root
=
Path
(
__file__
)
.
parent
.
parent
sys
.
path
.
append
(
str
(
project_root
))
from
config
import
settings
def
_env
(
key
:
str
,
default
:
Optional
[
str
]
=
None
)
->
Optional
[
str
]:
v
=
os
.
getenv
(
key
)
return
v
if
v
not
in
(
None
,
""
)
else
default
def
_build_database_url
()
->
str
:
# 优先 DATABASE_URL
database_url
=
settings
.
DATABASE_URL
if
hasattr
(
settings
,
"DATABASE_URL"
)
else
None
if
database_url
:
return
database_url
dialect
=
(
settings
.
DB_DIALECT
or
"mysql"
)
.
lower
()
host
=
settings
.
DB_HOST
or
"localhost"
port
=
str
(
settings
.
DB_PORT
or
(
"3306"
if
dialect
==
"mysql"
else
"5432"
))
user
=
settings
.
DB_USER
or
"root"
password
=
settings
.
DB_PASSWORD
or
""
db_name
=
settings
.
DB_NAME
or
"mindspider"
if
dialect
in
(
"postgresql"
,
"postgres"
):
return
f
"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"
return
f
"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}"
async
def
_create_views_if_needed
(
engine_dialect
:
str
):
# 视图为可选;仅当业务需要时创建。两端使用通用 SQL 聚合避免方言函数。
# 如不需要视图,可跳过。
engine_dialect
=
engine_dialect
.
lower
()
v_topic_crawling_stats
=
(
"CREATE OR REPLACE VIEW v_topic_crawling_stats AS
\n
"
"SELECT dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status,
\n
"
" COUNT(DISTINCT ct.task_id) AS total_tasks,
\n
"
" SUM(CASE WHEN ct.task_status = 'completed' THEN 1 ELSE 0 END) AS completed_tasks,
\n
"
" SUM(CASE WHEN ct.task_status = 'failed' THEN 1 ELSE 0 END) AS failed_tasks,
\n
"
" SUM(COALESCE(ct.total_crawled,0)) AS total_content_crawled,
\n
"
" SUM(COALESCE(ct.success_count,0)) AS total_success_count,
\n
"
" SUM(COALESCE(ct.error_count,0)) AS total_error_count
\n
"
"FROM daily_topics dt
\n
"
"LEFT JOIN crawling_tasks ct ON dt.topic_id = ct.topic_id
\n
"
"GROUP BY dt.topic_id, dt.topic_name, dt.extract_date, dt.processing_status"
)
v_daily_summary
=
(
"CREATE OR REPLACE VIEW v_daily_summary AS
\n
"
"SELECT dn.crawl_date AS crawl_date,
\n
"
" COUNT(DISTINCT dn.news_id) AS total_news,
\n
"
" COUNT(DISTINCT dn.source_platform) AS platforms_covered,
\n
"
" (SELECT COUNT(*) FROM daily_topics WHERE extract_date = dn.crawl_date) AS topics_extracted,
\n
"
" (SELECT COUNT(*) FROM crawling_tasks WHERE scheduled_date = dn.crawl_date) AS tasks_created
\n
"
"FROM daily_news dn
\n
"
"GROUP BY dn.crawl_date
\n
"
"ORDER BY dn.crawl_date DESC"
)
# PostgreSQL 的 CREATE OR REPLACE VIEW 也可用;两端均执行
from
sqlalchemy.ext.asyncio
import
AsyncEngine
engine
:
AsyncEngine
=
create_async_engine
(
_build_database_url
())
async
with
engine
.
begin
()
as
conn
:
await
conn
.
execute
(
text
(
v_topic_crawling_stats
))
await
conn
.
execute
(
text
(
v_daily_summary
))
await
engine
.
dispose
()
async
def
main
()
->
None
:
database_url
=
_build_database_url
()
engine
=
create_async_engine
(
database_url
,
pool_pre_ping
=
True
,
pool_recycle
=
1800
)
# 由于 models_bigdata 和 models_sa 现在共享同一个 Base,所有表都在同一个 metadata 中
# 只需创建一次,SQLAlchemy 会自动处理表之间的依赖关系
async
with
engine
.
begin
()
as
conn
:
await
conn
.
run_sync
(
Base
.
metadata
.
create_all
)
# 保持原有视图创建和释放逻辑
dialect_name
=
engine
.
url
.
get_backend_name
()
await
_create_views_if_needed
(
dialect_name
)
await
engine
.
dispose
()
logger
.
info
(
"[init_database_sa] 数据表与视图创建完成"
)
if
__name__
==
"__main__"
:
asyncio
.
run
(
main
())
requirements.txt
View file @
22ed16e
...
...
@@ -34,6 +34,8 @@ pymysql==1.1.0
aiomysql==0.2.0
aiosqlite==0.21.0
redis>=4.6.0
SQLAlchemy==2.0.35
asyncpg==0.29.0
# ===== 爬虫相关 =====
playwright==1.45.0
...
...
@@ -64,6 +66,7 @@ tqdm>=4.65.0
tenacity==8.2.2
loguru>=0.7.0
pydantic==2.5.2
pydantic-settings==2.2.1
# ===== 开发工具(可选) =====
pytest>=7.4.0
...
...
Please
register
or
login
to post a comment