import csv
from datetime import datetime

import pymysql

# ====== 1. Database configuration ======
DB_CONFIG = {
    "host": "127.0.0.1",
    "port": 3306,
    "user": "root",
    "password": "root",
    "database": "recycle",
    "charset": "utf8mb4",
}

CSV_FILE = "t_chxs_machine.csv"  # make sure this file name is correct

# CSV columns that are passed to the INSERT unchanged, in SQL column order.
# ``create_time`` is handled separately because blank values become NULL.
_PLAIN_COLUMNS = (
    "id",
    "code",
    "type_id",
    "type_name",
    "brand_id",
    "brand_name",
    "machine_id",
    "name",
    "shrink_name",
)

# ====== 3. Bulk insert statement (parameterized, one placeholder per column) ======
_INSERT_SQL = """
    INSERT INTO t_machine
    (id, code, type_id, type_name, brand_id, brand_name, machine_id, name, shrink_name, create_time)
    VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""


def _read_machine_rows(csv_file: str) -> list:
    """Read *csv_file* and return one parameter tuple per data row.

    The file is decoded as gb18030 and must have a header row containing
    every name in ``_PLAIN_COLUMNS``; a missing header raises ``KeyError``.
    A ``create_time`` value that is absent, ``None`` or blank is turned
    into ``None`` so that NULL is sent to the database.
    """
    rows = []
    # newline="" is what the csv module requires so that newlines embedded
    # in quoted fields are parsed correctly.
    with open(csv_file, mode="r", encoding="gb18030", newline="") as f:
        for row in csv.DictReader(f):
            # DictReader fills short rows with None, and .get() returns None
            # when the column is absent, so guard before calling .strip().
            raw_time = (row.get("create_time") or "").strip()
            # NOTE(review): an explicit NULL is sent for blank values; whether
            # the column DEFAULT then applies depends on the table schema.
            create_time = raw_time if raw_time else None
            rows.append(tuple(row[col] for col in _PLAIN_COLUMNS) + (create_time,))
    return rows


def import_machines(csv_file: str = CSV_FILE) -> None:
    """Bulk-load the machine CSV into the ``t_machine`` table.

    Connects with ``DB_CONFIG``, reads every row of *csv_file* and inserts
    them with a single ``executemany`` call, then commits. Any exception
    raised while reading or inserting rolls the transaction back and is
    reported on stdout instead of being propagated; the connection is always
    closed. A failure of ``pymysql.connect`` itself propagates to the caller.

    Args:
        csv_file: Path of the CSV file to import. Defaults to ``CSV_FILE``.
    """
    conn = pymysql.connect(**DB_CONFIG)
    try:
        # ====== 2. Read the CSV ======
        data_to_insert = _read_machine_rows(csv_file)

        with conn.cursor() as cur:
            # executemany is much faster than inserting row by row.
            cur.executemany(_INSERT_SQL, data_to_insert)
        conn.commit()
        print(f"✅ 成功导入 {len(data_to_insert)} 条数据到 t_machine 表")
    except Exception as e:
        # Top-level boundary of the script: undo partial work and report.
        conn.rollback()
        print(f"❌ 导入失败: {e}")
    finally:
        conn.close()


if __name__ == "__main__":
    import_machines()