GH/utils/database_exporter.py
AI Developer 2ec2c0a1ab feat: 完整的数据提取与转换器项目
- 添加MDF文件导出功能
- 集成阿里云OCR大模型识别
- 添加百度智能云AI照片评分
- 集成DeepSeek大模型创意文案生成
- 完善文档和配置管理
- 使用uv进行现代化依赖管理
- 添加完整的.gitignore配置
2026-01-08 20:25:49 +08:00

300 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pandas as pd
from sqlalchemy import create_engine, inspect
import sqlite3
import os
import pyodbc
from pathlib import Path
def export_sqlite_to_excel(db_path, output_path, table_name=None):
"""SQLite数据库导出为Excel"""
try:
# 连接SQLite数据库
conn = sqlite3.connect(db_path)
# 获取所有表名
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [table[0] for table in cursor.fetchall()]
if table_name:
# 导出指定表
if table_name in tables:
df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
df.to_excel(output_path, index=False)
else:
raise Exception(f"'{table_name}' 不存在")
else:
# 导出所有表到同一个Excel文件的不同sheet
with pd.ExcelWriter(output_path) as writer:
for table in tables:
df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
df.to_excel(writer, sheet_name=table, index=False)
conn.close()
return True
except Exception as e:
raise Exception(f"SQLite导出Excel失败: {str(e)}")
def export_mysql_to_excel(host, user, password, database, output_path, table_name=None):
"""MySQL数据库导出为Excel"""
try:
# 创建MySQL连接
engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}')
# 获取所有表名
inspector = inspect(engine)
tables = inspector.get_table_names()
if table_name:
# 导出指定表
if table_name in tables:
df = pd.read_sql_table(table_name, engine)
df.to_excel(output_path, index=False)
else:
raise Exception(f"'{table_name}' 不存在")
else:
# 导出所有表到同一个Excel文件的不同sheet
with pd.ExcelWriter(output_path) as writer:
for table in tables:
df = pd.read_sql_table(table, engine)
df.to_excel(writer, sheet_name=table, index=False)
return True
except Exception as e:
raise Exception(f"MySQL导出Excel失败: {str(e)}")
def database_to_csv(db_path, output_path, table_name=None):
"""数据库导出为CSV"""
try:
if db_path.endswith('.db') or db_path.endswith('.sqlite'):
# SQLite数据库
conn = sqlite3.connect(db_path)
if table_name:
df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
df.to_csv(output_path, index=False, encoding='utf-8-sig')
else:
# 导出所有表到不同的CSV文件
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [table[0] for table in cursor.fetchall()]
for table in tables:
csv_file = output_path.replace('.csv', f'_{table}.csv')
df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
conn.close()
elif db_path.endswith('.mdf'):
# SQL Server数据库文件
export_mssql_mdf_to_csv(db_path, output_path, table_name)
else:
raise Exception("不支持的数据库格式")
return True
except Exception as e:
raise Exception(f"数据库导出CSV失败: {str(e)}")
def database_to_json(db_path, output_path, table_name=None):
"""数据库导出为JSON"""
try:
import json
if db_path.endswith('.db') or db_path.endswith('.sqlite'):
# SQLite数据库
conn = sqlite3.connect(db_path)
if table_name:
df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn)
data = df.to_dict('records')
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
else:
# 导出所有表到不同的JSON文件
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
tables = [table[0] for table in cursor.fetchall()]
for table in tables:
json_file = output_path.replace('.json', f'_{table}.json')
df = pd.read_sql_query(f"SELECT * FROM {table}", conn)
data = df.to_dict('records')
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
conn.close()
elif db_path.endswith('.mdf'):
# SQL Server数据库文件
export_mssql_mdf_to_json(db_path, output_path, table_name)
else:
raise Exception("不支持的数据库格式")
return True
except Exception as e:
raise Exception(f"数据库导出JSON失败: {str(e)}")
def export_mssql_mdf_to_excel(mdf_path, output_path, table_name=None, server='localhost',
username='sa', password='', instance='MSSQLSERVER'):
"""SQL Server MDF文件导出为Excel"""
try:
# 连接到SQL Server实例并附加MDF文件
database_name = Path(mdf_path).stem
# 创建连接字符串
if instance == 'MSSQLSERVER':
connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE=master;UID={username};PWD={password}"
else:
connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE=master;UID={username};PWD={password}"
# 连接到master数据库
conn = pyodbc.connect(connection_string)
cursor = conn.cursor()
# 检查数据库是否已存在
cursor.execute(f"SELECT name FROM sys.databases WHERE name = '{database_name}'")
if cursor.fetchone():
# 数据库已存在,直接使用
pass
else:
# 附加MDF文件到SQL Server
mdf_full_path = os.path.abspath(mdf_path)
ldf_path = mdf_path.replace('.mdf', '_log.ldf')
if not os.path.exists(ldf_path):
ldf_path = mdf_path.replace('.mdf', '.ldf')
attach_sql = f"""
CREATE DATABASE [{database_name}]
ON (FILENAME = '{mdf_full_path}')
"""
if os.path.exists(ldf_path):
attach_sql += f", (FILENAME = '{os.path.abspath(ldf_path)}')"
attach_sql += " FOR ATTACH"
try:
cursor.execute(attach_sql)
conn.commit()
except Exception as attach_error:
# 如果附加失败,尝试直接连接(假设数据库已在运行)
print(f"附加数据库失败,尝试直接连接: {attach_error}")
# 关闭连接并重新连接到目标数据库
conn.close()
if instance == 'MSSQLSERVER':
db_connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}"
else:
db_connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}"
# 使用SQLAlchemy连接
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={db_connection_string.replace(';', '&')}")
# 获取所有表名
inspector = inspect(engine)
tables = inspector.get_table_names()
if table_name:
# 导出指定表
if table_name in tables:
df = pd.read_sql_table(table_name, engine)
df.to_excel(output_path, index=False)
else:
raise Exception(f"'{table_name}' 不存在")
else:
# 导出所有表到同一个Excel文件的不同sheet
with pd.ExcelWriter(output_path) as writer:
for table in tables:
df = pd.read_sql_table(table, engine)
# 处理表名长度限制Excel sheet名最多31字符
sheet_name = table[:31] if len(table) > 31 else table
df.to_excel(writer, sheet_name=sheet_name, index=False)
return True
except Exception as e:
raise Exception(f"SQL Server MDF导出Excel失败: {str(e)}")
def export_mssql_mdf_to_csv(mdf_path, output_path, table_name=None, server='localhost',
username='sa', password='', instance='MSSQLSERVER'):
"""SQL Server MDF文件导出为CSV"""
try:
database_name = Path(mdf_path).stem
# 创建连接字符串
if instance == 'MSSQLSERVER':
connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}"
else:
connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}"
# 使用SQLAlchemy连接
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={connection_string.replace(';', '&')}")
# 获取所有表名
inspector = inspect(engine)
tables = inspector.get_table_names()
if table_name:
# 导出指定表
if table_name in tables:
df = pd.read_sql_table(table_name, engine)
df.to_csv(output_path, index=False, encoding='utf-8-sig')
else:
raise Exception(f"'{table_name}' 不存在")
else:
# 导出所有表到不同的CSV文件
for table in tables:
csv_file = output_path.replace('.csv', f'_{table}.csv')
df = pd.read_sql_table(table, engine)
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
return True
except Exception as e:
raise Exception(f"SQL Server MDF导出CSV失败: {str(e)}")
def export_mssql_mdf_to_json(mdf_path, output_path, table_name=None, server='localhost',
username='sa', password='', instance='MSSQLSERVER'):
"""SQL Server MDF文件导出为JSON"""
try:
import json
database_name = Path(mdf_path).stem
# 创建连接字符串
if instance == 'MSSQLSERVER':
connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}"
else:
connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}"
# 使用SQLAlchemy连接
engine = create_engine(f"mssql+pyodbc:///?odbc_connect={connection_string.replace(';', '&')}")
# 获取所有表名
inspector = inspect(engine)
tables = inspector.get_table_names()
if table_name:
# 导出指定表
if table_name in tables:
df = pd.read_sql_table(table_name, engine)
data = df.to_dict('records')
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
else:
raise Exception(f"'{table_name}' 不存在")
else:
# 导出所有表到不同的JSON文件
for table in tables:
json_file = output_path.replace('.json', f'_{table}.json')
df = pd.read_sql_table(table, engine)
data = df.to_dict('records')
with open(json_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
raise Exception(f"SQL Server MDF导出JSON失败: {str(e)}")