import time
from datetime import datetime

import pandas as pd
from sqlalchemy import create_engine

# Database connection URL.
# NOTE(review): the credentials are hard-coded in source; consider loading
# them from the environment or a config file instead.
DB_CONFIG = "mysql+pymysql://root:root@localhost:3306/recycle"
engine = create_engine(DB_CONFIG)


def process_and_import(file_path: str) -> None:
    """Load a GBK-encoded CSV, clean it, and append it to ``machine_temp``.

    Steps:
      1. Read the CSV at *file_path* using the ``gbk`` encoding.
      2. Verify that every source column needed for the import is present.
      3. Rewrite ``type_id`` values equal to 101 to 2 (when the column exists).
      4. Drop rows whose ``machine_id`` is missing.
      5. Stamp the remaining rows with the current local time in
         ``create_time`` and ``update_time``.
      6. Append the selected columns to the ``machine_temp`` table through
         the module-level ``engine``.

    Args:
        file_path: Path of the CSV file to import.

    Raises:
        KeyError: If the CSV lacks any of the columns ``id``, ``machine_id``,
            ``temp_type`` or ``estimate_packet``.

    Errors raised while writing to the database are caught and printed
    rather than propagated.
    """
    # 1. Read the CSV; the encoding is passed explicitly as GBK.
    df = pd.read_csv(file_path, encoding='gbk')

    # Fail early, with a readable message, when a column we depend on is
    # absent. create_time / update_time are added below, so they are not
    # expected in the input file.
    source_columns = ['id', 'machine_id', 'temp_type', 'estimate_packet']
    missing_columns = [col for col in source_columns if col not in df.columns]
    if missing_columns:
        raise KeyError(f"CSV is missing required columns: {missing_columns}")

    # 2. Conditional conversion: a type_id of 101 becomes 2.
    # This comparison assumes type_id was parsed as a number; if the column
    # is read as strings the comparison would have to use '101' instead.
    # NOTE(review): type_id is not among the columns written to the table
    # below, so this conversion does not affect the imported rows.
    if 'type_id' in df.columns:
        df.loc[df['type_id'] == 101, 'type_id'] = 2

    # 3. Filter: keep only rows whose machine_id is not NaN. The copy lets
    # the new timestamp columns be assigned without touching `df`.
    df_filtered = df.dropna(subset=['machine_id']).copy()

    # Both timestamps share one value so every row of a single import run
    # carries identical create/update times.
    current_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    df_filtered['create_time'] = current_now
    df_filtered['update_time'] = current_now

    # 4. Select the columns that are written to the target table.
    required_columns = [
        'id',
        'machine_id',
        'temp_type',
        'estimate_packet',
        'create_time',
        'update_time',
    ]
    final_df = df_filtered[required_columns]

    # 5. Cleaning: replace missing values with None before writing
    # (intended so that they are stored as NULL).
    final_df = final_df.where(pd.notnull(final_df), None)

    try:
        final_df.to_sql(
            name='machine_temp',
            con=engine,
            if_exists='append',
            index=False
        )
        print(f"导入成功!处理后的有效数据行数: {len(final_df)}")
    except Exception as e:
        # Best-effort import: report the failure instead of crashing.
        print(f"导入失败: {e}")


if __name__ == "__main__":
    process_and_import('t_chx_machine_temp.csv')