"""Price-estimate simulation endpoints, mounted under the ``/estimate`` prefix.

NOTE(review): several handlers intentionally keep the duplicated Python names
they had before (``simulate`` / ``simulate_calc``) so that route names do not
change. ``GET /simulate_one`` is also still registered twice, in the original
order; presumably only the first registration is ever matched -- confirm and
delete the dead one.
"""
import json
import uuid
from collections import defaultdict
from html import escape

from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from sqlalchemy import bindparam, text
from sqlalchemy.orm import Session

from database import SessionLocal, redis_client
from services.price_service import apply_adjust
from services.template_service import build_template

router = APIRouter(prefix="/estimate", tags=["estimate"])
templates = Jinja2Templates(directory="templates")

# Fallback used by most handlers when a machine has no base-price row.
DEFAULT_BASE_PRICE = 1000.0

# Display labels for the three adjust-factor levels, in application order.
_ADJUST_LABELS = {
    "global": "全局调节",
    "brand": "品牌调节",
    "machine": "机型调节",
}

# ``IN :ids`` is bound through an expanding parameter so that the list is
# rendered portably instead of relying on driver-specific tuple adaptation.
_OPTION_FACTOR_SQL = text("""
    SELECT option_id, factor, absolute_deduct
    FROM price_option_factor
    WHERE option_id IN :ids
""").bindparams(bindparam("ids", expanding=True))

_NAMED_OPTION_FACTOR_SQL = text("""
    SELECT pf.option_id, pf.factor, pf.absolute_deduct, ro.option_name
    FROM price_option_factor pf
    LEFT JOIN release_option ro ON pf.option_id = ro.option_id
    WHERE pf.option_id IN :ids
""").bindparams(bindparam("ids", expanding=True))

_GROUPED_OPTION_FACTOR_SQL = text("""
    SELECT
        p.option_id,
        p.factor,
        p.absolute_deduct,
        p.group_code,
        p.severity_level,
        p.sub_weight,
        p.is_special,
        p.repair_level,
        g.cap_ratio,
        g.group_weight
    FROM price_option_factor p
    LEFT JOIN price_damage_group g ON p.group_code = g.group_code
    WHERE p.option_id IN :ids
""").bindparams(bindparam("ids", expanding=True))


# ================= DB =================
def get_db():
    """Yield a database session and close it when the request is done."""
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# ================= shared helpers =================
def _not_found():
    """Build the 404 response used when a machine has no template row."""
    return HTMLResponse("未找到机型模板", status_code=404)


def _fetch_packet(db, machine_id):
    """Return the parsed ``estimate_packet`` for a machine, or None."""
    row = db.execute(text("""
        SELECT estimate_packet
        FROM machine_temp
        WHERE machine_id=:mid
    """), {"mid": machine_id}).fetchone()
    return json.loads(row.estimate_packet) if row else None


def _fetch_template_row(db, machine_id):
    """Return the row (machine_name, estimate_packet) for a machine, or None."""
    return db.execute(text("""
        SELECT m.name AS machine_name, t.estimate_packet
        FROM machine_temp t
        JOIN t_machine m ON t.machine_id = m.machine_id
        WHERE t.machine_id = :mid
    """), {"mid": machine_id}).fetchone()


def _fetch_base_price(db, machine_id, default=DEFAULT_BASE_PRICE):
    """Return the machine's base price as a float, or *default* if absent."""
    row = db.execute(text("""
        SELECT base_price
        FROM machine_base_price
        WHERE machine_id=:mid
        LIMIT 1
    """), {"mid": machine_id}).fetchone()
    if row is None or row.base_price is None:
        return default
    return float(row.base_price)


def _fetch_in(db, statement, option_ids):
    """Run an ``IN :ids`` statement; return [] without querying for no ids."""
    if not option_ids:
        return []
    return db.execute(statement, {"ids": list(option_ids)}).fetchall()


def _fetch_adjust_factors(db, machine_id):
    """Return ``[(level, factor), ...]`` for the adjust factors that exist.

    Levels are looked up in the fixed order global, brand, machine; a level
    with no row is skipped. ``factor`` is returned exactly as the DB gave it.
    """
    lookups = (
        ("global", """
            SELECT factor FROM price_adjust_factor
            WHERE level='global'
            LIMIT 1
        """),
        ("brand", """
            SELECT factor FROM price_adjust_factor
            WHERE level='brand'
              AND ref_id=(
                SELECT brand_id FROM t_machine WHERE machine_id=:mid
              )
            LIMIT 1
        """),
        ("machine", """
            SELECT factor FROM price_adjust_factor
            WHERE level='machine'
              AND ref_id=:mid
            LIMIT 1
        """),
    )
    found = []
    for level, sql in lookups:
        params = {} if level == "global" else {"mid": machine_id}
        row = db.execute(text(sql), params).fetchone()
        if row:
            found.append((level, row.factor))
    return found


def _apply_adjusts(db, machine_id, price, detail_rows, label_suffix=""):
    """Multiply *price* by each adjust factor and log one detail row per level.

    Detail rows use the ``option_id`` / ``option_name`` / ``absolute`` shape
    shared by the ``/simulate666`` and ``/simulate7`` handlers.
    """
    for level, factor in _fetch_adjust_factors(db, machine_id):
        before = price
        price = price * float(factor)
        detail_rows.append({
            "option_id": level.upper(),
            "option_name": _ADJUST_LABELS[level] + label_suffix,
            "factor": factor,
            "absolute": None,
            "before": round(before, 2),
            "after": round(price, 2),
        })
    return price


def _option_ids_from_form(form):
    """Collect, as strings, the values of every ``option_*`` form field."""
    return [
        str(value)
        for key, value in form.multi_items()
        if key.startswith("option_")
    ]


def _value_name_map(tpl):
    """Map ``str(valueId)`` to ``valueText`` for every value in the template."""
    return {
        str(value["valueId"]): value["valueText"]
        for step in tpl["template"]
        for prop in step["properties"]
        for value in prop["values"]
    }


# ================= list all machines =================
@router.get("/simulate", response_class=HTMLResponse)
def simulate(db: Session = Depends(get_db)):
    """Render a machine picker listing every machine that has a template."""
    machines = db.execute(text("""
        SELECT machine_id, name
        FROM t_machine
        WHERE machine_id IN (SELECT DISTINCT machine_id FROM machine_temp)
    """)).fetchall()

    # NOTE(review): the original markup was lost from the copy of the file
    # under review (only the text "估价模拟" survived); the tags, the form
    # target and the <option> list below are a reconstruction -- confirm.
    options = "".join(
        f'<option value="{escape(str(m.machine_id))}">'
        f"{escape(str(m.name))}</option>"
        for m in machines
    )
    return (
        "<h2>估价模拟</h2>"
        '<form method="get" action="/estimate/simulate_one">'
        f'<select name="machine_id">{options}</select>'
        '<input type="submit">'
        "</form>"
    )


# ================= machine template and options =================
@router.get("/simulate_one_del", response_class=HTMLResponse)
def simulate(
    machine_id: int,
    db: Session = Depends(get_db)
):
    """Render a hand-built option form for one machine's template."""
    tpl = _fetch_packet(db, machine_id)
    if tpl is None:
        return _not_found()

    # NOTE(review): markup reconstructed -- the original tags were stripped
    # from the reviewed copy; only the interpolated fields were recoverable.
    parts = [
        f"<h2>估价模拟 - {escape(str(tpl['templateId']))}</h2>",
        '<form method="post" action="/estimate/simulate_del">',
    ]
    for step in tpl["template"]:
        parts.append(f"<h3>{escape(str(step['stepName']))}</h3>")
        for prop in step["properties"]:
            parts.append(f"{escape(str(prop['name']))}<br>")
            for value in prop["values"]:
                parts.append(
                    '<label><input type="checkbox" name="option_ids" '
                    f'value="{escape(str(value["valueId"]))}">'
                    f'{escape(str(value["valueText"]))}</label>'
                )
    parts.append(
        f'<input type="hidden" name="machine_id" value="{machine_id}">'
        '<input type="submit"></form>'
    )
    return "".join(parts)


@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one_del2(
    request: Request,
    machine_id: int,
    db: Session = Depends(get_db)
):
    """Render ``simulate_one.html`` for a machine (no estimate result yet)."""
    row = _fetch_template_row(db, machine_id)
    if not row:
        return _not_found()

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "machine_id": machine_id,
            "machine_name": row.machine_name,
            "tpl": json.loads(row.estimate_packet),
        }
    )


# ================= submit estimate and compute price =================
@router.post("/simulate_del", response_class=HTMLResponse)
def simulate_calc(
    machine_id: int = Form(...),
    option_ids: list[str] = Form([]),
    db: Session = Depends(get_db)
):
    """Compute a price treating each option ``factor`` as a deduction rate.

    Each option multiplies the price by ``1 - factor`` and then subtracts its
    ``absolute_deduct``; the global, brand and machine adjust factors are
    multiplied in afterwards.
    """
    # float() here avoids mixing a DB Decimal with the float arithmetic below.
    price = _fetch_base_price(db, machine_id)

    for f in _fetch_in(db, _OPTION_FACTOR_SQL, option_ids):
        if f.factor:
            price *= (1 - float(f.factor))
        if f.absolute_deduct:
            price -= float(f.absolute_deduct)

    for _level, factor in _fetch_adjust_factors(db, machine_id):
        price *= float(factor)

    # NOTE(review): markup reconstructed -- only the text survived in the
    # reviewed copy of the file.
    chosen = escape(",".join(option_ids))
    return (
        "<h2>估价结果</h2>"
        f"选择项:{chosen}<br>"
        f"估价:{round(price, 2)} 元<br>"
        '<a href="/estimate/simulate">返回</a> '
    )


@router.post("/simulate_de3", response_class=HTMLResponse)
def simulate_calc(
    request: Request,
    machine_id: int = Form(...),
    option_ids: list[str] = Form([]),
    db: Session = Depends(get_db)
):
    """Compute a price with a per-step breakdown (``type``/``name`` rows).

    In this variant an option ``factor`` is a direct multiplier.
    """
    machine = db.execute(text("""
        SELECT name, brand_name
        FROM t_machine
        WHERE machine_id=:mid
    """), {"mid": machine_id}).fetchone()

    tpl = _fetch_packet(db, machine_id)
    if tpl is None:
        return _not_found()

    base_price = _fetch_base_price(db, machine_id)
    price = base_price
    detail_rows = []

    # ---------- option deductions ----------
    for f in _fetch_in(db, _NAMED_OPTION_FACTOR_SQL, option_ids):
        before = price
        if f.factor is not None:
            price *= float(f.factor)
        if f.absolute_deduct is not None:
            price -= float(f.absolute_deduct)
        detail_rows.append({
            "type": "option",
            "name": f.option_name or str(f.option_id),
            "option_id": f.option_id,
            "factor": f.factor,
            "absolute_deduct": f.absolute_deduct,
            "before": round(before, 2),
            "after": round(price, 2),
        })

    # ---------- global / brand / machine adjust ----------
    for level, factor in _fetch_adjust_factors(db, machine_id):
        before = price
        price *= float(factor)
        detail_rows.append({
            "type": "adjust",
            "name": _ADJUST_LABELS[level],
            "factor": factor,
            "before": round(before, 2),
            "after": round(price, 2),
        })

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "tpl": tpl,
            "machine": machine,
            "machine_id": machine_id,
            "selected_option_ids": option_ids,
            "base_price": round(base_price, 2),
            "detail_rows": detail_rows,
            "final_price": round(price, 2),
        }
    )


@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one(
    request: Request,
    machine_id: int,
    db: Session = Depends(get_db)
):
    """Render ``simulate_one.html`` for a machine with ``result`` set to None.

    NOTE(review): this path/method pair is already registered by an earlier
    handler in this module, so this one is probably never reached -- confirm.
    """
    row = _fetch_template_row(db, machine_id)
    if not row:
        return _not_found()

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "machine_id": machine_id,
            "machine_name": row.machine_name,
            "tpl": json.loads(row.estimate_packet),
            "result": None,
        }
    )


# ================= estimate =================
@router.post("/simulate666", response_class=HTMLResponse)
async def simulate_calc(
    request: Request,
    machine_id: int = Form(...),
    db: Session = Depends(get_db)
):
    """Compute a price from ``option_*`` form fields (factor = multiplier)."""
    form = await request.form()
    option_ids = _option_ids_from_form(form)

    row = _fetch_template_row(db, machine_id)
    if not row:
        return HTMLResponse("机型模板不存在")
    tpl = json.loads(row.estimate_packet)
    value_names = _value_name_map(tpl)

    base_price = _fetch_base_price(db, machine_id)
    price = base_price
    detail_rows = []

    for r in _fetch_in(db, _OPTION_FACTOR_SQL, option_ids):
        before = price
        if r.factor is not None:
            price = price * float(r.factor)
        if r.absolute_deduct is not None:
            price = price - float(r.absolute_deduct)
        detail_rows.append({
            "option_id": r.option_id,
            "option_name": value_names.get(str(r.option_id), str(r.option_id)),
            "factor": r.factor,
            "absolute": r.absolute_deduct,
            "before": round(before, 2),
            "after": round(price, 2),
        })

    price = _apply_adjusts(db, machine_id, price, detail_rows)

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "machine_id": machine_id,
            "machine_name": row.machine_name,
            "tpl": tpl,
            "result": {
                "base_price": round(base_price, 2),
                "final_price": round(price, 2),
                "details": detail_rows,
                "selected": option_ids,
            },
        }
    )


@router.post("/simulate7", response_class=HTMLResponse)
async def simulate_calc(
    request: Request,
    machine_id: int = Form(...),
    db: Session = Depends(get_db)
):
    """Compute a price by summing option deduction rates, then adjusting.

    A non-blank ``base_price`` form field overrides the stored base price;
    with neither, the base price falls back to 1. Option ``factor`` values
    are added up and applied once as ``price * (1 - total)``, floored at 0;
    ``absolute_deduct`` is reported but not applied in this variant.
    """
    form = await request.form()
    option_ids = _option_ids_from_form(form)

    row = _fetch_template_row(db, machine_id)
    if not row:
        return HTMLResponse("机型模板不存在")
    tpl = json.loads(row.estimate_packet)
    value_names = _value_name_map(tpl)

    # ---------------- base price ----------------
    base_price_input = form.get("base_price")
    if base_price_input and str(base_price_input).strip():
        try:
            base_price = float(base_price_input)
        except (TypeError, ValueError):
            # Malformed user input is a client error, not a server crash.
            return HTMLResponse("基准价格式不正确", status_code=400)
    else:
        base_price = _fetch_base_price(db, machine_id, default=1)

    # ---------------- option factors (summed) ----------------
    price = base_price
    total_deduct_rate = 0
    detail_rows = [{
        "option_id": "GLOBAL",
        "option_name": "-- 以下是扣减比例,累加",
        "factor": 0,
        "absolute": None,
        "before": None,
        "after": None,
    }]

    for r in _fetch_in(db, _OPTION_FACTOR_SQL, option_ids):
        if r.factor:
            total_deduct_rate += float(r.factor)
        # The price is only changed after the loop, so every option row
        # reports the same before/after value (the base price).
        detail_rows.append({
            "option_id": r.option_id,
            "option_name": value_names.get(str(r.option_id), str(r.option_id)),
            "factor": r.factor,
            "absolute": r.absolute_deduct,
            "before": round(price, 2),
            "after": round(price, 2),
        })

    if total_deduct_rate:
        price = price * (1 - total_deduct_rate)
        price = max(price, 0.0)

    # ---------------- adjust factors (multiplied) ----------------
    detail_rows.append({
        "option_id": "GLOBAL",
        "option_name": "-- 以下是调节因子,累乘",
        "factor": 0,
        "absolute": None,
        "before": None,
        "after": None,
    })
    price = _apply_adjusts(db, machine_id, price, detail_rows, "系数")

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "machine_id": machine_id,
            "machine_name": row.machine_name,
            "tpl": tpl,
            "result": {
                "base_price": round(base_price, 2),
                "final_price": round(price, 2),
                "details": detail_rows,
                "selected": option_ids,
            },
        }
    )


@router.post("/simulate", response_class=HTMLResponse)
async def simulate_calc(
    request: Request,
    machine_id: int = Form(...),
    db: Session = Depends(get_db)
):
    """Compute a price using damage groups, special items and repair levels.

    Selected options are split into repair items (``repair_level > 0``),
    special items (``is_special``) and ordinary items bucketed by
    ``group_code``. Each bucket contributes a deduction ratio; the sum is
    capped at 0.90 and applied to the base price.
    """
    form = await request.form()
    try:
        option_ids = [
            int(value)
            for key, value in form.multi_items()
            if key.startswith("option_")
        ]
    except (TypeError, ValueError):
        return HTMLResponse("选项参数不合法", status_code=400)

    row = _fetch_template_row(db, machine_id)
    if not row:
        return _not_found()

    base_price = _fetch_base_price(db, machine_id)
    rows = _fetch_in(db, _GROUPED_OPTION_FACTOR_SQL, option_ids)

    # ---------------- override relations ----------------
    override_map = defaultdict(list)
    for o in db.execute(text("""
        SELECT trigger_group_code, target_group_code,
               override_type, override_value
        FROM price_group_override
    """)).fetchall():
        override_map[o.trigger_group_code].append(o)

    # ---------------- bucket the selected options ----------------
    groups = defaultdict(list)
    specials = []
    repairs = []
    for r in rows:
        if r.repair_level and r.repair_level > 0:
            repairs.append(r)
        elif r.is_special:
            specials.append(r)
        else:
            groups[r.group_code].append(r)

    # ---------------- resolve overrides ----------------
    # Iterating the dict (insertion order) keeps "last override wins"
    # deterministic between runs.
    skip_groups = set()
    weight_override = {}
    for hit in groups:
        for o in override_map.get(hit, []):
            if o.override_type == "skip":
                skip_groups.add(o.target_group_code)
            elif o.override_type == "weight":
                weight_override[o.target_group_code] = float(o.override_value)

    # ---------------- per-group deduction ----------------
    total_ratio = 0.0
    detail_rows = []
    for group_code, items in groups.items():
        if group_code in skip_groups:
            continue

        # Largest factor first (ties broken by severity): that item is the
        # group's main deduction, the rest are weighted add-ons.
        items = sorted(
            items,
            key=lambda x: (float(x.factor or 0), x.severity_level or 0),
            reverse=True,
        )
        main, subs = items[0], items[1:]
        main_factor = float(main.factor or 0)

        group_deduct = main_factor + sum(
            float(sub.factor or 0) * float(sub.sub_weight or 0.3)
            for sub in subs
        )
        group_deduct = min(group_deduct, main_factor * float(main.cap_ratio or 1.0))

        group_weight = weight_override.get(
            group_code, float(main.group_weight or 1.0)
        )
        final_group_deduct = group_deduct * group_weight
        total_ratio += final_group_deduct

        detail_rows.append({
            "option_name": f"--{group_code} 组扣减",
            "factor": round(final_group_deduct, 4),
        })
        detail_rows.extend(
            {"option_name": it.option_id, "factor": it.factor} for it in items
        )

    # ---------------- special items ----------------
    special_ratio = 0.0
    for s in specials:
        special_ratio += float(s.factor or 0)
        detail_rows.append({
            "option_name": f"特殊项:{s.option_id}",
            "factor": s.factor,
        })
    # Special items are capped at 30% in total.
    total_ratio += min(special_ratio, 0.30)

    # ---------------- repair levels ----------------
    repair_ratio = 0.0
    max_repair_level = 0
    for r in repairs:
        repair_ratio += float(r.factor or 0)
        max_repair_level = max(max_repair_level, r.repair_level or 0)
        detail_rows.append({
            "option_name": f"维修项:{r.option_id}",
            "factor": r.factor,
        })

    # NOTE(review): level >= 3 applies a 30% *floor* (max), whereas the lower
    # levels apply caps (min) -- kept as found; confirm this is intended.
    if max_repair_level >= 3:
        repair_ratio = max(repair_ratio, 0.30)
    elif max_repair_level == 2:
        repair_ratio = min(repair_ratio, 0.20)
    else:
        repair_ratio = min(repair_ratio, 0.10)
    total_ratio += repair_ratio

    # ---------------- final price ----------------
    total_ratio = min(total_ratio, 0.90)
    final_price = base_price * (1 - total_ratio)

    return templates.TemplateResponse(
        "simulate_one.html",
        {
            "request": request,
            "machine_id": machine_id,
            "machine_name": row.machine_name,
            "tpl": json.loads(row.estimate_packet),
            "result": {
                "base_price": round(base_price, 2),
                "final_price": round(final_price, 2),
                "total_ratio": round(total_ratio, 4),
                "details": detail_rows,
                "selected": [str(i) for i in option_ids],
            },
        }
    )


@router.post("/estimate2", response_class=HTMLResponse)
def estimate_submit(
    request: Request,
    machine_id: int = Form(...),
    capacity: str = Form(...),
    option_ids: list[int] = Form([]),
    db: Session = Depends(get_db)
):
    """Price a machine/capacity, persist the estimate and re-render the page.

    Options are applied in submission order as ``price * factor -
    absolute_deduct``; ``apply_adjust`` then applies the adjust factors. The
    result is floored at 0, stored in ``estimate_record`` under a fresh
    8-character version id, and shown on ``estimate.html``.
    """
    # ---------- base price ----------
    base_price = db.execute(
        text("""
            SELECT base_price
            FROM machine_base_price
            WHERE machine_id=:mid AND capacity=:cap
        """),
        {"mid": machine_id, "cap": capacity}
    ).scalar() or 0
    price = float(base_price)

    # ---------- option deductions ----------
    for oid in option_ids:
        r = db.execute(
            text("""
                SELECT factor, absolute_deduct
                FROM price_option_factor
                WHERE option_id=:oid
            """),
            {"oid": oid}
        ).fetchone()
        if r is None:
            continue
        # A NULL column means "no effect" rather than a crash in float().
        if r.factor is not None:
            price *= float(r.factor)
        if r.absolute_deduct is not None:
            price -= float(r.absolute_deduct)

    # ---------- adjust factors ----------
    brand_id = db.execute(
        text("""
            SELECT brand_id FROM t_machine WHERE machine_id=:mid
        """),
        {"mid": machine_id}
    ).scalar()
    price = apply_adjust(db, machine_id, brand_id, price)
    price = round(max(price, 0), 2)

    # ---------- estimate version ----------
    version_no = str(uuid.uuid4())[:8]
    db.execute(
        text("""
            INSERT INTO estimate_record
            (machine_id, capacity, option_ids, final_price, version_no)
            VALUES (:m, :c, :o, :p, :v)
        """),
        {
            "m": machine_id,
            "c": capacity,
            "o": ",".join(map(str, option_ids)),
            "p": price,
            "v": version_no,
        }
    )
    db.commit()

    # ---------- template (Redis cache, 1 hour) ----------
    cache_key = f"template:{machine_id}"
    cached = redis_client.get(cache_key) if redis_client else None
    if cached:
        template = json.loads(cached)
    else:
        rows = db.execute(text("SELECT * FROM release_option")).fetchall()
        template = build_template(rows)
        if redis_client:
            redis_client.set(cache_key, json.dumps(template), ex=3600)

    # ---------- reload page data ----------
    machines = db.execute(
        text("""
            SELECT machine_id, name
            FROM t_machine
            ORDER BY brand_name, name
        """)
    ).fetchall()
    capacities = db.execute(
        text("""
            SELECT capacity
            FROM machine_base_price
            WHERE machine_id=:mid
        """),
        {"mid": machine_id}
    ).fetchall()

    return request.app.state.templates.TemplateResponse(
        "estimate.html",
        {
            "request": request,
            "machines": machines,
            "capacities": capacities,
            "template": template,
            "price": price,
            "version": version_no,
            "selected_machine": machine_id,
            "selected_capacity": capacity,
        }
    )