from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from sqlalchemy import text
from collections import defaultdict
from database import SessionLocal, redis_client
from services.template_service import build_template
from services.price_service import apply_adjust
from fastapi.templating import Jinja2Templates
import json
import uuid
router = APIRouter(prefix="/estimate", tags=["estimate"])
templates = Jinja2Templates(directory="templates")
# ================= DB =================
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ================= 获取所有机型 =================
@router.get("/simulate", response_class=HTMLResponse)
def simulate(db: Session = Depends(get_db)):
# 获取所有有模板的机型
machines = db.execute(text("""
SELECT machine_id, name
FROM t_machine
WHERE machine_id IN (SELECT DISTINCT machine_id FROM machine_temp)
""")).fetchall()
html = "
估价模拟
"
html += ''
return html
# ================= 获取机型模板和选项 =================
@router.get("/simulate_one_del", response_class=HTMLResponse)
def simulate(
machine_id: int,
db: Session = Depends(get_db)
):
# 获取机型对应的模板
row = db.execute(text("""
SELECT estimate_packet
FROM machine_temp
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
html = f"估价模拟 - {tpl['templateId']}
"
html += '
"""
return html
@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one_del2(
request: Request,
machine_id: int,
db: Session = Depends(get_db)
):
row = db.execute(text("""
SELECT m.name AS machine_name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("未找到机型模板", status_code=404)
# print("*************row****************")
# print(row)
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.machine_name,
"tpl": tpl
}
)
# ================= 提交估价并计算价格 =================
@router.post("/simulate_del", response_class=HTMLResponse)
def simulate_calc(
machine_id: int = Form(...),
option_ids: list[str] = Form([]),
db: Session = Depends(get_db)
):
# 获取基准价格
base_price_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
# base_price = base_price_row.base_price
base_price = base_price_row.base_price if base_price_row else 1000.0
# 获取选项因素
factors = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
# 计算价格
price = base_price
for f in factors:
if f.factor:
price *= (1-float(f.factor))
if f.absolute_deduct:
price -= float(f.absolute_deduct)
# 逐步应用调节因子
# 1. 获取全局调节因子
global_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
""")).fetchone()
if global_adjust:
price *= float(global_adjust.factor)
# 2. 获取品牌调节因子
brand_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id FROM t_machine WHERE machine_id=:mid
)
"""), {"mid": machine_id}).fetchone()
if brand_adjust:
price *= float(brand_adjust.factor)
# 3. 获取机型调节因子
machine_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
"""), {"mid": machine_id}).fetchone()
if machine_adjust:
price *= float(machine_adjust.factor)
html = f"""
估价结果
选择项:{",".join(option_ids)}
估价:{round(price, 2)} 元
返回
"""
return html
@router.post("/simulate_de3", response_class=HTMLResponse)
def simulate_calc(
request: Request,
machine_id: int = Form(...),
option_ids: list[str] = Form([]),
db: Session = Depends(get_db)
):
# 机型名称
machine = db.execute(text("""
SELECT name, brand_name
FROM t_machine
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
# 模板
row = db.execute(text("""
SELECT estimate_packet
FROM machine_temp
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
# ========== 基准价 ==========
base_price_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price = float(base_price_row.base_price) if base_price_row else 1000.0
price = base_price
detail_rows = []
# ========== 选项扣减 ==========
if option_ids:
factors = db.execute(text("""
SELECT
pf.option_id,
pf.factor,
pf.absolute_deduct,
ro.option_name
FROM price_option_factor pf
LEFT JOIN release_option ro
ON pf.option_id = ro.option_id
WHERE pf.option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
factors = []
for f in factors:
before = price
if f.factor is not None:
price *= float(f.factor)
if f.absolute_deduct is not None:
price -= float(f.absolute_deduct)
detail_rows.append({
"type": "option",
"name": f.option_name or str(f.option_id),
"option_id": f.option_id,
"factor": f.factor,
"absolute_deduct": f.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 全局调节 ==========
global_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if global_adjust:
before = price
price *= float(global_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "全局调节",
"factor": global_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 品牌调节 ==========
brand_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id FROM t_machine WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if brand_adjust:
before = price
price *= float(brand_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "品牌调节",
"factor": brand_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 机型调节 ==========
machine_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if machine_adjust:
before = price
price *= float(machine_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "机型调节",
"factor": machine_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
final_price = round(price, 2)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"tpl": tpl,
"machine": machine,
"machine_id": machine_id,
"selected_option_ids": option_ids,
"base_price": round(base_price, 2),
"detail_rows": detail_rows,
"final_price": final_price
}
)
@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one(
request: Request,
machine_id: int,
db: Session = Depends(get_db)
):
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": None
}
)
# ================= 估价 =================
@router.post("/simulate666", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
# ---------------- 收集 option ----------------
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(str(v))
# ---------------- 重新取模板与机型名 ----------------
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("机型模板不存在")
tpl = json.loads(row.estimate_packet)
# ---------------- 构造 valueId -> valueText 映射 ----------------
value_name_map = {}
for step in tpl["template"]:
for p in step["properties"]:
for v in p["values"]:
value_name_map[str(v["valueId"])] = v["valueText"]
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price = float(base_row.base_price) if base_row else 1000.0
# ---------------- 选项因子 ----------------
detail_rows = []
if option_ids:
rows = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
rows = []
price = base_price
for r in rows:
before = price
if r.factor is not None:
price = price * float(r.factor)
if r.absolute_deduct is not None:
price = price - float(r.absolute_deduct)
detail_rows.append({
"option_id": r.option_id,
"option_name": value_name_map.get(str(r.option_id), str(r.option_id)),
"factor": r.factor,
"absolute": r.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
# ---------------- 调节因子 ----------------
# 全局
g = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if g:
before = price
price = price * float(g.factor)
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "全局调节",
"factor": g.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 品牌
b = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id
FROM t_machine
WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if b:
before = price
price = price * float(b.factor)
detail_rows.append({
"option_id": "BRAND",
"option_name": "品牌调节",
"factor": b.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 机型
m = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if m:
before = price
price = price * float(m.factor)
detail_rows.append({
"option_id": "MACHINE",
"option_name": "机型调节",
"factor": m.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(price, 2),
"details": detail_rows,
"selected": option_ids
}
}
)
@router.post("/simulate7", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
# 收集所有 option
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(str(v))
# ---------------- 重新取模板与机型名 ----------------
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("机型模板不存在")
tpl = json.loads(row.estimate_packet)
# ---------------- 构造 valueId -> valueText 映射 ----------------
value_name_map = {}
for step in tpl["template"]:
for p in step["properties"]:
for v in p["values"]:
value_name_map[str(v["valueId"])] = v["valueText"]
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price_input = form.get("base_price")
if base_price_input and str(base_price_input).strip():
base_price = float(base_price_input)
else:
base_price = float(base_row.base_price) if base_row else 1
# ---------------- 选项因子 ----------------
detail_rows = []
if option_ids:
rows = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
rows = []
price = base_price
total_deduct_rate = 0
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "-- 以下是扣减比例,累加",
"factor": 0,
"absolute": None,
"before": None,
"after": None
})
for r in rows:
before = price
if r.factor:
total_deduct_rate += float(r.factor)
# price = price * (1-float(r.factor))
# if r.absolute_deduct:
# price = price - float(r.absolute_deduct)
detail_rows.append({
"option_id": r.option_id,
"option_name": value_name_map.get(str(r.option_id), str(r.option_id)),
"factor": r.factor,
"absolute": r.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
if total_deduct_rate:
price = price * (1 - total_deduct_rate)
price = max(price, 0.0)
# ---------------- 调节因子 ----------------
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "-- 以下是调节因子,累乘",
"factor": 0,
"absolute": None,
"before": None,
"after": None
})
# 全局
g = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if g:
before = price
price = price * float(g.factor)
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "全局调节系数",
"factor": g.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 品牌
b = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id
FROM t_machine
WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if b:
before = price
price = price * float(b.factor)
detail_rows.append({
"option_id": "BRAND",
"option_name": "品牌调节系数",
"factor": b.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 机型
m = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if m:
before = price
price = price * float(m.factor)
detail_rows.append({
"option_id": "MACHINE",
"option_name": "机型调节系数",
"factor": m.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 重新取模板与机型名
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(price, 2),
"details": detail_rows,
"selected": option_ids
}
}
)
@router.post("/simulate", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(int(v))
# ---------------- 重新取模板与机型名 ----------------
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("机型模板不存在")
tpl = json.loads(row.estimate_packet)
# ---------------- 构造 valueId -> valueText 映射 ----------------
value_name_map = {}
for step in tpl["template"]:
for p in step["properties"]:
for v in p["values"]:
value_name_map[str(v["valueId"])] = v["valueText"]
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price_input = form.get("base_price")
if base_price_input and str(base_price_input).strip():
base_price = float(base_price_input)
else:
base_price = float(base_row.base_price) if base_row else 1
# base_price = float(base_row.base_price) if base_row else 1000.0
# ---------------- 读取选项扣减规则 ----------------
rows = []
if option_ids:
rows = db.execute(text("""
SELECT
p.option_id,
p.factor,
p.absolute_deduct,
p.group_code,
p.severity_level,
p.sub_weight,
p.is_special,
p.repair_level,
g.cap_ratio,
g.group_weight
FROM price_option_factor p
LEFT JOIN price_damage_group g
ON p.group_code = g.group_code
WHERE p.option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
# ---------------- 覆盖关系 ----------------
overrides = db.execute(text("""
SELECT
trigger_group_code,
target_group_code,
override_type,
override_value
FROM price_group_override
""")).fetchall()
override_map = defaultdict(list)
for o in overrides:
override_map[o.trigger_group_code].append(o)
# ---------------- 分组 ----------------
groups = defaultdict(list)
specials = []
repairs = []
for r in rows:
if r.repair_level and r.repair_level > 0:
repairs.append(r)
continue
if r.is_special:
specials.append(r)
continue
groups[r.group_code].append(r)
# ---------------- 覆盖处理 ----------------
hit_groups = set(groups.keys())
skip_groups = set()
weight_override = {}
for g in hit_groups:
for o in override_map.get(g, []):
if o.override_type == "skip":
skip_groups.add(o.target_group_code)
elif o.override_type == "weight":
weight_override[o.target_group_code] = float(o.override_value)
# ---------------- 分组扣减 ----------------
total_ratio = 0.0
detail_rows = []
for group_code, items in groups.items():
if group_code in skip_groups:
continue
items = sorted(
items,
key=lambda x: (
float(x.factor or 0),
x.severity_level or 0
),
reverse=True
)
main = items[0]
group_deduct = float(main.factor or 0)
for sub in items[1:]:
group_deduct += float(sub.factor or 0) * float(sub.sub_weight or 0.3)
cap_ratio = float(main.cap_ratio or 1.0)
cap = float(main.factor or 0) * cap_ratio
group_deduct = min(group_deduct, cap)
group_weight = float(main.group_weight or 1.0)
if group_code in weight_override:
group_weight = weight_override[group_code]
final_group_deduct = group_deduct * group_weight
total_ratio += final_group_deduct
detail_rows.append({
"option_name": f"--{group_code} 组扣减",
"factor": round(final_group_deduct, 4)
})
for it in items:
detail_rows.append({
# "option_name": it.option_id,
"option_name": value_name_map.get(str(it.option_id), str(it.option_id)),
"factor": it.factor
})
# ---------------- special 处理 ----------------
special_ratio = 0.0
for s in specials:
special_ratio += float(s.factor or 0)
detail_rows.append({
"option_name": f"特殊项:{s.option_id}",
"factor": s.factor
})
# 特殊项封顶(示例:30%)
special_ratio = min(special_ratio, 0.30)
total_ratio += special_ratio
# ---------------- 维修分级 ----------------
repair_ratio = 0.0
max_repair_level = 0
for r in repairs:
repair_ratio += float(r.factor or 0)
max_repair_level = max(max_repair_level, r.repair_level or 0)
detail_rows.append({
"option_name": f"维修项:{r.option_id}",
"factor": r.factor
})
# 维修封顶(示例)
if max_repair_level >= 3:
repair_ratio = max(repair_ratio, 0.30)
elif max_repair_level == 2:
repair_ratio = min(repair_ratio, 0.20)
else:
repair_ratio = min(repair_ratio, 0.10)
total_ratio += repair_ratio
# ---------------- 最终价格 ----------------
total_ratio = min(total_ratio, 0.90)
final_price = base_price * (1 - total_ratio)
# ---------------- 返回 ----------------
row = db.execute(text("""
SELECT m.name, t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
# import json
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(final_price, 2),
"total_ratio": round(total_ratio, 4),
"details": detail_rows,
"selected": [str(i) for i in option_ids]
}
}
)
SPECIAL_DISCOUNT_RATIO = 0.70 # 特殊项触发后,对剩余价格打7折
REPAIR_LEVEL_CAP = {
3: 0.60, # 核心维修,最多吃掉基础价60%
2: 0.35, # 重要维修
1: 0.15, # 次要维修
}
def calculate_price(db, base_price: float, selected_option_ids: list[int]):
if not selected_option_ids:
return {
"base_price": base_price,
"final_price": base_price,
"details": []
}
# -------------------------------------------------
# 1. 读取 option 规则
# -------------------------------------------------
rows = db.execute(text("""
SELECT
f.option_id,
f.factor,
f.absolute_deduct,
f.group_code,
f.severity_level,
f.sub_weight,
f.is_special,
f.repair_level,
o.option_name
FROM price_option_factor f
LEFT JOIN release_option o
ON o.option_id = f.option_id
WHERE f.option_id IN :ids
"""), {
"ids": tuple(selected_option_ids)
}).mappings().all()
# -------------------------------------------------
# 2. 分组
# -------------------------------------------------
group_map = defaultdict(list)
special_items = []
repair_items = []
for r in rows:
r = dict(r)
if r["is_special"]:
special_items.append(r)
if r["repair_level"] and r["repair_level"] > 0:
repair_items.append(r)
group_code = r["group_code"] or "default"
group_map[group_code].append(r)
# -------------------------------------------------
# 3. 读取分组配置
# -------------------------------------------------
group_rows = db.execute(text("""
SELECT
group_code,
cap_ratio,
group_weight
FROM price_damage_group
""")).mappings().all()
group_conf = {r["group_code"]: r for r in group_rows}
# -------------------------------------------------
# 4. 读取组联动
# -------------------------------------------------
override_rows = db.execute(text("""
SELECT
trigger_group_code,
target_group_code,
override_type,
override_value
FROM price_group_override
""")).mappings().all()
overrides = defaultdict(list)
for r in override_rows:
overrides[r["trigger_group_code"]].append(r)
# -------------------------------------------------
# 5. 普通损伤组计算(不含repair组)
# -------------------------------------------------
group_result = {}
for group_code, items in group_map.items():
if group_code == "repair":
continue
effective = [
i for i in items
if (i["factor"] or 0) > 0 or (i["absolute_deduct"] or 0) > 0
]
if not effective:
continue
effective.sort(
key=lambda x: (
x["severity_level"] or 0,
x["factor"] or 0
),
reverse=True
)
main = effective[0]
minors = effective[1:]
main_deduct = base_price * (main["factor"] or 0) + (main["absolute_deduct"] or 0)
minor_sum = 0
for m in minors:
w = m["sub_weight"] if m["sub_weight"] is not None else 0.3
minor_sum += base_price * (m["factor"] or 0) * w
raw = main_deduct + minor_sum
conf = group_conf.get(group_code)
if conf:
cap = main_deduct * float(conf["cap_ratio"])
deduct = min(raw, cap)
deduct *= float(conf["group_weight"])
else:
deduct = raw
group_result[group_code] = {
"deduct": deduct,
"main": main,
"has_effect": deduct > 0
}
# -------------------------------------------------
# 6. repair 组分级模型
# -------------------------------------------------
repair_deduct = 0
repair_detail = []
if repair_items:
by_level = defaultdict(list)
for r in repair_items:
by_level[r["repair_level"]].append(r)
for level, items in by_level.items():
items = [
i for i in items
if (i["factor"] or 0) > 0 or (i["absolute_deduct"] or 0) > 0
]
if not items:
continue
# 同级只取最严重
items.sort(key=lambda x: x["factor"] or 0, reverse=True)
main = items[0]
d = base_price * (main["factor"] or 0) + (main["absolute_deduct"] or 0)
cap_ratio = REPAIR_LEVEL_CAP.get(level, 1.0)
d = min(d, base_price * cap_ratio)
repair_deduct += d
repair_detail.append({
"repair_level": level,
"option_name": main["option_name"],
"deduct": round(d, 2)
})
group_result["repair"] = {
"deduct": repair_deduct,
"main": None,
"has_effect": repair_deduct > 0,
"detail": repair_detail
}
# -------------------------------------------------
# 7. 组联动
# -------------------------------------------------
disabled_groups = set()
group_weight_override = {}
for trigger, rules in overrides.items():
if trigger not in group_result:
continue
if not group_result[trigger]["has_effect"]:
continue
for rule in rules:
target = rule["target_group_code"]
if rule["override_type"] == "skip":
disabled_groups.add(target)
elif rule["override_type"] == "weight":
group_weight_override[target] = float(rule["override_value"])
# -------------------------------------------------
# 8. 汇总普通扣减
# -------------------------------------------------
total_deduct = 0
details = []
for group_code, g in group_result.items():
if group_code in disabled_groups:
continue
d = g["deduct"]
if group_code in group_weight_override:
d *= group_weight_override[group_code]
total_deduct += d
if group_code == "repair":
details.append({
"group": "repair",
"detail": g["detail"],
"group_deduct": round(d, 2)
})
else:
details.append({
"group": group_code,
"option_name": g["main"]["option_name"],
"group_deduct": round(d, 2)
})
price_after_damage = base_price - total_deduct
if price_after_damage < 0:
price_after_damage = 0
# -------------------------------------------------
# 9. 特殊项整体折价
# -------------------------------------------------
special_applied = []
if special_items:
# 你后面可以在这里按 option 决定不同折扣
price_after_damage *= SPECIAL_DISCOUNT_RATIO
for s in special_items:
special_applied.append(s["option_name"])
final_price = round(price_after_damage, 2)
return {
"base_price": round(base_price, 2),
"final_price": final_price,
"damage_deduct": round(total_deduct, 2),
"special_discount_applied": special_applied,
"details": details
}
# curl -sS -X POST "http://127.0.0.1:8002/api/product/bid" -H "Content-Type: application/json" -H "X-API-Key: dHiw0yduUhnpQRV9z30EGpJD8fAne1CJ" -d '{"task_id":2,"task_types":["bid"]}'