from fastapi import APIRouter, Request, Depends, Form
from fastapi.responses import HTMLResponse
from sqlalchemy.orm import Session
from sqlalchemy import text
from database import SessionLocal, redis_client
from services.template_service import build_template
from services.price_service import apply_adjust
from fastapi.templating import Jinja2Templates
import json
import uuid
router = APIRouter(prefix="/estimate", tags=["estimate"])
templates = Jinja2Templates(directory="templates")
# ================= DB =================
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ================= 获取所有机型 =================
@router.get("/simulate", response_class=HTMLResponse)
def simulate(db: Session = Depends(get_db)):
# 获取所有有模板的机型
machines = db.execute(text("""
SELECT machine_id, name
FROM t_machine
WHERE machine_id IN (SELECT DISTINCT machine_id FROM machine_temp)
""")).fetchall()
html = "
估价模拟
"
html += ''
return html
# ================= 获取机型模板和选项 =================
@router.get("/simulate_one_del", response_class=HTMLResponse)
def simulate(
machine_id: int,
db: Session = Depends(get_db)
):
# 获取机型对应的模板
row = db.execute(text("""
SELECT estimate_packet
FROM machine_temp
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
html = f"估价模拟 - {tpl['templateId']}
"
html += '
"""
return html
@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one_del2(
request: Request,
machine_id: int,
db: Session = Depends(get_db)
):
row = db.execute(text("""
SELECT m.name AS machine_name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("未找到机型模板", status_code=404)
# print("*************row****************")
# print(row)
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.machine_name,
"tpl": tpl
}
)
# ================= 提交估价并计算价格 =================
@router.post("/simulate_del", response_class=HTMLResponse)
def simulate_calc(
machine_id: int = Form(...),
option_ids: list[str] = Form([]),
db: Session = Depends(get_db)
):
# 获取基准价格
base_price_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
# base_price = base_price_row.base_price
base_price = base_price_row.base_price if base_price_row else 1000.0
# 获取选项因素
factors = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
# 计算价格
price = base_price
for f in factors:
if f.factor:
price *= (1-float(f.factor))
if f.absolute_deduct:
price -= float(f.absolute_deduct)
# 逐步应用调节因子
# 1. 获取全局调节因子
global_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
""")).fetchone()
if global_adjust:
price *= float(global_adjust.factor)
# 2. 获取品牌调节因子
brand_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id FROM t_machine WHERE machine_id=:mid
)
"""), {"mid": machine_id}).fetchone()
if brand_adjust:
price *= float(brand_adjust.factor)
# 3. 获取机型调节因子
machine_adjust = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
"""), {"mid": machine_id}).fetchone()
if machine_adjust:
price *= float(machine_adjust.factor)
html = f"""
估价结果
选择项:{",".join(option_ids)}
估价:{round(price, 2)} 元
返回
"""
return html
@router.post("/simulate_de3", response_class=HTMLResponse)
def simulate_calc(
request: Request,
machine_id: int = Form(...),
option_ids: list[str] = Form([]),
db: Session = Depends(get_db)
):
# 机型名称
machine = db.execute(text("""
SELECT name, brand_name
FROM t_machine
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
# 模板
row = db.execute(text("""
SELECT estimate_packet
FROM machine_temp
WHERE machine_id=:mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
# ========== 基准价 ==========
base_price_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price = float(base_price_row.base_price) if base_price_row else 1000.0
price = base_price
detail_rows = []
# ========== 选项扣减 ==========
if option_ids:
factors = db.execute(text("""
SELECT
pf.option_id,
pf.factor,
pf.absolute_deduct,
ro.option_name
FROM price_option_factor pf
LEFT JOIN release_option ro
ON pf.option_id = ro.option_id
WHERE pf.option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
factors = []
for f in factors:
before = price
if f.factor is not None:
price *= float(f.factor)
if f.absolute_deduct is not None:
price -= float(f.absolute_deduct)
detail_rows.append({
"type": "option",
"name": f.option_name or str(f.option_id),
"option_id": f.option_id,
"factor": f.factor,
"absolute_deduct": f.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 全局调节 ==========
global_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if global_adjust:
before = price
price *= float(global_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "全局调节",
"factor": global_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 品牌调节 ==========
brand_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id FROM t_machine WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if brand_adjust:
before = price
price *= float(brand_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "品牌调节",
"factor": brand_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
# ========== 机型调节 ==========
machine_adjust = db.execute(text("""
SELECT factor
FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if machine_adjust:
before = price
price *= float(machine_adjust.factor)
detail_rows.append({
"type": "adjust",
"name": "机型调节",
"factor": machine_adjust.factor,
"before": round(before, 2),
"after": round(price, 2)
})
final_price = round(price, 2)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"tpl": tpl,
"machine": machine,
"machine_id": machine_id,
"selected_option_ids": option_ids,
"base_price": round(base_price, 2),
"detail_rows": detail_rows,
"final_price": final_price
}
)
@router.get("/simulate_one", response_class=HTMLResponse)
def simulate_one(
request: Request,
machine_id: int,
db: Session = Depends(get_db)
):
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": None
}
)
# ================= 估价 =================
@router.post("/simulate666", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
# ---------------- 收集 option ----------------
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(str(v))
# ---------------- 重新取模板与机型名 ----------------
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("机型模板不存在")
tpl = json.loads(row.estimate_packet)
# ---------------- 构造 valueId -> valueText 映射 ----------------
value_name_map = {}
for step in tpl["template"]:
for p in step["properties"]:
for v in p["values"]:
value_name_map[str(v["valueId"])] = v["valueText"]
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price = float(base_row.base_price) if base_row else 1000.0
# ---------------- 选项因子 ----------------
detail_rows = []
if option_ids:
rows = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
rows = []
price = base_price
for r in rows:
before = price
if r.factor is not None:
price = price * float(r.factor)
if r.absolute_deduct is not None:
price = price - float(r.absolute_deduct)
detail_rows.append({
"option_id": r.option_id,
"option_name": value_name_map.get(str(r.option_id), str(r.option_id)),
"factor": r.factor,
"absolute": r.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
# ---------------- 调节因子 ----------------
# 全局
g = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if g:
before = price
price = price * float(g.factor)
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "全局调节",
"factor": g.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 品牌
b = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id
FROM t_machine
WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if b:
before = price
price = price * float(b.factor)
detail_rows.append({
"option_id": "BRAND",
"option_name": "品牌调节",
"factor": b.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 机型
m = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if m:
before = price
price = price * float(m.factor)
detail_rows.append({
"option_id": "MACHINE",
"option_name": "机型调节",
"factor": m.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(price, 2),
"details": detail_rows,
"selected": option_ids
}
}
)
@router.post("/simulate7", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
# 收集所有 option
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(str(v))
# ---------------- 重新取模板与机型名 ----------------
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
if not row:
return HTMLResponse("机型模板不存在")
tpl = json.loads(row.estimate_packet)
# ---------------- 构造 valueId -> valueText 映射 ----------------
value_name_map = {}
for step in tpl["template"]:
for p in step["properties"]:
for v in p["values"]:
value_name_map[str(v["valueId"])] = v["valueText"]
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price_input = form.get("base_price")
if base_price_input and str(base_price_input).strip():
base_price = float(base_price_input)
else:
base_price = float(base_row.base_price) if base_row else 1
# ---------------- 选项因子 ----------------
detail_rows = []
if option_ids:
rows = db.execute(text("""
SELECT option_id, factor, absolute_deduct
FROM price_option_factor
WHERE option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
else:
rows = []
price = base_price
total_deduct_rate = 0
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "-- 以下是扣减比例,累加",
"factor": 0,
"absolute": None,
"before": None,
"after": None
})
for r in rows:
before = price
if r.factor:
total_deduct_rate += float(r.factor)
# price = price * (1-float(r.factor))
# if r.absolute_deduct:
# price = price - float(r.absolute_deduct)
detail_rows.append({
"option_id": r.option_id,
"option_name": value_name_map.get(str(r.option_id), str(r.option_id)),
"factor": r.factor,
"absolute": r.absolute_deduct,
"before": round(before, 2),
"after": round(price, 2)
})
if total_deduct_rate:
price = price * (1 - total_deduct_rate)
price = max(price, 0.0)
# ---------------- 调节因子 ----------------
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "-- 以下是调节因子,累乘",
"factor": 0,
"absolute": None,
"before": None,
"after": None
})
# 全局
g = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='global'
LIMIT 1
""")).fetchone()
if g:
before = price
price = price * float(g.factor)
detail_rows.append({
"option_id": "GLOBAL",
"option_name": "全局调节系数",
"factor": g.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 品牌
b = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='brand'
AND ref_id=(
SELECT brand_id
FROM t_machine
WHERE machine_id=:mid
)
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if b:
before = price
price = price * float(b.factor)
detail_rows.append({
"option_id": "BRAND",
"option_name": "品牌调节系数",
"factor": b.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 机型
m = db.execute(text("""
SELECT factor FROM price_adjust_factor
WHERE level='machine'
AND ref_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
if m:
before = price
price = price * float(m.factor)
detail_rows.append({
"option_id": "MACHINE",
"option_name": "机型调节系数",
"factor": m.factor,
"absolute": None,
"before": round(before, 2),
"after": round(price, 2)
})
# 重新取模板与机型名
row = db.execute(text("""
SELECT m.name,
t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(price, 2),
"details": detail_rows,
"selected": option_ids
}
}
)
@router.post("/simulate", response_class=HTMLResponse)
async def simulate_calc(
request: Request,
machine_id: int = Form(...),
db: Session = Depends(get_db)
):
form = await request.form()
option_ids = []
for k, v in form.multi_items():
if k.startswith("option_"):
option_ids.append(int(v))
# ---------------- 基准价 ----------------
base_row = db.execute(text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid
LIMIT 1
"""), {"mid": machine_id}).fetchone()
base_price = float(base_row.base_price) if base_row else 1000.0
# ---------------- 读取选项扣减规则 ----------------
rows = []
if option_ids:
rows = db.execute(text("""
SELECT
p.option_id,
p.factor,
p.absolute_deduct,
p.group_code,
p.severity_level,
p.sub_weight,
p.is_special,
p.repair_level,
g.cap_ratio,
g.group_weight
FROM price_option_factor p
LEFT JOIN price_damage_group g
ON p.group_code = g.group_code
WHERE p.option_id IN :ids
"""), {"ids": tuple(option_ids)}).fetchall()
# ---------------- 覆盖关系 ----------------
overrides = db.execute(text("""
SELECT
trigger_group_code,
target_group_code,
override_type,
override_value
FROM price_group_override
""")).fetchall()
override_map = defaultdict(list)
for o in overrides:
override_map[o.trigger_group_code].append(o)
# ---------------- 分组 ----------------
groups = defaultdict(list)
specials = []
repairs = []
for r in rows:
if r.repair_level and r.repair_level > 0:
repairs.append(r)
continue
if r.is_special:
specials.append(r)
continue
groups[r.group_code].append(r)
# ---------------- 覆盖处理 ----------------
hit_groups = set(groups.keys())
skip_groups = set()
weight_override = {}
for g in hit_groups:
for o in override_map.get(g, []):
if o.override_type == "skip":
skip_groups.add(o.target_group_code)
elif o.override_type == "weight":
weight_override[o.target_group_code] = float(o.override_value)
# ---------------- 分组扣减 ----------------
total_ratio = 0.0
detail_rows = []
for group_code, items in groups.items():
if group_code in skip_groups:
continue
items = sorted(
items,
key=lambda x: (
float(x.factor or 0),
x.severity_level or 0
),
reverse=True
)
main = items[0]
group_deduct = float(main.factor or 0)
for sub in items[1:]:
group_deduct += float(sub.factor or 0) * float(sub.sub_weight or 0.3)
cap_ratio = float(main.cap_ratio or 1.0)
cap = float(main.factor or 0) * cap_ratio
group_deduct = min(group_deduct, cap)
group_weight = float(main.group_weight or 1.0)
if group_code in weight_override:
group_weight = weight_override[group_code]
final_group_deduct = group_deduct * group_weight
total_ratio += final_group_deduct
detail_rows.append({
"option_name": f"--{group_code} 组扣减",
"factor": round(final_group_deduct, 4)
})
for it in items:
detail_rows.append({
"option_name": it.option_id,
"factor": it.factor
})
# ---------------- special 处理 ----------------
special_ratio = 0.0
for s in specials:
special_ratio += float(s.factor or 0)
detail_rows.append({
"option_name": f"特殊项:{s.option_id}",
"factor": s.factor
})
# 特殊项封顶(示例:30%)
special_ratio = min(special_ratio, 0.30)
total_ratio += special_ratio
# ---------------- 维修分级 ----------------
repair_ratio = 0.0
max_repair_level = 0
for r in repairs:
repair_ratio += float(r.factor or 0)
max_repair_level = max(max_repair_level, r.repair_level or 0)
detail_rows.append({
"option_name": f"维修项:{r.option_id}",
"factor": r.factor
})
# 维修封顶(示例)
if max_repair_level >= 3:
repair_ratio = max(repair_ratio, 0.30)
elif max_repair_level == 2:
repair_ratio = min(repair_ratio, 0.20)
else:
repair_ratio = min(repair_ratio, 0.10)
total_ratio += repair_ratio
# ---------------- 最终价格 ----------------
total_ratio = min(total_ratio, 0.90)
final_price = base_price * (1 - total_ratio)
# ---------------- 返回 ----------------
row = db.execute(text("""
SELECT m.name, t.estimate_packet
FROM machine_temp t
JOIN t_machine m ON t.machine_id = m.machine_id
WHERE t.machine_id = :mid
"""), {"mid": machine_id}).fetchone()
import json
tpl = json.loads(row.estimate_packet)
return templates.TemplateResponse(
"simulate_one.html",
{
"request": request,
"machine_id": machine_id,
"machine_name": row.name,
"tpl": tpl,
"result": {
"base_price": round(base_price, 2),
"final_price": round(final_price, 2),
"total_ratio": round(total_ratio, 4),
"details": detail_rows,
"selected": [str(i) for i in option_ids]
}
}
)
@router.post("/estimate2", response_class=HTMLResponse)
def estimate_submit(
request: Request,
machine_id: int = Form(...),
capacity: str = Form(...),
option_ids: list[int] = Form([]),
db: Session = Depends(get_db)
):
# ---------- 基础价格 ----------
base_price = db.execute(
text("""
SELECT base_price
FROM machine_base_price
WHERE machine_id=:mid AND capacity=:cap
"""),
{"mid": machine_id, "cap": capacity}
).scalar() or 0
price = float(base_price)
# ---------- 扣减项 ----------
for oid in option_ids:
r = db.execute(
text("""
SELECT factor, absolute_deduct
FROM price_option_factor
WHERE option_id=:oid
"""),
{"oid": oid}
).fetchone()
if r:
price = price * float(r.factor) - float(r.absolute_deduct)
# ---------- 调节系数 ----------
brand_id = db.execute(
text("""
SELECT brand_id
FROM t_machine
WHERE machine_id=:mid
"""),
{"mid": machine_id}
).scalar()
price = apply_adjust(db, machine_id, brand_id, price)
price = round(max(price, 0), 2)
# ---------- 估价版本 ----------
version_no = str(uuid.uuid4())[:8]
db.execute(
text("""
INSERT INTO estimate_record
(machine_id, capacity, option_ids, final_price, version_no)
VALUES (:m, :c, :o, :p, :v)
"""),
{
"m": machine_id,
"c": capacity,
"o": ",".join(map(str, option_ids)),
"p": price,
"v": version_no
}
)
db.commit()
# ---------- 模板(Redis 缓存) ----------
cache_key = f"template:{machine_id}"
cached = redis_client.get(cache_key) if redis_client else None
if cached:
template = json.loads(cached)
else:
rows = db.execute(text("SELECT * FROM release_option")).fetchall()
template = build_template(rows)
if redis_client:
redis_client.set(cache_key, json.dumps(template), ex=3600)
# ---------- 重新加载页面 ----------
machines = db.execute(
text("""
SELECT machine_id, name
FROM t_machine
ORDER BY brand_name, name
""")
).fetchall()
capacities = db.execute(
text("""
SELECT capacity
FROM machine_base_price
WHERE machine_id=:mid
"""),
{"mid": machine_id}
).fetchall()
return request.app.state.templates.TemplateResponse(
"estimate.html",
{
"request": request,
"machines": machines,
"capacities": capacities,
"template": template,
"price": price,
"version": version_no,
"selected_machine": machine_id,
"selected_capacity": capacity
}
)