| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950 |
- import pandas as pd
- from sqlalchemy import create_engine
- import time
- from datetime import datetime
- # 数据库连接配置
- DB_CONFIG = "mysql+pymysql://root:root@localhost:3306/recycle"
- engine = create_engine(DB_CONFIG)
- def process_and_import(file_path):
- # 1. 读取 CSV
- # df = pd.read_csv(file_path)
- df = pd.read_csv(file_path, encoding='gbk')
- # 2. 条件转换:type_id 等于 101 时转成 2
- # 注意:确保 type_id 在读取时是数值类型,如果是字符串请用 '101'
- if 'type_id' in df.columns:
- df.loc[df['type_id'] == 101, 'type_id'] = 2
- # 3. 数据过滤:machine_id 不为空
- # dropna 会删除 machine_id 列中值为 NaN 的行
- df_filtered = df.dropna(subset=['machine_id']).copy()
- current_now = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
- df_filtered['create_time'] = current_now
- df_filtered['update_time'] = current_now
- # 4. 筛选目标字段
- # 虽然我们要过滤 machine_id 和 type_id,但最后写入表的字段按你要求的来
- required_columns = [
- 'id', 'machine_id', 'temp_type',
- 'estimate_packet', 'create_time', 'update_time'
- ]
-
- # 检查字段是否存在,避免报错
- final_df = df_filtered[required_columns]
- # 5. 数据清洗:处理空值并写入
- final_df = final_df.where(pd.notnull(final_df), None)
- try:
- final_df.to_sql(
- name='machine_temp',
- con=engine,
- if_exists='append',
- index=False
- )
- print(f"导入成功!处理后的有效数据行数: {len(final_df)}")
- except Exception as e:
- print(f"导入失败: {e}")
- if __name__ == "__main__":
- process_and_import('t_chx_machine_temp.csv')
|