import_deduction_ratio.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. import pymysql
  2. import pandas as pd
  3. import time
  4. from pymysql.err import OperationalError, ProgrammingError, DataError
  5. # -------------------------- 你的配置信息(已保留,无需修改) --------------------------
  6. DB_CONFIG = {
  7. "host": "127.0.0.1", # 数据库地址,本地一般是127.0.0.1
  8. "port": 3306, # 数据库端口,默认3306
  9. "user": "root", # 数据库用户名
  10. "password": "root", # 数据库密码
  11. "database": "recycle", # 替换为price_option_factor表所在的数据库名
  12. "charset": "utf8mb4"
  13. }
  14. CSV_FILE_PATH = "deduction_ratio.csv" # CSV文件路径,若不在同目录则写绝对路径
  15. # ---------------------------------------------------------------------------------------
  16. def insert_csv_to_mysql():
  17. # 1. 读取CSV文件,仅保留需要的option_id和factor字段
  18. try:
  19. df = pd.read_csv(CSV_FILE_PATH, usecols=["option_id", "factor"])
  20. # 转换数据类型(匹配数据库字段,option_id为bigint,factor为decimal)
  21. df["option_id"] = df["option_id"].astype(int)
  22. df["factor"] = df["factor"].astype(float)
  23. if df.empty:
  24. print("CSV文件中无有效数据,无需插入")
  25. return
  26. except FileNotFoundError:
  27. print(f"错误:未找到CSV文件,路径为{CSV_FILE_PATH}")
  28. return
  29. except Exception as e:
  30. print(f"读取CSV文件失败:{str(e)}")
  31. return
  32. # 生成当前时间(MySQL datetime格式:YYYY-MM-DD HH:MM:SS),同批次数据时间一致
  33. current_time = time.strftime("%Y-%m-%d %H:%M:%S")
  34. # ✅ 修复:普通元组通过索引取值(0=option_id,1=factor),这是报错的核心解决点
  35. data_list = [
  36. (row[0], row[1], current_time, current_time)
  37. for row in df.itertuples(index=False, name=None)
  38. ]
  39. # 2. 连接数据库并批量插入数据
  40. conn = None
  41. cursor = None
  42. try:
  43. # 建立数据库连接
  44. conn = pymysql.connect(**DB_CONFIG)
  45. cursor = conn.cursor()
  46. # 插入SQL语句:新增create_time、update_time字段手动写入
  47. insert_sql = """
  48. INSERT INTO price_option_factor (option_id, factor, create_time, update_time)
  49. VALUES (%s, %s, %s, %s)
  50. """
  51. # 批量执行插入(效率高于单条插入)
  52. cursor.executemany(insert_sql, data_list)
  53. # 提交事务
  54. conn.commit()
  55. print(f"成功插入{cursor.rowcount}条数据到price_option_factor表,时间统一为:{current_time}")
  56. except OperationalError:
  57. print("数据库连接失败:请检查host/port/user/password/database是否正确")
  58. except ProgrammingError as e:
  59. print(f"SQL执行错误:表不存在或字段名错误,详情:{str(e)}")
  60. except DataError:
  61. print("数据类型错误:CSV中的数据超出数据库字段的取值范围(如factor超出decimal(5,4))")
  62. except Exception as e:
  63. # 出错时回滚事务
  64. if conn:
  65. conn.rollback()
  66. print(f"插入数据失败:{str(e)}")
  67. finally:
  68. # 关闭游标和连接(无论是否出错都执行)
  69. if cursor:
  70. cursor.close()
  71. if conn:
  72. conn.close()
  73. if __name__ == "__main__":
  74. insert_csv_to_mysql()