test_db_sync.py
8.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
# -*- coding: utf-8 -*-
# @Author : persist-1<persist1@126.com>
# @Time : 2025/9/8 00:02
# @Desc : 用于将orm映射模型(database/models.py)与两种数据库实际结构进行对比,并进行更新操作(连接数据库->结构比对->差异报告->交互式同步)
# @Tips : 该脚本需要安装依赖'pymysql==1.1.0'
import os
import sys
from sqlalchemy import create_engine, inspect as sqlalchemy_inspect
from sqlalchemy.schema import MetaData
# 将项目根目录添加到 sys.path
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from config.db_config import mysql_db_config, sqlite_db_config
from database.models import Base
def get_mysql_engine():
"""创建并返回一个MySQL数据库引擎"""
conn_str = f"mysql+pymysql://{mysql_db_config['user']}:{mysql_db_config['password']}@{mysql_db_config['host']}:{mysql_db_config['port']}/{mysql_db_config['db_name']}"
return create_engine(conn_str)
def get_sqlite_engine():
"""创建并返回一个SQLite数据库引擎"""
conn_str = f"sqlite:///{sqlite_db_config['db_path']}"
return create_engine(conn_str)
def get_db_schema(engine):
"""获取数据库的当前表结构"""
inspector = sqlalchemy_inspect(engine)
schema = {}
for table_name in inspector.get_table_names():
columns = {}
for column in inspector.get_columns(table_name):
columns[column['name']] = str(column['type'])
schema[table_name] = columns
return schema
def get_orm_schema():
"""获取ORM模型的表结构"""
schema = {}
for table_name, table in Base.metadata.tables.items():
columns = {}
for column in table.columns:
columns[column.name] = str(column.type)
schema[table_name] = columns
return schema
def compare_schemas(db_schema, orm_schema):
"""比较数据库结构和ORM模型结构,返回差异"""
db_tables = set(db_schema.keys())
orm_tables = set(orm_schema.keys())
added_tables = orm_tables - db_tables
deleted_tables = db_tables - orm_tables
common_tables = db_tables.intersection(orm_tables)
changed_tables = {}
for table in common_tables:
db_cols = set(db_schema[table].keys())
orm_cols = set(orm_schema[table].keys())
added_cols = orm_cols - db_cols
deleted_cols = db_cols - orm_cols
modified_cols = {}
for col in db_cols.intersection(orm_cols):
if db_schema[table][col] != orm_schema[table][col]:
modified_cols[col] = (db_schema[table][col], orm_schema[table][col])
if added_cols or deleted_cols or modified_cols:
changed_tables[table] = {
"added": list(added_cols),
"deleted": list(deleted_cols),
"modified": modified_cols
}
return {
"added_tables": list(added_tables),
"deleted_tables": list(deleted_tables),
"changed_tables": changed_tables
}
def print_diff(db_name, diff):
"""打印差异报告"""
print(f"--- {db_name} 数据库结构差异报告 ---")
if not any(diff.values()):
print("数据库结构与ORM模型一致,无需同步。")
return
if diff.get("added_tables"):
print("\n[+] 新增的表:")
for table in diff["added_tables"]:
print(f" - {table}")
if diff.get("deleted_tables"):
print("\n[-] 删除的表:")
for table in diff["deleted_tables"]:
print(f" - {table}")
if diff.get("changed_tables"):
print("\n[*] 变动的表:")
for table, changes in diff["changed_tables"].items():
print(f" - {table}:")
if changes.get("added"):
print(" [+] 新增字段:", ", ".join(changes["added"]))
if changes.get("deleted"):
print(" [-] 删除字段:", ", ".join(changes["deleted"]))
if changes.get("modified"):
print(" [*] 修改字段:")
for col, types in changes["modified"].items():
print(f" - {col}: {types[0]} -> {types[1]}")
print("--- 报告结束 ---")
def sync_database(engine, diff):
"""将ORM模型同步到数据库"""
metadata = Base.metadata
# Alembic的上下文配置
from alembic.migration import MigrationContext
from alembic.operations import Operations
conn = engine.connect()
ctx = MigrationContext.configure(conn)
op = Operations(ctx)
# 处理删除的表
for table_name in diff['deleted_tables']:
op.drop_table(table_name)
print(f"已删除表: {table_name}")
# 处理新增的表
for table_name in diff['added_tables']:
table = metadata.tables.get(table_name)
if table is not None:
table.create(engine)
print(f"已创建表: {table_name}")
# 处理字段变更
for table_name, changes in diff['changed_tables'].items():
# 删除字段
for col_name in changes['deleted']:
op.drop_column(table_name, col_name)
print(f"在表 {table_name} 中已删除字段: {col_name}")
# 新增字段
for col_name in changes['added']:
table = metadata.tables.get(table_name)
column = table.columns.get(col_name)
if column is not None:
op.add_column(table_name, column)
print(f"在表 {table_name} 中已新增字段: {col_name}")
# 修改字段
for col_name, types in changes['modified'].items():
table = metadata.tables.get(table_name)
if table is not None:
column = table.columns.get(col_name)
if column is not None:
op.alter_column(table_name, col_name, type_=column.type)
print(f"在表 {table_name} 中已修改字段: {col_name} (类型变为 {column.type})")
def main():
"""主函数"""
orm_schema = get_orm_schema()
# 处理 MySQL
try:
mysql_engine = get_mysql_engine()
mysql_schema = get_db_schema(mysql_engine)
mysql_diff = compare_schemas(mysql_schema, orm_schema)
print_diff("MySQL", mysql_diff)
if any(mysql_diff.values()):
choice = input(">>> 需要人工确认:是否要将ORM模型同步到MySQL数据库? (y/N): ")
if choice.lower() == 'y':
sync_database(mysql_engine, mysql_diff)
print("MySQL数据库同步完成。")
except Exception as e:
print(f"处理MySQL时出错: {e}")
# 处理 SQLite
try:
sqlite_engine = get_sqlite_engine()
sqlite_schema = get_db_schema(sqlite_engine)
sqlite_diff = compare_schemas(sqlite_schema, orm_schema)
print_diff("SQLite", sqlite_diff)
if any(sqlite_diff.values()):
choice = input(">>> 需要人工确认:是否要将ORM模型同步到SQLite数据库? (y/N): ")
if choice.lower() == 'y':
# 注意:SQLite不支持ALTER COLUMN来修改字段类型,这里简化处理
print("警告:SQLite的字段修改支持有限,此脚本不会执行修改字段类型的操作。")
sync_database(sqlite_engine, sqlite_diff)
print("SQLite数据库同步完成。")
except Exception as e:
print(f"处理SQLite时出错: {e}")
if __name__ == "__main__":
main()
######################### Feedback example #########################
# [*] 变动的表:
# - kuaishou_video:
# [*] 修改字段:
# - user_id: TEXT -> VARCHAR(64)
# - xhs_note_comment:
# [*] 修改字段:
# - comment_id: BIGINT -> VARCHAR(255)
# - zhihu_content:
# [*] 修改字段:
# - created_time: BIGINT -> VARCHAR(32)
# - content_id: BIGINT -> VARCHAR(64)
# - zhihu_creator:
# [*] 修改字段:
# - user_id: INTEGER -> VARCHAR(64)
# - tieba_note:
# [*] 修改字段:
# - publish_time: BIGINT -> VARCHAR(255)
# - tieba_id: INTEGER -> VARCHAR(255)
# - note_id: BIGINT -> VARCHAR(644)
# --- 报告结束 ---
# >>> 需要人工确认:是否要将ORM模型同步到MySQL数据库? (y/N): y
# 在表 kuaishou_video 中已修改字段: user_id (类型变为 VARCHAR(64))
# 在表 xhs_note_comment 中已修改字段: comment_id (类型变为 VARCHAR(255))
# 在表 zhihu_content 中已修改字段: created_time (类型变为 VARCHAR(32))
# 在表 zhihu_content 中已修改字段: content_id (类型变为 VARCHAR(64))
# 在表 zhihu_creator 中已修改字段: user_id (类型变为 VARCHAR(64))
# 在表 tieba_note 中已修改字段: publish_time (类型变为 VARCHAR(255))
# 在表 tieba_note 中已修改字段: tieba_id (类型变为 VARCHAR(255))
# 在表 tieba_note 中已修改字段: note_id (类型变为 VARCHAR(644))
# MySQL数据库同步完成。