db.py
2.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""
通用数据库工具(异步)
此模块提供基于 SQLAlchemy 2.x 异步引擎的数据库访问封装,支持 MySQL 与 PostgreSQL。
数据模型定义位置:
- 无(本模块仅提供连接与查询工具,不定义数据模型)
"""
from __future__ import annotations
import asyncio
import os
from typing import Any, Dict, Iterable, List, Optional, Union
from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, create_async_engine
from sqlalchemy import text
from InsightEngine.utils.config import settings
__all__ = [
"get_async_engine",
"fetch_all",
]
_engine: Optional[AsyncEngine] = None
def _build_database_url() -> str:
dialect: str = (settings.DB_DIALECT or "mysql").lower()
host: str = settings.DB_HOST or ""
port: str = str(settings.DB_PORT or "")
user: str = settings.DB_USER or ""
password: str = settings.DB_PASSWORD or ""
db_name: str = settings.DB_NAME or ""
if os.getenv("DATABASE_URL"):
return os.getenv("DATABASE_URL") # 直接使用外部提供的完整URL
if dialect in ("postgresql", "postgres"):
# PostgreSQL 使用 asyncpg 驱动
return f"postgresql+asyncpg://{user}:{password}@{host}:{port}/{db_name}"
# 默认 MySQL 使用 aiomysql 驱动
return f"mysql+aiomysql://{user}:{password}@{host}:{port}/{db_name}"
def get_async_engine() -> AsyncEngine:
global _engine
if _engine is None:
database_url: str = _build_database_url()
_engine = create_async_engine(
database_url,
pool_pre_ping=True,
pool_recycle=1800,
)
return _engine
async def fetch_all(query: str, params: Optional[Union[Iterable[Any], Dict[str, Any]]] = None) -> List[Dict[str, Any]]:
"""
执行只读查询并返回字典列表。
"""
engine: AsyncEngine = get_async_engine()
async with engine.connect() as conn:
result = await conn.execute(text(query), params or {})
rows = result.mappings().all()
# 将 RowMapping 转换为普通字典
return [dict(row) for row in rows]