- 添加MDF文件导出功能 - 集成阿里云OCR大模型识别 - 添加百度智能云AI照片评分 - 集成DeepSeek大模型创意文案生成 - 完善文档和配置管理 - 使用uv进行现代化依赖管理 - 添加完整的.gitignore配置
300 lines
12 KiB
Python
300 lines
12 KiB
Python
import pandas as pd
|
||
from sqlalchemy import create_engine, inspect
|
||
import sqlite3
|
||
import os
|
||
import pyodbc
|
||
from pathlib import Path
|
||
|
||
def export_sqlite_to_excel(db_path, output_path, table_name=None):
    """Export one table, or every table, of a SQLite database to Excel.

    Args:
        db_path: Path of the SQLite database file.
        output_path: Path of the Excel workbook to write.
        table_name: Name of the single table to export. When omitted, every
            table listed in ``sqlite_master`` is written to its own sheet of
            the same workbook.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the requested table does not exist or any step of the
            export fails. The underlying error is chained as ``__cause__``.
    """
    try:
        conn = sqlite3.connect(db_path)
        try:
            # Collect the names of all tables in the database.
            cursor = conn.cursor()
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            tables = [row[0] for row in cursor.fetchall()]

            def select_all(name):
                # Table names cannot be bound as SQL parameters, so quote them
                # as identifiers; this also allows names containing spaces or
                # reserved words.
                quoted = '"' + name.replace('"', '""') + '"'
                return pd.read_sql_query(f"SELECT * FROM {quoted}", conn)

            if table_name:
                # Export only the requested table.
                if table_name not in tables:
                    raise Exception(f"表 '{table_name}' 不存在")
                select_all(table_name).to_excel(output_path, index=False)
            else:
                # Export every table to a separate sheet of one workbook.
                with pd.ExcelWriter(output_path) as writer:
                    for table in tables:
                        # Excel limits sheet names to 31 characters.
                        select_all(table).to_excel(
                            writer, sheet_name=table[:31], index=False
                        )
        finally:
            # Release the database handle on success and on every error path.
            conn.close()

        return True
    except Exception as e:
        raise Exception(f"SQLite导出Excel失败: {str(e)}") from e
|
||
|
||
def export_mysql_to_excel(host, user, password, database, output_path, table_name=None):
    """Export one table, or every table, of a MySQL database to Excel.

    Args:
        host: MySQL server host (inserted into the URL as given).
        user: Login name.
        password: Login password.
        database: Name of the database to read.
        output_path: Path of the Excel workbook to write.
        table_name: Name of the single table to export. When omitted, every
            table is written to its own sheet of the same workbook.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the requested table does not exist or any step of the
            export fails. The underlying error is chained as ``__cause__``.
    """
    from urllib.parse import quote_plus

    try:
        # Credentials may contain URL-reserved characters such as '@', ':' or
        # '/'; percent-encode them so the connection URL is parsed correctly.
        url = (
            f'mysql+pymysql://{quote_plus(str(user))}:{quote_plus(str(password))}'
            f'@{host}/{database}'
        )
        engine = create_engine(url)
        try:
            # Collect the names of all tables in the database.
            tables = inspect(engine).get_table_names()

            if table_name:
                # Export only the requested table.
                if table_name not in tables:
                    raise Exception(f"表 '{table_name}' 不存在")
                pd.read_sql_table(table_name, engine).to_excel(output_path, index=False)
            else:
                # Export every table to a separate sheet of one workbook.
                with pd.ExcelWriter(output_path) as writer:
                    for table in tables:
                        df = pd.read_sql_table(table, engine)
                        # Excel limits sheet names to 31 characters.
                        df.to_excel(writer, sheet_name=table[:31], index=False)
        finally:
            # Close pooled connections instead of leaving them to the GC.
            engine.dispose()

        return True
    except Exception as e:
        raise Exception(f"MySQL导出Excel失败: {str(e)}") from e
|
||
|
||
def database_to_csv(db_path, output_path, table_name=None):
    """Export a database file to CSV.

    SQLite files (``.db`` / ``.sqlite``) are read directly; ``.mdf`` files are
    delegated to ``export_mssql_mdf_to_csv``. The extension check is
    case-insensitive.

    Args:
        db_path: Path of the database file.
        output_path: Path of the CSV file to write. When every table is
            exported, one file per table is written next to it, named
            ``<stem>_<table><ext>``.
        table_name: Name of the single table to export. When omitted, every
            table is exported.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the format is unsupported or any step of the export
            fails. The underlying error is chained as ``__cause__``.
    """
    try:
        db_file = os.fspath(db_path)
        lowered = db_file.lower()

        if lowered.endswith(('.db', '.sqlite')):
            # SQLite database
            conn = sqlite3.connect(db_file)
            try:
                def select_all(name):
                    # Table names cannot be bound as SQL parameters, so quote
                    # them as identifiers (handles spaces and reserved words).
                    quoted = '"' + name.replace('"', '""') + '"'
                    return pd.read_sql_query(f"SELECT * FROM {quoted}", conn)

                if table_name:
                    select_all(table_name).to_csv(
                        output_path, index=False, encoding='utf-8-sig'
                    )
                else:
                    # Export every table to its own CSV file.
                    cursor = conn.cursor()
                    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                    tables = [row[0] for row in cursor.fetchall()]

                    # Split only the final extension so directory names that
                    # contain ".csv" are untouched and paths with another
                    # suffix still yield one distinct file per table.
                    base, ext = os.path.splitext(os.fspath(output_path))
                    for table in tables:
                        csv_file = f"{base}_{table}{ext}"
                        select_all(table).to_csv(
                            csv_file, index=False, encoding='utf-8-sig'
                        )
            finally:
                # Release the database handle on success and on errors.
                conn.close()
        elif lowered.endswith('.mdf'):
            # SQL Server database file
            export_mssql_mdf_to_csv(db_path, output_path, table_name)
        else:
            raise Exception("不支持的数据库格式")

        return True
    except Exception as e:
        raise Exception(f"数据库导出CSV失败: {str(e)}") from e
|
||
|
||
def database_to_json(db_path, output_path, table_name=None):
    """Export a database file to JSON (a list of row objects per table).

    SQLite files (``.db`` / ``.sqlite``) are read directly; ``.mdf`` files are
    delegated to ``export_mssql_mdf_to_json``. The extension check is
    case-insensitive.

    Args:
        db_path: Path of the database file.
        output_path: Path of the JSON file to write. When every table is
            exported, one file per table is written next to it, named
            ``<stem>_<table><ext>``.
        table_name: Name of the single table to export. When omitted, every
            table is exported.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the format is unsupported or any step of the export
            fails. The underlying error is chained as ``__cause__``.
    """
    import json
    import math

    def to_records(frame):
        # SQL NULLs in numeric columns arrive as float NaN, which json.dump
        # would emit as the non-standard token NaN; map them to None so the
        # file contains valid JSON null values.
        return [
            {
                key: (None if isinstance(value, float) and math.isnan(value) else value)
                for key, value in row.items()
            }
            for row in frame.to_dict('records')
        ]

    def write_json(records, path):
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(records, f, ensure_ascii=False, indent=2)

    try:
        db_file = os.fspath(db_path)
        lowered = db_file.lower()

        if lowered.endswith(('.db', '.sqlite')):
            # SQLite database
            conn = sqlite3.connect(db_file)
            try:
                def select_all(name):
                    # Table names cannot be bound as SQL parameters, so quote
                    # them as identifiers (handles spaces and reserved words).
                    quoted = '"' + name.replace('"', '""') + '"'
                    return pd.read_sql_query(f"SELECT * FROM {quoted}", conn)

                if table_name:
                    write_json(to_records(select_all(table_name)), output_path)
                else:
                    # Export every table to its own JSON file.
                    cursor = conn.cursor()
                    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
                    tables = [row[0] for row in cursor.fetchall()]

                    # Split only the final extension so directory names that
                    # contain ".json" are untouched and paths with another
                    # suffix still yield one distinct file per table.
                    base, ext = os.path.splitext(os.fspath(output_path))
                    for table in tables:
                        json_file = f"{base}_{table}{ext}"
                        write_json(to_records(select_all(table)), json_file)
            finally:
                # Release the database handle on success and on errors.
                conn.close()
        elif lowered.endswith('.mdf'):
            # SQL Server database file
            export_mssql_mdf_to_json(db_path, output_path, table_name)
        else:
            raise Exception("不支持的数据库格式")

        return True
    except Exception as e:
        raise Exception(f"数据库导出JSON失败: {str(e)}") from e
|
||
|
||
def export_mssql_mdf_to_excel(mdf_path, output_path, table_name=None, server='localhost',
                              username='sa', password='', instance='MSSQLSERVER'):
    """Export the tables of a SQL Server MDF file to an Excel workbook.

    The database name is taken from the MDF file name (without extension).
    If no database of that name is registered on the server, the function
    tries to attach the MDF file (plus a ``<name>_log.ldf`` or ``<name>.ldf``
    log file when one exists next to it). An attach failure is only reported
    with ``print``; the export then still tries to connect to the database.

    Args:
        mdf_path: Path of the ``.mdf`` data file.
        output_path: Path of the Excel workbook to write.
        table_name: Name of the single table to export. When omitted, every
            table is written to its own sheet of the same workbook.
        server: SQL Server host name.
        username: SQL Server login.
        password: SQL Server password.
        instance: Instance name; ``'MSSQLSERVER'`` selects the default
            instance, any other value is addressed as ``server\\instance``.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the requested table does not exist or any step of the
            export fails. The underlying error is chained as ``__cause__``.
    """
    from urllib.parse import quote_plus

    try:
        mdf_file = Path(mdf_path)
        database_name = mdf_file.stem

        # Build the connection strings.
        if instance == 'MSSQLSERVER':
            server_address = server
        else:
            server_address = f"{server}\\{instance}"
        connection_string = (
            f"DRIVER={{SQL Server}};SERVER={server_address};DATABASE=master;"
            f"UID={username};PWD={password}"
        )
        db_connection_string = (
            f"DRIVER={{SQL Server}};SERVER={server_address};DATABASE={database_name};"
            f"UID={username};PWD={password}"
        )

        # Connect to master. SQL Server rejects CREATE DATABASE inside a
        # multi-statement transaction, so the connection must autocommit.
        conn = pyodbc.connect(connection_string, autocommit=True)
        try:
            cursor = conn.cursor()

            # Check whether the database is already registered; bind the name
            # as a parameter instead of splicing it into the SQL text.
            cursor.execute(
                "SELECT name FROM sys.databases WHERE name = ?", (database_name,)
            )
            if not cursor.fetchone():
                # Attach the MDF file. Derive the log-file candidates from the
                # final path component only, so other ".mdf" substrings and
                # upper-case extensions cannot produce a wrong path.
                ldf_file = mdf_file.with_name(f"{database_name}_log.ldf")
                if not ldf_file.exists():
                    ldf_file = mdf_file.with_suffix('.ldf')

                attach_files = [os.path.abspath(mdf_path)]
                if ldf_file.exists():
                    attach_files.append(os.path.abspath(ldf_file))

                # Escape quotes/brackets so unusual file or database names
                # cannot break out of the literal or the identifier.
                file_specs = ", ".join(
                    "(FILENAME = '{}')".format(path.replace("'", "''"))
                    for path in attach_files
                )
                attach_sql = "CREATE DATABASE [{}] ON {} FOR ATTACH".format(
                    database_name.replace("]", "]]"), file_specs
                )

                try:
                    cursor.execute(attach_sql)
                except Exception as attach_error:
                    # Best effort: fall through and try to connect directly
                    # (the database may already be running).
                    print(f"附加数据库失败,尝试直接连接: {attach_error}")
        finally:
            # Always release the master connection before reconnecting.
            conn.close()

        # Connect with SQLAlchemy. The odbc_connect query value must be the
        # complete ODBC connection string, URL-encoded.
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={quote_plus(db_connection_string)}"
        )
        try:
            # Collect the names of all tables in the database.
            tables = inspect(engine).get_table_names()

            if table_name:
                # Export only the requested table.
                if table_name not in tables:
                    raise Exception(f"表 '{table_name}' 不存在")
                pd.read_sql_table(table_name, engine).to_excel(output_path, index=False)
            else:
                # Export every table to a separate sheet of one workbook.
                with pd.ExcelWriter(output_path) as writer:
                    for table in tables:
                        df = pd.read_sql_table(table, engine)
                        # Excel limits sheet names to 31 characters.
                        df.to_excel(writer, sheet_name=table[:31], index=False)
        finally:
            # Close pooled connections instead of leaving them to the GC.
            engine.dispose()

        return True
    except Exception as e:
        raise Exception(f"SQL Server MDF导出Excel失败: {str(e)}") from e
|
||
|
||
def export_mssql_mdf_to_csv(mdf_path, output_path, table_name=None, server='localhost',
                            username='sa', password='', instance='MSSQLSERVER'):
    """Export the tables of a SQL Server database to CSV.

    The database name is taken from the MDF file name (without extension);
    this function connects to a database of that name on the server and does
    not attach the file itself.

    Args:
        mdf_path: Path of the ``.mdf`` data file (used for the database name).
        output_path: Path of the CSV file to write. When every table is
            exported, one file per table is written next to it, named
            ``<stem>_<table><ext>``.
        table_name: Name of the single table to export. When omitted, every
            table is exported.
        server: SQL Server host name.
        username: SQL Server login.
        password: SQL Server password.
        instance: Instance name; ``'MSSQLSERVER'`` selects the default
            instance, any other value is addressed as ``server\\instance``.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the requested table does not exist or any step of the
            export fails. The underlying error is chained as ``__cause__``.
    """
    from urllib.parse import quote_plus

    try:
        database_name = Path(mdf_path).stem

        # Build the connection string.
        if instance == 'MSSQLSERVER':
            server_address = server
        else:
            server_address = f"{server}\\{instance}"
        connection_string = (
            f"DRIVER={{SQL Server}};SERVER={server_address};DATABASE={database_name};"
            f"UID={username};PWD={password}"
        )

        # Connect with SQLAlchemy. The odbc_connect query value must be the
        # complete ODBC connection string, URL-encoded.
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={quote_plus(connection_string)}"
        )
        try:
            # Collect the names of all tables in the database.
            tables = inspect(engine).get_table_names()

            if table_name:
                # Export only the requested table.
                if table_name not in tables:
                    raise Exception(f"表 '{table_name}' 不存在")
                df = pd.read_sql_table(table_name, engine)
                df.to_csv(output_path, index=False, encoding='utf-8-sig')
            else:
                # Export every table to its own CSV file. Split only the final
                # extension so directory names containing ".csv" are untouched
                # and other suffixes still yield one distinct file per table.
                base, ext = os.path.splitext(os.fspath(output_path))
                for table in tables:
                    csv_file = f"{base}_{table}{ext}"
                    df = pd.read_sql_table(table, engine)
                    df.to_csv(csv_file, index=False, encoding='utf-8-sig')
        finally:
            # Close pooled connections instead of leaving them to the GC.
            engine.dispose()

        return True
    except Exception as e:
        raise Exception(f"SQL Server MDF导出CSV失败: {str(e)}") from e
|
||
|
||
def export_mssql_mdf_to_json(mdf_path, output_path, table_name=None, server='localhost',
                             username='sa', password='', instance='MSSQLSERVER'):
    """Export the tables of a SQL Server database to JSON.

    Each table is written as a list of row objects. The database name is
    taken from the MDF file name (without extension); this function connects
    to a database of that name on the server and does not attach the file
    itself.

    Args:
        mdf_path: Path of the ``.mdf`` data file (used for the database name).
        output_path: Path of the JSON file to write. When every table is
            exported, one file per table is written next to it, named
            ``<stem>_<table><ext>``.
        table_name: Name of the single table to export. When omitted, every
            table is exported.
        server: SQL Server host name.
        username: SQL Server login.
        password: SQL Server password.
        instance: Instance name; ``'MSSQLSERVER'`` selects the default
            instance, any other value is addressed as ``server\\instance``.

    Returns:
        True when the export finished.

    Raises:
        Exception: If the requested table does not exist or any step of the
            export fails. The underlying error is chained as ``__cause__``.
    """
    import json
    import math
    from urllib.parse import quote_plus

    def to_records(frame):
        # Float NaN would be emitted by json.dump as the non-standard token
        # NaN; map it to None so the file contains valid JSON null values.
        return [
            {
                key: (None if isinstance(value, float) and math.isnan(value) else value)
                for key, value in row.items()
            }
            for row in frame.to_dict('records')
        ]

    def write_json(records, path):
        with open(path, 'w', encoding='utf-8') as f:
            json.dump(records, f, ensure_ascii=False, indent=2)

    try:
        database_name = Path(mdf_path).stem

        # Build the connection string.
        if instance == 'MSSQLSERVER':
            server_address = server
        else:
            server_address = f"{server}\\{instance}"
        connection_string = (
            f"DRIVER={{SQL Server}};SERVER={server_address};DATABASE={database_name};"
            f"UID={username};PWD={password}"
        )

        # Connect with SQLAlchemy. The odbc_connect query value must be the
        # complete ODBC connection string, URL-encoded.
        engine = create_engine(
            f"mssql+pyodbc:///?odbc_connect={quote_plus(connection_string)}"
        )
        try:
            # Collect the names of all tables in the database.
            tables = inspect(engine).get_table_names()

            if table_name:
                # Export only the requested table.
                if table_name not in tables:
                    raise Exception(f"表 '{table_name}' 不存在")
                write_json(to_records(pd.read_sql_table(table_name, engine)), output_path)
            else:
                # Export every table to its own JSON file. Split only the
                # final extension so directory names containing ".json" are
                # untouched and other suffixes still yield distinct files.
                base, ext = os.path.splitext(os.fspath(output_path))
                for table in tables:
                    json_file = f"{base}_{table}{ext}"
                    write_json(to_records(pd.read_sql_table(table, engine)), json_file)
        finally:
            # Close pooled connections instead of leaving them to the GC.
            engine.dispose()

        return True
    except Exception as e:
        raise Exception(f"SQL Server MDF导出JSON失败: {str(e)}") from e