Toggle navigation
Toggle navigation
This project
Loading...
Sign in
万朱浩
/
Venue-Ops
Go to a project
Toggle navigation
Projects
Groups
Snippets
Help
Toggle navigation pinning
Project
Activity
Repository
Pipelines
Graphs
Issues
0
Merge Requests
0
Wiki
Network
Create a new issue
Builds
Commits
Authored by
Doiiars
2025-11-05 17:25:22 +0800
Browse Files
Options
Browse Files
Download
Plain Diff
Committed by
GitHub
2025-11-05 17:25:22 +0800
Commit
3abe20b0ddd0f0166290d5e347e70ec085411f64
3abe20b0
2 parents
4ee5fa96
74110c4c
Merge pull request #140 from DoiiarX/fix-db-module
修复缺失的文件
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
70 additions
and
0 deletions
InsightEngine/utils/db.py
InsightEngine/utils/db.py
0 → 100644
View file @
3abe20b
"""
通用数据库工具(异步)
此模块提供基于 SQLAlchemy 2.x 异步引擎的数据库访问封装,支持 MySQL 与 PostgreSQL。
数据模型定义位置:
- 无(本模块仅提供连接与查询工具,不定义数据模型)
"""
from
__future__
import
annotations
import
asyncio
import
os
from
typing
import
Any
,
Dict
,
Iterable
,
List
,
Optional
,
Union
from
sqlalchemy.ext.asyncio
import
AsyncEngine
,
AsyncSession
,
create_async_engine
from
sqlalchemy
import
text
from
InsightEngine.utils.config
import
settings
__all__
=
[
"get_async_engine"
,
"fetch_all"
,
]
_engine
:
Optional
[
AsyncEngine
]
=
None
def
_build_database_url
()
->
str
:
dialect
:
str
=
(
settings
.
DB_DIALECT
or
"mysql"
)
.
lower
()
host
:
str
=
settings
.
DB_HOST
or
""
port
:
str
=
str
(
settings
.
DB_PORT
or
""
)
user
:
str
=
settings
.
DB_USER
or
""
password
:
str
=
settings
.
DB_PASSWORD
or
""
db_name
:
str
=
settings
.
DB_NAME
or
""
if
os
.
getenv
(
"DATABASE_URL"
):
return
os
.
getenv
(
"DATABASE_URL"
)
# 直接使用外部提供的完整URL
if
dialect
in
(
"postgresql"
,
"postgres"
):
# PostgreSQL 使用 asyncpg 驱动
return
f
"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"
# 默认 MySQL 使用 aiomysql 驱动
return
f
"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}"
def
get_async_engine
()
->
AsyncEngine
:
global
_engine
if
_engine
is
None
:
database_url
:
str
=
_build_database_url
()
_engine
=
create_async_engine
(
database_url
,
pool_pre_ping
=
True
,
pool_recycle
=
1800
,
)
return
_engine
async
def
fetch_all
(
query
:
str
,
params
:
Optional
[
Union
[
Iterable
[
Any
],
Dict
[
str
,
Any
]]]
=
None
)
->
List
[
Dict
[
str
,
Any
]]:
"""
执行只读查询并返回字典列表。
"""
engine
:
AsyncEngine
=
get_async_engine
()
async
with
engine
.
connect
()
as
conn
:
result
=
await
conn
.
execute
(
text
(
query
),
params
or
{})
rows
=
result
.
mappings
()
.
all
()
# 将 RowMapping 转换为普通字典
return
[
dict
(
row
)
for
row
in
rows
]
...
...
Please
register
or
login
to post a comment