import pandas as pd from sqlalchemy import create_engine, inspect import sqlite3 import os import pyodbc from pathlib import Path def export_sqlite_to_excel(db_path, output_path, table_name=None): """SQLite数据库导出为Excel""" try: # 连接SQLite数据库 conn = sqlite3.connect(db_path) # 获取所有表名 cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = [table[0] for table in cursor.fetchall()] if table_name: # 导出指定表 if table_name in tables: df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn) df.to_excel(output_path, index=False) else: raise Exception(f"表 '{table_name}' 不存在") else: # 导出所有表到同一个Excel文件的不同sheet with pd.ExcelWriter(output_path) as writer: for table in tables: df = pd.read_sql_query(f"SELECT * FROM {table}", conn) df.to_excel(writer, sheet_name=table, index=False) conn.close() return True except Exception as e: raise Exception(f"SQLite导出Excel失败: {str(e)}") def export_mysql_to_excel(host, user, password, database, output_path, table_name=None): """MySQL数据库导出为Excel""" try: # 创建MySQL连接 engine = create_engine(f'mysql+pymysql://{user}:{password}@{host}/{database}') # 获取所有表名 inspector = inspect(engine) tables = inspector.get_table_names() if table_name: # 导出指定表 if table_name in tables: df = pd.read_sql_table(table_name, engine) df.to_excel(output_path, index=False) else: raise Exception(f"表 '{table_name}' 不存在") else: # 导出所有表到同一个Excel文件的不同sheet with pd.ExcelWriter(output_path) as writer: for table in tables: df = pd.read_sql_table(table, engine) df.to_excel(writer, sheet_name=table, index=False) return True except Exception as e: raise Exception(f"MySQL导出Excel失败: {str(e)}") def database_to_csv(db_path, output_path, table_name=None): """数据库导出为CSV""" try: if db_path.endswith('.db') or db_path.endswith('.sqlite'): # SQLite数据库 conn = sqlite3.connect(db_path) if table_name: df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn) df.to_csv(output_path, index=False, encoding='utf-8-sig') else: # 导出所有表到不同的CSV文件 cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = [table[0] for table in cursor.fetchall()] for table in tables: csv_file = output_path.replace('.csv', f'_{table}.csv') df = pd.read_sql_query(f"SELECT * FROM {table}", conn) df.to_csv(csv_file, index=False, encoding='utf-8-sig') conn.close() elif db_path.endswith('.mdf'): # SQL Server数据库文件 export_mssql_mdf_to_csv(db_path, output_path, table_name) else: raise Exception("不支持的数据库格式") return True except Exception as e: raise Exception(f"数据库导出CSV失败: {str(e)}") def database_to_json(db_path, output_path, table_name=None): """数据库导出为JSON""" try: import json if db_path.endswith('.db') or db_path.endswith('.sqlite'): # SQLite数据库 conn = sqlite3.connect(db_path) if table_name: df = pd.read_sql_query(f"SELECT * FROM {table_name}", conn) data = df.to_dict('records') with open(output_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) else: # 导出所有表到不同的JSON文件 cursor = conn.cursor() cursor.execute("SELECT name FROM sqlite_master WHERE type='table';") tables = [table[0] for table in cursor.fetchall()] for table in tables: json_file = output_path.replace('.json', f'_{table}.json') df = pd.read_sql_query(f"SELECT * FROM {table}", conn) data = df.to_dict('records') with open(json_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) conn.close() elif db_path.endswith('.mdf'): # SQL Server数据库文件 export_mssql_mdf_to_json(db_path, output_path, table_name) else: raise Exception("不支持的数据库格式") return True except Exception as e: raise Exception(f"数据库导出JSON失败: {str(e)}") def export_mssql_mdf_to_excel(mdf_path, output_path, table_name=None, server='localhost', username='sa', password='', instance='MSSQLSERVER'): """SQL Server MDF文件导出为Excel""" try: # 连接到SQL Server实例并附加MDF文件 database_name = Path(mdf_path).stem # 创建连接字符串 if instance == 'MSSQLSERVER': connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE=master;UID={username};PWD={password}" else: connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE=master;UID={username};PWD={password}" # 连接到master数据库 conn = pyodbc.connect(connection_string) cursor = conn.cursor() # 检查数据库是否已存在 cursor.execute(f"SELECT name FROM sys.databases WHERE name = '{database_name}'") if cursor.fetchone(): # 数据库已存在,直接使用 pass else: # 附加MDF文件到SQL Server mdf_full_path = os.path.abspath(mdf_path) ldf_path = mdf_path.replace('.mdf', '_log.ldf') if not os.path.exists(ldf_path): ldf_path = mdf_path.replace('.mdf', '.ldf') attach_sql = f""" CREATE DATABASE [{database_name}] ON (FILENAME = '{mdf_full_path}') """ if os.path.exists(ldf_path): attach_sql += f", (FILENAME = '{os.path.abspath(ldf_path)}')" attach_sql += " FOR ATTACH" try: cursor.execute(attach_sql) conn.commit() except Exception as attach_error: # 如果附加失败,尝试直接连接(假设数据库已在运行) print(f"附加数据库失败,尝试直接连接: {attach_error}") # 关闭连接并重新连接到目标数据库 conn.close() if instance == 'MSSQLSERVER': db_connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}" else: db_connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}" # 使用SQLAlchemy连接 engine = create_engine(f"mssql+pyodbc:///?odbc_connect={db_connection_string.replace(';', '&')}") # 获取所有表名 inspector = inspect(engine) tables = inspector.get_table_names() if table_name: # 导出指定表 if table_name in tables: df = pd.read_sql_table(table_name, engine) df.to_excel(output_path, index=False) else: raise Exception(f"表 '{table_name}' 不存在") else: # 导出所有表到同一个Excel文件的不同sheet with pd.ExcelWriter(output_path) as writer: for table in tables: df = pd.read_sql_table(table, engine) # 处理表名长度限制(Excel sheet名最多31字符) sheet_name = table[:31] if len(table) > 31 else table df.to_excel(writer, sheet_name=sheet_name, index=False) return True except Exception as e: raise Exception(f"SQL Server MDF导出Excel失败: {str(e)}") def export_mssql_mdf_to_csv(mdf_path, output_path, table_name=None, server='localhost', username='sa', password='', instance='MSSQLSERVER'): """SQL Server MDF文件导出为CSV""" try: database_name = Path(mdf_path).stem # 创建连接字符串 if instance == 'MSSQLSERVER': connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}" else: connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}" # 使用SQLAlchemy连接 engine = create_engine(f"mssql+pyodbc:///?odbc_connect={connection_string.replace(';', '&')}") # 获取所有表名 inspector = inspect(engine) tables = inspector.get_table_names() if table_name: # 导出指定表 if table_name in tables: df = pd.read_sql_table(table_name, engine) df.to_csv(output_path, index=False, encoding='utf-8-sig') else: raise Exception(f"表 '{table_name}' 不存在") else: # 导出所有表到不同的CSV文件 for table in tables: csv_file = output_path.replace('.csv', f'_{table}.csv') df = pd.read_sql_table(table, engine) df.to_csv(csv_file, index=False, encoding='utf-8-sig') return True except Exception as e: raise Exception(f"SQL Server MDF导出CSV失败: {str(e)}") def export_mssql_mdf_to_json(mdf_path, output_path, table_name=None, server='localhost', username='sa', password='', instance='MSSQLSERVER'): """SQL Server MDF文件导出为JSON""" try: import json database_name = Path(mdf_path).stem # 创建连接字符串 if instance == 'MSSQLSERVER': connection_string = f"DRIVER={{SQL Server}};SERVER={server};DATABASE={database_name};UID={username};PWD={password}" else: connection_string = f"DRIVER={{SQL Server}};SERVER={server}\\{instance};DATABASE={database_name};UID={username};PWD={password}" # 使用SQLAlchemy连接 engine = create_engine(f"mssql+pyodbc:///?odbc_connect={connection_string.replace(';', '&')}") # 获取所有表名 inspector = inspect(engine) tables = inspector.get_table_names() if table_name: # 导出指定表 if table_name in tables: df = pd.read_sql_table(table_name, engine) data = df.to_dict('records') with open(output_path, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) else: raise Exception(f"表 '{table_name}' 不存在") else: # 导出所有表到不同的JSON文件 for table in tables: json_file = output_path.replace('.json', f'_{table}.json') df = pd.read_sql_table(table, engine) data = df.to_dict('records') with open(json_file, 'w', encoding='utf-8') as f: json.dump(data, f, ensure_ascii=False, indent=2) return True except Exception as e: raise Exception(f"SQL Server MDF导出JSON失败: {str(e)}")