2699 lines
108 KiB
Python
2699 lines
108 KiB
Python
"""
|
||
main.py
|
||
FastAPI 主应用:设备管理平台后端服务。
|
||
|
||
启动方式:
|
||
uvicorn main:app --reload --host 0.0.0.0 --port 8000
|
||
|
||
访问文档:
|
||
http://localhost:8000/docs (Swagger UI)
|
||
http://localhost:8000/redoc (ReDoc)
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import hashlib
|
||
import shutil
|
||
from datetime import datetime
|
||
from typing import Optional, List
|
||
|
||
from pydantic import BaseModel
|
||
from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Query, Request, Body
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import RedirectResponse, FileResponse, StreamingResponse
|
||
from contextlib import asynccontextmanager
|
||
|
||
from database import init_db, seed_demo_data, get_db, migrate_db, UPLOAD_DIR, scan_firmware_directory, FIRMWARE_TYPE_FOLDER, FIRMWARE_PUBLIC_DIR
|
||
from models import (
|
||
DeviceCheckRequest,
|
||
LicenseGenerateRequest,
|
||
ActivateReportRequest,
|
||
CheckResponse,
|
||
LicenseResponse,
|
||
ActivateResponse,
|
||
DeviceInfo,
|
||
ConfigFileCreateRequest,
|
||
ConfigFileUpdateRequest,
|
||
CalibrationFileInfo,
|
||
CalibrationUpdateRequest,
|
||
AppVersionCreateRequest,
|
||
AppVersionUpdateRequest,
|
||
AppVersionInfo,
|
||
AppCheckUpdateRequest,
|
||
AppCheckUpdateResponse,
|
||
FirmwareCreateRequest,
|
||
FirmwareUpdateRequest,
|
||
FirmwareInfo,
|
||
FirmwareCheckUpdateRequest,
|
||
FirmwareCheckUpdateResponse,
|
||
)
|
||
from services import build_license_data, encrypt_license, decrypt_license
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 生命周期:启动时初始化数据库
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
"""应用启动时自动创建表、执行迁移并插入演示数据"""
|
||
init_db()
|
||
migrate_db()
|
||
seed_demo_data()
|
||
yield
|
||
# 关闭时可做清理(此处无需)
|
||
|
||
|
||
app = FastAPI(
|
||
title="设备管理平台 API",
|
||
description="基于 FastAPI + SQLite 的轻量级设备管理后端,支持设备校验、授权文件生成、激活上报、校准文件管理、APP版本管理、固件版本管理。",
|
||
version="2.0.0",
|
||
lifespan=lifespan,
|
||
)
|
||
|
||
# 允许跨域(方便前端/APP 调试)
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
|
||
def now_str() -> int:
|
||
"""返回当前时间的 Unix 时间戳(秒)"""
|
||
return int(datetime.now().timestamp())
|
||
|
||
|
||
def now_date_str() -> int:
|
||
"""返回当前日期 00:00:00 的 Unix 时间戳(秒)"""
|
||
today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
|
||
return int(today.timestamp())
|
||
|
||
|
||
def save_upload_file(upload_file: UploadFile, subdir: str) -> tuple:
|
||
"""
|
||
保存上传文件到指定子目录。
|
||
返回:(file_path, file_name, file_size, md5)
|
||
"""
|
||
target_dir = os.path.join(UPLOAD_DIR, subdir)
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
|
||
original_name = upload_file.filename or "unknown"
|
||
timestamp = int(datetime.now().timestamp())
|
||
safe_name = f"{timestamp}_{original_name}"
|
||
file_path = os.path.join(target_dir, safe_name)
|
||
|
||
file_size = 0
|
||
md5_hash = hashlib.md5()
|
||
with open(file_path, "wb") as f:
|
||
while chunk := upload_file.file.read(8192):
|
||
f.write(chunk)
|
||
file_size += len(chunk)
|
||
md5_hash.update(chunk)
|
||
|
||
md5 = md5_hash.hexdigest()
|
||
return file_path, safe_name, file_size, md5
|
||
|
||
|
||
def delete_file_if_exists(file_path: Optional[str]):
|
||
"""如果文件存在则删除"""
|
||
if file_path and os.path.exists(file_path):
|
||
os.remove(file_path)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 根路由:自动跳转到 Swagger 文档
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/", include_in_schema=False)
|
||
def root():
|
||
return RedirectResponse(url="/docs")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 1. 设备校验接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/devices/check", response_model=CheckResponse, summary="设备校验")
|
||
def device_check(payload: DeviceCheckRequest):
|
||
"""
|
||
APP 启动时调用,校验设备 SN 是否合法、是否已激活。
|
||
|
||
- 若 SN 不存在 → 非法设备,拒绝激活
|
||
- 若 SN 存在但未激活 → 允许进入激活流程
|
||
- 若 SN 已激活 → 正常使用
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT status FROM devices WHERE sn = ?", (payload.sn,)
|
||
).fetchone()
|
||
|
||
if not row:
|
||
return CheckResponse(
|
||
valid=False,
|
||
activated=False,
|
||
message="设备 SN 不存在,请联系管理员注册"
|
||
)
|
||
|
||
status = row["status"]
|
||
if status == "已激活":
|
||
return CheckResponse(
|
||
valid=True,
|
||
activated=True,
|
||
message="设备已激活,正常使用"
|
||
)
|
||
elif status == "已禁用":
|
||
return CheckResponse(
|
||
valid=True,
|
||
activated=False,
|
||
message="设备已被禁用,请联系客服"
|
||
)
|
||
else:
|
||
return CheckResponse(
|
||
valid=True,
|
||
activated=False,
|
||
message="设备待激活,请获取授权文件并激活"
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 根路由:自动跳转到 Swagger 文档
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/", include_in_schema=False)
|
||
def root():
|
||
return RedirectResponse(url="/docs")
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 0. 设备管理接口(CRUD)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/devices", summary="获取设备列表")
|
||
def get_devices(
|
||
sn: Optional[str] = Query(None, description="按 SN 搜索"),
|
||
model: Optional[str] = Query(None, description="按型号过滤"),
|
||
status: Optional[str] = Query(None, description="按状态过滤"),
|
||
):
|
||
"""
|
||
获取所有设备列表,支持按 SN、型号、状态过滤。
|
||
返回前端需要的字段格式。
|
||
"""
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM devices WHERE 1=1"
|
||
params = []
|
||
|
||
if sn:
|
||
sql += " AND sn LIKE ?"
|
||
params.append(f"%{sn}%")
|
||
if model:
|
||
sql += " AND model = ?"
|
||
params.append(model)
|
||
if status:
|
||
sql += " AND status = ?"
|
||
params.append(status)
|
||
|
||
sql += " ORDER BY created_at DESC"
|
||
rows = db.execute(sql, params).fetchall()
|
||
|
||
# 转换为字典列表,匹配前端期望的字段
|
||
result = []
|
||
for row in rows:
|
||
result.append({
|
||
"id": row["id"],
|
||
"sn": row["sn"],
|
||
"model": row["model"] or "",
|
||
"type": row["type"] or "",
|
||
"status": row["status"],
|
||
"firmware": row["firmware"] or "",
|
||
"production_date": row["production_date"] or "",
|
||
"customer": row["customer"] or "-",
|
||
"batch": row["batch"] or ""
|
||
})
|
||
|
||
return result
|
||
|
||
|
||
@app.post("/api/devices/register", summary="注册新设备")
|
||
def register_device(payload: dict):
|
||
"""
|
||
注册新设备到系统。
|
||
接收设备信息并插入数据库。
|
||
"""
|
||
sn = payload.get("sn", "")
|
||
model = payload.get("model", "")
|
||
type_val = payload.get("type", "")
|
||
firmware = payload.get("firmware", "")
|
||
production_date = payload.get("production_date", "")
|
||
customer = payload.get("customer", "-")
|
||
batch = payload.get("batch", "")
|
||
|
||
if not sn:
|
||
raise HTTPException(status_code=400, detail="设备 SN 不能为空")
|
||
|
||
with get_db() as db:
|
||
# 检查 SN 是否已存在
|
||
existing = db.execute("SELECT id FROM devices WHERE sn = ?", (sn,)).fetchone()
|
||
if existing:
|
||
raise HTTPException(status_code=409, detail=f"设备 SN {sn} 已存在")
|
||
|
||
# 插入新设备
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO devices (sn, model, type, status, firmware, production_date, customer, batch, created_at)
|
||
VALUES (?, ?, ?, '待激活', ?, ?, ?, ?, ?)
|
||
""",
|
||
(sn, model, type_val, firmware, production_date, customer, batch, now_str())
|
||
)
|
||
db.commit()
|
||
device_id = cur.lastrowid
|
||
|
||
return {
|
||
"success": True,
|
||
"id": device_id,
|
||
"message": "设备注册成功"
|
||
}
|
||
|
||
|
||
@app.put("/api/devices", summary="更新设备信息")
|
||
def update_device(payload: dict):
|
||
"""
|
||
更新设备信息(如状态、固件版本等)。
|
||
"""
|
||
device_id = payload.get("id")
|
||
if not device_id:
|
||
raise HTTPException(status_code=400, detail="设备 ID 不能为空")
|
||
|
||
with get_db() as db:
|
||
# 检查设备是否存在
|
||
existing = db.execute("SELECT id FROM devices WHERE id = ?", (device_id,)).fetchone()
|
||
if not existing:
|
||
raise HTTPException(status_code=404, detail="设备不存在")
|
||
|
||
# 构建更新语句
|
||
updates = []
|
||
params = []
|
||
|
||
if "status" in payload:
|
||
updates.append("status = ?")
|
||
params.append(payload["status"])
|
||
if "firmware" in payload:
|
||
updates.append("firmware = ?")
|
||
params.append(payload["firmware"])
|
||
if "activated_at" in payload:
|
||
updates.append("activated_at = ?")
|
||
params.append(payload["activated_at"])
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
params.append(device_id)
|
||
db.execute(
|
||
f"UPDATE devices SET {', '.join(updates)} WHERE id = ?",
|
||
params
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "设备信息更新成功"}
|
||
|
||
|
||
@app.delete("/api/devices", summary="删除设备")
|
||
def delete_device(payload: dict):
|
||
"""
|
||
删除指定设备。
|
||
"""
|
||
device_id = payload.get("id")
|
||
if not device_id:
|
||
raise HTTPException(status_code=400, detail="设备 ID 不能为空")
|
||
|
||
with get_db() as db:
|
||
# 检查设备是否存在
|
||
existing = db.execute("SELECT id FROM devices WHERE id = ?", (device_id,)).fetchone()
|
||
if not existing:
|
||
raise HTTPException(status_code=404, detail="设备不存在")
|
||
|
||
db.execute("DELETE FROM devices WHERE id = ?", (device_id,))
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "设备已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 2. 生成加密授权文件接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/licenses/generate", response_model=LicenseResponse, summary="生成加密授权文件")
|
||
def generate_license(payload: LicenseGenerateRequest):
|
||
"""
|
||
管理后台调用,为指定设备生成绑定 SN 的加密授权文件。
|
||
|
||
流程:
|
||
1. 校验设备 SN 是否存在
|
||
2. 若指定了 config_id,则读取配置文件内容
|
||
3. 构造授权数据(模块列表 + 有效期 + 配置信息)
|
||
4. XOR 加密(密钥由 SN 派生,一机一密)
|
||
5. 返回 Base64 加密字符串
|
||
|
||
APP 收到后必须用相同 SN 才能解密。解密后可获得完整的配置参数。
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT sn FROM devices WHERE sn = ?", (payload.sn,)
|
||
).fetchone()
|
||
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="设备 SN 不存在")
|
||
|
||
# 读取配置文件(如果指定了 config_id)
|
||
config = None
|
||
if payload.config_id is not None:
|
||
with get_db() as db:
|
||
cfg_row = db.execute(
|
||
"SELECT * FROM config_files WHERE id = ?", (payload.config_id,)
|
||
).fetchone()
|
||
if not cfg_row:
|
||
raise HTTPException(status_code=404, detail=f"配置文件 ID {payload.config_id} 不存在")
|
||
config = dict(cfg_row)
|
||
|
||
# 构造明文授权数据
|
||
license_data = build_license_data(
|
||
device_sn=payload.sn,
|
||
modules=payload.modules,
|
||
valid_days=payload.valid_days,
|
||
)
|
||
|
||
# 将配置信息嵌入授权数据
|
||
raw_license = license_data.model_dump()
|
||
if config:
|
||
raw_license["config"] = {
|
||
"name": config.get("name"),
|
||
"model": config.get("model"),
|
||
"version": config.get("version"),
|
||
"max_tx_voltage": config.get("max_tx_voltage"),
|
||
"max_tx_current": config.get("max_tx_current"),
|
||
"tx_waveform": config.get("tx_waveform"),
|
||
"tx_pulse_width": config.get("tx_pulse_width"),
|
||
"acq_channels": config.get("acq_channels"),
|
||
"acq_sample_rate": config.get("acq_sample_rate"),
|
||
"acq_voltage_range": config.get("acq_voltage_range"),
|
||
"full_waveform_capture": config.get("full_waveform_capture"),
|
||
"ssid_prefix": config.get("ssid_prefix"),
|
||
}
|
||
|
||
# 加密(绑定 SN)
|
||
encrypted = encrypt_license(license_data)
|
||
# 重新加密包含 config 的完整授权
|
||
from services import _derive_key
|
||
import base64
|
||
json_bytes = json.dumps(raw_license, ensure_ascii=False).encode("utf-8")
|
||
key = _derive_key(payload.sn)
|
||
encrypted = bytearray()
|
||
for i, byte in enumerate(json_bytes):
|
||
encrypted.append(byte ^ key[i % len(key)])
|
||
encrypted_b64 = base64.b64encode(encrypted).decode("ascii")
|
||
|
||
return LicenseResponse(
|
||
success=True,
|
||
encrypted_license=encrypted_b64,
|
||
raw_license=raw_license, # 调试用,生产环境可去掉
|
||
message="授权文件生成成功,已绑定设备 SN 并嵌入配置信息" if config else "授权文件生成成功,已绑定设备 SN"
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 3. 上报激活状态接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/devices/activate", response_model=ActivateResponse, summary="上报激活状态")
|
||
def report_activation(payload: ActivateReportRequest):
|
||
"""
|
||
设备主机首次联网后调用,上报激活结果。
|
||
|
||
- 若上报 "已激活" → 数据库更新状态与激活时间
|
||
- 若上报 "激活失败" → 状态保持原样,记录失败
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT status FROM devices WHERE sn = ?", (payload.sn,)
|
||
).fetchone()
|
||
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="设备 SN 不存在")
|
||
|
||
current_status = row["status"]
|
||
|
||
# 只有"待激活"或"已禁用"的设备允许重新激活
|
||
if payload.status == "已激活":
|
||
db.execute(
|
||
"""
|
||
UPDATE devices
|
||
SET status = '已激活',
|
||
activated_at = ?
|
||
WHERE sn = ?
|
||
""",
|
||
(now_str(), payload.sn),
|
||
)
|
||
db.commit()
|
||
|
||
# 查询更新后的时间
|
||
updated = db.execute(
|
||
"SELECT activated_at FROM devices WHERE sn = ?", (payload.sn,)
|
||
).fetchone()
|
||
|
||
return ActivateResponse(
|
||
success=True,
|
||
sn=payload.sn,
|
||
new_status="已激活",
|
||
activated_at=updated["activated_at"],
|
||
message="设备激活成功,已记录激活时间"
|
||
)
|
||
else:
|
||
return ActivateResponse(
|
||
success=False,
|
||
sn=payload.sn,
|
||
new_status=current_status,
|
||
message="激活失败,未更新状态"
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 4. 配置文件管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/config-files", summary="创建配置文件")
|
||
def create_config_file(payload: ConfigFileCreateRequest):
|
||
"""
|
||
创建设备技术参数配置文件。
|
||
配置文件包含发射参数、采集参数和网络参数,
|
||
生成授权文件时可通过 config_id 关联,将配置嵌入授权中下发给设备。
|
||
"""
|
||
with get_db() as db:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO config_files
|
||
(name, model, version, status,
|
||
max_tx_voltage, max_tx_current, tx_waveform, tx_pulse_width,
|
||
acq_channels, acq_sample_rate, acq_voltage_range, full_waveform_capture,
|
||
ssid_prefix, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
payload.name, payload.model, payload.version, payload.status,
|
||
payload.max_tx_voltage or "", payload.max_tx_current or "",
|
||
payload.tx_waveform or "", payload.tx_pulse_width or "",
|
||
payload.acq_channels or "", payload.acq_sample_rate or "",
|
||
payload.acq_voltage_range or "", payload.full_waveform_capture or "",
|
||
payload.ssid_prefix or "",
|
||
now_str(),
|
||
),
|
||
)
|
||
cfg_id = cur.lastrowid
|
||
db.commit()
|
||
|
||
return {"success": True, "id": cfg_id, "message": "配置文件创建成功"}
|
||
|
||
|
||
@app.get("/api/config-files", summary="查看配置文件列表")
|
||
def list_config_files(
|
||
model: Optional[str] = Query(None, description="按设备型号过滤"),
|
||
status: Optional[str] = Query(None, description="按状态过滤:生效/失效"),
|
||
):
|
||
"""获取配置文件列表,支持按设备型号和状态过滤。"""
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM config_files WHERE 1=1"
|
||
params = []
|
||
if model:
|
||
sql += " AND model = ?"
|
||
params.append(model)
|
||
if status:
|
||
sql += " AND status = ?"
|
||
params.append(status)
|
||
sql += " ORDER BY created_at DESC"
|
||
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = [dict(r) for r in rows]
|
||
|
||
return {"success": True, "data": result, "total": len(result)}
|
||
|
||
|
||
@app.get("/api/config-files/{cfg_id}", summary="查看单个配置文件详情")
|
||
def get_config_file_detail(cfg_id: int):
|
||
"""获取指定 ID 的配置文件详细信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT * FROM config_files WHERE id = ?", (cfg_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="配置文件不存在")
|
||
|
||
return {"success": True, "data": dict(row)}
|
||
|
||
|
||
@app.put("/api/config-files/{cfg_id}", summary="更新配置文件")
|
||
def update_config_file(cfg_id: int, payload: ConfigFileUpdateRequest):
|
||
"""更新配置文件的参数信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM config_files WHERE id = ?", (cfg_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="配置文件不存在")
|
||
|
||
updates = []
|
||
params = []
|
||
fields = [
|
||
("name", payload.name),
|
||
("model", payload.model),
|
||
("version", payload.version),
|
||
("status", payload.status),
|
||
("max_tx_voltage", payload.max_tx_voltage),
|
||
("max_tx_current", payload.max_tx_current),
|
||
("tx_waveform", payload.tx_waveform),
|
||
("tx_pulse_width", payload.tx_pulse_width),
|
||
("acq_channels", payload.acq_channels),
|
||
("acq_sample_rate", payload.acq_sample_rate),
|
||
("acq_voltage_range", payload.acq_voltage_range),
|
||
("full_waveform_capture", payload.full_waveform_capture),
|
||
("ssid_prefix", payload.ssid_prefix),
|
||
]
|
||
for field, value in fields:
|
||
if value is not None:
|
||
updates.append(f"{field} = ?")
|
||
params.append(value)
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
updates.append("updated_at = ?")
|
||
params.append(now_str())
|
||
params.append(cfg_id)
|
||
|
||
db.execute(
|
||
f"UPDATE config_files SET {', '.join(updates)} WHERE id = ?",
|
||
params,
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "配置文件更新成功"}
|
||
|
||
|
||
@app.delete("/api/config-files/{cfg_id}", summary="删除配置文件")
|
||
def delete_config_file(cfg_id: int):
|
||
"""删除指定配置文件。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM config_files WHERE id = ?", (cfg_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="配置文件不存在")
|
||
|
||
db.execute("DELETE FROM config_files WHERE id = ?", (cfg_id,))
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "配置文件已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 5. 校准文件管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/calibration/upload", summary="上传校准文件")
|
||
async def upload_calibration(
|
||
file: UploadFile = File(..., description="校准 JSON 文件"),
|
||
material_sn: str = Form(..., description="物料 SN(采集板)"),
|
||
operator: Optional[str] = Form(None, description="操作员"),
|
||
remark: Optional[str] = Form(None, description="备注"),
|
||
):
|
||
"""
|
||
Windows 校准软件调用此接口,将采集板校准后的 JSON 文件上传至平台。
|
||
|
||
上传成功后:
|
||
1. 解析 JSON 中的 SN、UID、CalibrateFactor
|
||
2. 计算各通道校准系数,判断整体是否合格
|
||
3. 保存原始文件到服务器
|
||
4. 写入 calibration_files 和 calibration_channels 表
|
||
|
||
校准 JSON 格式示例:
|
||
```json
|
||
{
|
||
"SN": "456",
|
||
"UID": "FFFF589EFF0C50434D583530",
|
||
"CalibrateFactor": [
|
||
{
|
||
"ChannelId": 0,
|
||
"80V": {"factor": 7.709273, "offset": 0.055221},
|
||
"2.5V": {"factor": 1.006537, "offset": 0.010941}
|
||
}
|
||
]
|
||
}
|
||
```
|
||
"""
|
||
# 读取文件内容
|
||
content = await file.read()
|
||
json_text = content.decode("utf-8")
|
||
|
||
# 解析 JSON
|
||
try:
|
||
calib_data = json.loads(json_text)
|
||
except json.JSONDecodeError:
|
||
raise HTTPException(status_code=400, detail="无效的 JSON 格式")
|
||
|
||
sn = calib_data.get("SN", "")
|
||
uid = calib_data.get("UID", "")
|
||
factors = calib_data.get("CalibrateFactor", [])
|
||
|
||
if not isinstance(factors, list):
|
||
raise HTTPException(status_code=400, detail="CalibrateFactor 必须是数组")
|
||
|
||
# 判断整体结果:所有通道 factor 必须大于 0
|
||
all_reasonable = True
|
||
for ch in factors:
|
||
v80 = ch.get("80V", {})
|
||
v25 = ch.get("2.5V", {})
|
||
if v80.get("factor", 0) <= 0 or v25.get("factor", 0) <= 0:
|
||
all_reasonable = False
|
||
break
|
||
|
||
overall_result = "合格" if all_reasonable else "不合格"
|
||
|
||
# 保存文件
|
||
file_path, file_name, file_size, md5 = save_upload_file(file, "calibration")
|
||
|
||
with get_db() as db:
|
||
# 插入校准文件记录
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO calibration_files
|
||
(material_sn, sn, uid, file_name, file_path, file_size, md5, result, operator, remark, channels_count)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(material_sn, sn, uid, file_name, file_path, file_size, md5, overall_result, operator or "", remark or "", len(factors)),
|
||
)
|
||
calib_id = cur.lastrowid
|
||
|
||
# 插入通道数据
|
||
for ch in factors:
|
||
v80 = ch.get("80V", {})
|
||
v25 = ch.get("2.5V", {})
|
||
db.execute(
|
||
"""
|
||
INSERT INTO calibration_channels
|
||
(calibration_id, channel_id, factor_80v, offset_80v, factor_2_5v, offset_2_5v)
|
||
VALUES (?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
calib_id,
|
||
ch.get("ChannelId", 0),
|
||
v80.get("factor", 0),
|
||
v80.get("offset", 0),
|
||
v25.get("factor", 0),
|
||
v25.get("offset", 0),
|
||
),
|
||
)
|
||
db.commit()
|
||
|
||
return {
|
||
"success": True,
|
||
"id": calib_id,
|
||
"material_sn": material_sn,
|
||
"sn": sn,
|
||
"uid": uid,
|
||
"file_name": file_name,
|
||
"file_size": file_size,
|
||
"md5": md5,
|
||
"result": overall_result,
|
||
"channels_count": len(factors),
|
||
"message": "校准文件上传成功",
|
||
}
|
||
|
||
|
||
@app.get("/api/calibration", summary="查看校准文件列表")
|
||
def list_calibration(
|
||
material_sn: Optional[str] = Query(None, description="按物料 SN 过滤"),
|
||
sn: Optional[str] = Query(None, description="按校准 SN 过滤"),
|
||
uid: Optional[str] = Query(None, description="按 UID 过滤"),
|
||
):
|
||
"""
|
||
获取校准文件列表,支持按物料 SN、校准 SN、UID 过滤。
|
||
可用于 Windows 校准软件查询已上传记录,也可用于 APP 获取校准信息。
|
||
"""
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM calibration_files WHERE 1=1"
|
||
params = []
|
||
if material_sn:
|
||
sql += " AND material_sn = ?"
|
||
params.append(material_sn)
|
||
if sn:
|
||
sql += " AND sn = ?"
|
||
params.append(sn)
|
||
if uid:
|
||
sql += " AND uid = ?"
|
||
params.append(uid)
|
||
sql += " ORDER BY upload_time DESC"
|
||
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = []
|
||
for r in rows:
|
||
item = dict(r)
|
||
# 查询通道数据
|
||
ch_rows = db.execute(
|
||
"SELECT * FROM calibration_channels WHERE calibration_id = ?",
|
||
(item["id"],),
|
||
).fetchall()
|
||
item["channels"] = [dict(ch) for ch in ch_rows]
|
||
result.append(item)
|
||
|
||
return {"success": True, "data": result, "total": len(result)}
|
||
|
||
|
||
@app.get("/api/calibration/{calib_id}", summary="查看单个校准文件详情")
|
||
def get_calibration_detail(calib_id: int):
|
||
"""获取指定 ID 的校准文件详细信息,包含所有通道数据。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT * FROM calibration_files WHERE id = ?", (calib_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="校准文件不存在")
|
||
|
||
result = dict(row)
|
||
ch_rows = db.execute(
|
||
"SELECT * FROM calibration_channels WHERE calibration_id = ?",
|
||
(calib_id,),
|
||
).fetchall()
|
||
result["channels"] = [dict(ch) for ch in ch_rows]
|
||
|
||
return {"success": True, "data": result}
|
||
|
||
|
||
@app.put("/api/calibration/{calib_id}", summary="更新校准文件信息")
|
||
def update_calibration(calib_id: int, payload: CalibrationUpdateRequest):
|
||
"""
|
||
更新校准文件的元数据(操作员、备注、结果等)。
|
||
不支持修改原始 JSON 文件内容,如需修改请重新上传。
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM calibration_files WHERE id = ?", (calib_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="校准文件不存在")
|
||
|
||
updates = []
|
||
params = []
|
||
if payload.operator is not None:
|
||
updates.append("operator = ?")
|
||
params.append(payload.operator)
|
||
if payload.remark is not None:
|
||
updates.append("remark = ?")
|
||
params.append(payload.remark)
|
||
if payload.result is not None:
|
||
updates.append("result = ?")
|
||
params.append(payload.result)
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
params.append(calib_id)
|
||
db.execute(
|
||
f"UPDATE calibration_files SET {', '.join(updates)} WHERE id = ?",
|
||
params,
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "校准文件信息更新成功"}
|
||
|
||
|
||
@app.delete("/api/calibration/{calib_id}", summary="删除校准文件")
|
||
def delete_calibration(calib_id: int):
|
||
"""删除指定校准文件及其通道数据,同时删除服务器上的原始文件。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT file_path FROM calibration_files WHERE id = ?", (calib_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="校准文件不存在")
|
||
|
||
file_path = row["file_path"]
|
||
db.execute("DELETE FROM calibration_files WHERE id = ?", (calib_id,))
|
||
db.commit()
|
||
|
||
delete_file_if_exists(file_path)
|
||
return {"success": True, "message": "校准文件已删除"}
|
||
|
||
|
||
@app.get("/api/calibration/download/{calib_id}", summary="下载校准原始文件")
|
||
def download_calibration(calib_id: int):
|
||
"""
|
||
下载校准原始 JSON 文件。
|
||
APP 可通过此接口从平台下载校准文件到本地。
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT file_name, file_path FROM calibration_files WHERE id = ?", (calib_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="校准文件不存在")
|
||
|
||
file_path = row["file_path"]
|
||
if not os.path.exists(file_path):
|
||
raise HTTPException(status_code=404, detail="服务器上文件已丢失")
|
||
|
||
return FileResponse(
|
||
path=file_path,
|
||
filename=row["file_name"],
|
||
media_type="application/json",
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 5. APP 版本管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/apps/upload", summary="上传 APP 安装包并创建版本")
|
||
async def upload_app(
|
||
file: UploadFile = File(..., description="APP 安装包文件(apk/ipa)"),
|
||
app_name: str = Form(..., description="应用名称"),
|
||
package_name: Optional[str] = Form(None, description="包名"),
|
||
platform_type: int = Form(2, description="平台类型:1=iOS, 2=Android"),
|
||
version_name: str = Form(..., description="版本号名称,如 1.2.0"),
|
||
major_version: int = Form(1, ge=0),
|
||
minor_version: int = Form(0, ge=0),
|
||
patch_version: int = Form(0, ge=0),
|
||
os_min_version: Optional[str] = Form(None, description="最低系统版本"),
|
||
is_force_update: bool = Form(False, description="是否强制更新"),
|
||
changelog: Optional[str] = Form(None, description="更新日志,JSON 字符串数组"),
|
||
status: int = Form(1, description="状态:0=草稿, 1=已发布, 2=已下架"),
|
||
):
|
||
"""
|
||
上传 APP 安装包(apk/ipa)并创建版本记录。
|
||
|
||
- 文件保存到 uploads/apps/ 目录
|
||
- 记录写入 app_versions 表
|
||
- 返回下载 URL 和版本信息
|
||
"""
|
||
file_path, file_name, file_size, md5 = save_upload_file(file, "apps")
|
||
file_type = "ipa" if file.filename and file.filename.lower().endswith(".ipa") else "apk"
|
||
primary_url = f"/api/apps/download/{file_name}"
|
||
|
||
with get_db() as db:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO app_versions
|
||
(app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version,
|
||
file_name, file_path, file_size, file_type, distribution_type, primary_url,
|
||
os_min_version, is_force_update, changelog, status, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version,
|
||
file_name, file_path, file_size, file_type, "direct", primary_url,
|
||
os_min_version or "", int(is_force_update), changelog or "", status, now_str(),
|
||
),
|
||
)
|
||
app_id = cur.lastrowid
|
||
db.commit()
|
||
|
||
return {
|
||
"success": True,
|
||
"id": app_id,
|
||
"app_name": app_name,
|
||
"version_name": version_name,
|
||
"file_size": file_size,
|
||
"primary_url": primary_url,
|
||
"message": "APP 上传成功",
|
||
}
|
||
|
||
|
||
@app.post("/api/apps", summary="创建 APP/平台/版本(支持 action 模式)")
|
||
def create_app_record(payload: dict = Body(...)):
|
||
"""
|
||
支持三种操作模式:
|
||
1. action='create_app': 创建应用
|
||
2. action='add_platform': 添加平台
|
||
3. action='add_version': 添加版本
|
||
4. 无action字段: 使用原有的 AppVersionCreateRequest 模式
|
||
"""
|
||
action = payload.get('action')
|
||
|
||
with get_db() as db:
|
||
# 模式1: 创建应用
|
||
if action == 'create_app':
|
||
name = payload.get('name', '')
|
||
package_name = payload.get('package_name', '')
|
||
description = payload.get('description', '')
|
||
|
||
cur = db.execute(
|
||
"INSERT INTO app_versions (app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version, status, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
|
||
(name, package_name, 2, '1.0.0', 1, 0, 0, 1, now_str())
|
||
)
|
||
app_id = cur.lastrowid
|
||
db.commit()
|
||
return {"success": True, "id": app_id}
|
||
|
||
# 模式2: 添加平台(当前实现为占位,因为数据库表结构不支持独立的platforms表)
|
||
elif action == 'add_platform':
|
||
app_id = payload.get('app_id')
|
||
platform_type = payload.get('platform_type', 2)
|
||
description = payload.get('description', '')
|
||
|
||
# 由于当前表结构不支持独立平台,这里返回一个虚拟ID
|
||
# 实际应该创建platforms表
|
||
return {"success": True, "id": platform_type, "message": "平台添加成功(模拟)"}
|
||
|
||
# 模式3: 添加版本
|
||
elif action == 'add_version':
|
||
app_id = payload.get('app_id')
|
||
platform_id = payload.get('platform_id')
|
||
major_version = payload.get('major_version', 0)
|
||
minor_version = payload.get('minor_version', 0)
|
||
patch_version = payload.get('patch_version', 0)
|
||
version_name = payload.get('version_name', f'{major_version}.{minor_version}.{patch_version}')
|
||
file_type = payload.get('file_type', '')
|
||
file_size = payload.get('file_size', 0)
|
||
distribution_type = payload.get('distribution_type', 'direct')
|
||
primary_url = payload.get('primary_url', '')
|
||
os_min_version = payload.get('os_min_version', '')
|
||
is_force_update = int(payload.get('is_force_update', False))
|
||
changelog = json.dumps(payload.get('changelog', []), ensure_ascii=False)
|
||
|
||
# 获取应用名称
|
||
app_row = db.execute("SELECT app_name FROM app_versions WHERE id = ?", (app_id,)).fetchone()
|
||
app_name = app_row['app_name'] if app_row else 'Unknown'
|
||
|
||
cur = db.execute(
|
||
"""INSERT INTO app_versions
|
||
(app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version,
|
||
file_type, file_size, distribution_type, primary_url, os_min_version, is_force_update, changelog, status, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
|
||
(
|
||
app_name, '', platform_id, version_name,
|
||
major_version, minor_version, patch_version,
|
||
file_type, file_size, distribution_type, primary_url,
|
||
os_min_version, is_force_update, changelog, 1, now_str()
|
||
)
|
||
)
|
||
version_id = cur.lastrowid
|
||
db.commit()
|
||
return {"success": True, "id": version_id}
|
||
|
||
# 模式4: 原有的 AppVersionCreateRequest 模式
|
||
else:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO app_versions
|
||
(app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version,
|
||
file_type, distribution_type, os_min_version, is_force_update, changelog, status, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
payload.get('app_name', ''), payload.get('package_name', ''), payload.get('platform_type', 2),
|
||
payload.get('version_name', '1.0.0'), payload.get('major_version', 1), payload.get('minor_version', 0), payload.get('patch_version', 0),
|
||
payload.get('file_type', ''), payload.get('distribution_type', 'direct'), payload.get('os_min_version', ''),
|
||
int(payload.get('is_force_update', False)), json.dumps(payload.get('changelog', []), ensure_ascii=False),
|
||
payload.get('status', 1), now_str(),
|
||
),
|
||
)
|
||
app_id = cur.lastrowid
|
||
db.commit()
|
||
return {"success": True, "id": app_id, "message": "APP 版本记录创建成功"}
|
||
|
||
|
||
@app.get("/api/apps", summary="查看 APP 列表(支持 action 模式)")
|
||
def list_apps(
|
||
app_id: Optional[int] = Query(None, description="应用ID,获取单个应用详情"),
|
||
app_name: Optional[str] = Query(None, description="应用名称"),
|
||
platform_type: Optional[int] = Query(None, description="平台类型:1=iOS, 2=Android"),
|
||
status: Optional[int] = Query(None, description="状态:0=草稿, 1=已发布, 2=已下架"),
|
||
):
|
||
"""
|
||
获取 APP 列表或详情。
|
||
如果提供 app_id,返回单个应用的详细信息(包含 platforms 和 versions)。
|
||
否则返回应用列表。
|
||
"""
|
||
with get_db() as db:
|
||
# 获取单个应用详情
|
||
if app_id:
|
||
# 获取应用基本信息(使用第一条记录作为应用信息)
|
||
app_row = db.execute(
|
||
"SELECT * FROM app_versions WHERE id = ? ORDER BY created_at ASC LIMIT 1", (app_id,)
|
||
).fetchone()
|
||
|
||
if not app_row:
|
||
raise HTTPException(status_code=404, detail="应用不存在")
|
||
|
||
app_dict = dict(app_row)
|
||
|
||
# 构建平台列表(根据platform_type去重)
|
||
platforms_rows = db.execute(
|
||
"SELECT DISTINCT platform_type FROM app_versions WHERE id = ? OR (app_name = ? AND platform_type > 0)",
|
||
(app_id, app_dict['app_name'])
|
||
).fetchall()
|
||
|
||
platforms = []
|
||
for i, p_row in enumerate(platforms_rows):
|
||
p_type = p_row['platform_type']
|
||
platform_names = {1: 'iOS', 2: 'Android', 3: 'HarmonyOS', 4: 'Windows', 5: 'macOS', 6: 'Linux', 7: 'Web'}
|
||
platforms.append({
|
||
'id': p_type,
|
||
'app_id': app_id,
|
||
'platform_type': p_type,
|
||
'platform_name': platform_names.get(p_type, f'Platform {p_type}'),
|
||
'description': '',
|
||
'file_types': [],
|
||
'dist_types': []
|
||
})
|
||
|
||
# 如果没有平台数据,创建一个默认平台
|
||
if not platforms:
|
||
platforms.append({
|
||
'id': 2,
|
||
'app_id': app_id,
|
||
'platform_type': 2,
|
||
'platform_name': 'Android',
|
||
'description': '',
|
||
'file_types': [],
|
||
'dist_types': []
|
||
})
|
||
|
||
# 获取版本列表
|
||
versions_rows = db.execute(
|
||
"SELECT * FROM app_versions WHERE (id = ? OR app_name = ?) AND version_name != '1.0.0' ORDER BY created_at DESC",
|
||
(app_id, app_dict['app_name'])
|
||
).fetchall()
|
||
|
||
versions = []
|
||
for v_row in versions_rows:
|
||
v_dict = dict(v_row)
|
||
platform_names = {1: 'iOS', 2: 'Android', 3: 'HarmonyOS', 4: 'Windows', 5: 'macOS', 6: 'Linux', 7: 'Web'}
|
||
versions.append({
|
||
'id': v_dict['id'],
|
||
'app_id': app_id,
|
||
'platform_id': v_dict.get('platform_type', 2),
|
||
'major_version': v_dict.get('major_version', 0),
|
||
'minor_version': v_dict.get('minor_version', 0),
|
||
'patch_version': v_dict.get('patch_version', 0),
|
||
'version_name': v_dict.get('version_name', ''),
|
||
'description': '',
|
||
'file_type': v_dict.get('file_type', ''),
|
||
'file_url': v_dict.get('primary_url', ''),
|
||
'file_size': v_dict.get('file_size', 0),
|
||
'distribution_type': v_dict.get('distribution_type', 'direct'),
|
||
'primary_url': v_dict.get('primary_url', ''),
|
||
'min_support_version': '',
|
||
'os_min_version': v_dict.get('os_min_version', ''),
|
||
'status': v_dict.get('status', 1),
|
||
'is_force_update': v_dict.get('is_force_update', 0),
|
||
'release_date': str(v_dict.get('created_at', '')),
|
||
'changelog': json.loads(v_dict.get('changelog', '[]')) if v_dict.get('changelog') else [],
|
||
'platform_name': platform_names.get(v_dict.get('platform_type', 2), 'Unknown')
|
||
})
|
||
|
||
return {
|
||
'id': app_id,
|
||
'name': app_dict.get('app_name', ''),
|
||
'package_name': app_dict.get('package_name', ''),
|
||
'description': '',
|
||
'status': app_dict.get('status', 1),
|
||
'create_time': str(app_dict.get('created_at', '')),
|
||
'platforms': platforms,
|
||
'versions': versions
|
||
}
|
||
|
||
# 获取应用列表
|
||
else:
|
||
sql = "SELECT * FROM app_versions WHERE version_name = '1.0.0'"
|
||
params = []
|
||
if app_name:
|
||
sql += " AND app_name LIKE ?"
|
||
params.append(f"%{app_name}%")
|
||
if platform_type is not None:
|
||
sql += " AND platform_type = ?"
|
||
params.append(platform_type)
|
||
if status is not None:
|
||
sql += " AND status = ?"
|
||
params.append(status)
|
||
sql += " ORDER BY created_at DESC"
|
||
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = []
|
||
for row in rows:
|
||
row_dict = dict(row)
|
||
# 统计该应用的版本数量
|
||
version_count = db.execute(
|
||
"SELECT COUNT(*) as count FROM app_versions WHERE app_name = ? AND version_name != '1.0.0'",
|
||
(row_dict['app_name'],)
|
||
).fetchone()['count']
|
||
|
||
# 获取该应用的平台列表
|
||
platform_rows = db.execute(
|
||
"SELECT DISTINCT platform_type FROM app_versions WHERE app_name = ?",
|
||
(row_dict['app_name'],)
|
||
).fetchall()
|
||
|
||
platform_names = {1: 'iOS', 2: 'Android', 3: 'HarmonyOS', 4: 'Windows', 5: 'macOS', 6: 'Linux', 7: 'Web'}
|
||
platforms = []
|
||
for p_row in platform_rows:
|
||
p_type = p_row['platform_type']
|
||
platforms.append({
|
||
'id': p_type,
|
||
'platform_type': p_type,
|
||
'platform_name': platform_names.get(p_type, f'Platform {p_type}')
|
||
})
|
||
|
||
result.append({
|
||
'id': row_dict['id'],
|
||
'name': row_dict.get('app_name', ''),
|
||
'package_name': row_dict.get('package_name', ''),
|
||
'description': '',
|
||
'status': row_dict.get('status', 1),
|
||
'create_time': str(row_dict.get('created_at', '')),
|
||
'platforms': platforms,
|
||
'versionCount': version_count
|
||
})
|
||
|
||
return result
|
||
|
||
|
||
@app.get("/api/apps/{app_id}", summary="查看单个 APP 版本详情")
|
||
def get_app_detail(app_id: int):
|
||
"""获取指定 ID 的 APP 版本详细信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT * FROM app_versions WHERE id = ?", (app_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="APP 版本不存在")
|
||
|
||
return {"success": True, "data": dict(row)}
|
||
|
||
|
||
@app.put("/api/apps/{app_id}", summary="更新 APP 版本信息")
|
||
def update_app(app_id: int, payload: AppVersionUpdateRequest):
|
||
"""更新 APP 版本的元数据信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM app_versions WHERE id = ?", (app_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="APP 版本不存在")
|
||
|
||
updates = []
|
||
params = []
|
||
fields = [
|
||
("app_name", payload.app_name),
|
||
("package_name", payload.package_name),
|
||
("version_name", payload.version_name),
|
||
("platform_type", payload.platform_type),
|
||
("os_min_version", payload.os_min_version),
|
||
("is_force_update", int(payload.is_force_update) if payload.is_force_update is not None else None),
|
||
("status", payload.status),
|
||
]
|
||
for field, value in fields:
|
||
if value is not None:
|
||
updates.append(f"{field} = ?")
|
||
params.append(value)
|
||
|
||
if payload.changelog is not None:
|
||
updates.append("changelog = ?")
|
||
params.append(json.dumps(payload.changelog, ensure_ascii=False))
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
updates.append("updated_at = ?")
|
||
params.append(now_str())
|
||
params.append(app_id)
|
||
|
||
db.execute(
|
||
f"UPDATE app_versions SET {', '.join(updates)} WHERE id = ?",
|
||
params,
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "APP 版本更新成功"}
|
||
|
||
|
||
@app.put("/api/apps", summary="更新 APP 版本信息(兼容前端 body 传 id)")
|
||
def update_app_compat(payload: dict):
|
||
"""兼容前端通过 body 中的 id 更新 APP 版本。"""
|
||
app_id = payload.get("id")
|
||
if not app_id:
|
||
raise HTTPException(status_code=400, detail="APP 版本 ID 不能为空")
|
||
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM app_versions WHERE id = ?", (app_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="APP 版本不存在")
|
||
|
||
updates = []
|
||
params = []
|
||
fields = [
|
||
("app_name", payload.get("app_name")),
|
||
("package_name", payload.get("package_name")),
|
||
("version_name", payload.get("version_name")),
|
||
("platform_type", payload.get("platform_type")),
|
||
("os_min_version", payload.get("os_min_version")),
|
||
("is_force_update", int(payload["is_force_update"]) if payload.get("is_force_update") is not None else None),
|
||
("status", payload.get("status")),
|
||
]
|
||
for field, value in fields:
|
||
if value is not None:
|
||
updates.append(f"{field} = ?")
|
||
params.append(value)
|
||
|
||
if payload.get("changelog") is not None:
|
||
updates.append("changelog = ?")
|
||
params.append(json.dumps(payload["changelog"], ensure_ascii=False))
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
updates.append("updated_at = ?")
|
||
params.append(now_str())
|
||
params.append(app_id)
|
||
|
||
db.execute(
|
||
f"UPDATE app_versions SET {', '.join(updates)} WHERE id = ?",
|
||
params,
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "APP 版本更新成功"}
|
||
|
||
|
||
@app.delete("/api/apps", summary="删除 APP 版本(兼容前端 query 传 id)")
|
||
def delete_app_compat(id: int = Query(..., description="APP 版本 ID"), type: str = Query("", description="类型标记,暂不使用")):
|
||
"""兼容前端通过 query 参数删除 APP 版本。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT file_path FROM app_versions WHERE id = ?", (id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="APP 版本不存在")
|
||
|
||
file_path = row["file_path"]
|
||
db.execute("DELETE FROM app_versions WHERE id = ?", (id,))
|
||
db.commit()
|
||
|
||
delete_file_if_exists(file_path)
|
||
return {"success": True, "message": "APP 版本已删除"}
|
||
|
||
|
||
@app.delete("/api/apps/{app_id}", summary="删除 APP 版本")
|
||
def delete_app(app_id: int):
|
||
"""删除指定 APP 版本记录,同时删除服务器上的安装包文件。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT file_path FROM app_versions WHERE id = ?", (app_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="APP 版本不存在")
|
||
|
||
file_path = row["file_path"]
|
||
db.execute("DELETE FROM app_versions WHERE id = ?", (app_id,))
|
||
db.commit()
|
||
|
||
delete_file_if_exists(file_path)
|
||
return {"success": True, "message": "APP 版本已删除"}
|
||
|
||
|
||
@app.get("/api/apps/download/{file_name}", summary="下载 APP 安装包")
|
||
def download_app(file_name: str):
|
||
"""
|
||
下载 APP 安装包文件(apk/ipa)。
|
||
APP 可通过此接口从平台下载更新包。
|
||
"""
|
||
file_path = os.path.join(UPLOAD_DIR, "apps", file_name)
|
||
if not os.path.exists(file_path):
|
||
raise HTTPException(status_code=404, detail="文件不存在")
|
||
|
||
media_type = "application/vnd.android.package-archive" if file_name.endswith(".apk") else "application/octet-stream"
|
||
return FileResponse(path=file_path, filename=file_name, media_type=media_type)
|
||
|
||
|
||
@app.post("/api/apps/check-update", response_model=AppCheckUpdateResponse, summary="APP 检查更新")
|
||
def check_app_update(payload: AppCheckUpdateRequest):
|
||
"""
|
||
APP 启动时调用此接口检查是否有新版本。
|
||
|
||
请求中携带当前版本号,后端比较最新已发布版本的版本号,
|
||
返回是否有更新、是否强制更新、下载地址等信息。
|
||
"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"""
|
||
SELECT * FROM app_versions
|
||
WHERE app_name = ? AND platform_type = ? AND status = 1
|
||
ORDER BY major_version DESC, minor_version DESC, patch_version DESC
|
||
LIMIT 1
|
||
""",
|
||
(payload.app_name, payload.platform_type),
|
||
).fetchone()
|
||
|
||
if not row:
|
||
return AppCheckUpdateResponse(
|
||
has_update=False,
|
||
force_update=False,
|
||
message="没有找到该应用的版本记录",
|
||
)
|
||
|
||
latest = dict(row)
|
||
current_parts = payload.current_version.strip("vV").split(".")
|
||
try:
|
||
cur_major = int(current_parts[0]) if len(current_parts) > 0 else 0
|
||
cur_minor = int(current_parts[1]) if len(current_parts) > 1 else 0
|
||
cur_patch = int(current_parts[2]) if len(current_parts) > 2 else 0
|
||
except ValueError:
|
||
cur_major = cur_minor = cur_patch = 0
|
||
|
||
new_major = latest.get("major_version", 0)
|
||
new_minor = latest.get("minor_version", 0)
|
||
new_patch = latest.get("patch_version", 0)
|
||
|
||
has_update = (
|
||
new_major > cur_major
|
||
or (new_major == cur_major and new_minor > cur_minor)
|
||
or (new_major == cur_major and new_minor == cur_minor and new_patch > cur_patch)
|
||
)
|
||
|
||
if not has_update:
|
||
return AppCheckUpdateResponse(
|
||
has_update=False,
|
||
force_update=False,
|
||
message="当前已是最新版本",
|
||
)
|
||
|
||
changelog = []
|
||
if latest.get("changelog"):
|
||
try:
|
||
changelog = json.loads(latest["changelog"])
|
||
except Exception:
|
||
changelog = []
|
||
|
||
download_url = latest.get("primary_url") or f"/api/apps/download/{latest.get('file_name', '')}"
|
||
|
||
return AppCheckUpdateResponse(
|
||
has_update=True,
|
||
force_update=bool(latest.get("is_force_update", 0)),
|
||
version_name=latest.get("version_name"),
|
||
download_url=download_url,
|
||
changelog=changelog,
|
||
file_size=latest.get("file_size"),
|
||
message="发现新版本,请下载更新",
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 6. 固件版本管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/firmware/upload", summary="上传固件文件并创建版本")
|
||
async def upload_firmware(
|
||
file: UploadFile = File(..., description="固件文件(bin/hex/tar.gz 等)"),
|
||
version: str = Form(..., description="固件版本号,如 v3.2.1"),
|
||
firmware_type: str = Form(..., description="固件类型:采集板/发射板/主协板/主机服务"),
|
||
board_model: Optional[str] = Form(None, description="板卡型号,如 GD30-ACQ-01"),
|
||
device_model: Optional[str] = Form(None, description="设备型号,如 GD30-2024"),
|
||
hw_range: Optional[str] = Form(None, description="硬件适用范围"),
|
||
upgrade_type: str = Form("可选", description="升级类型:强制/可选"),
|
||
signed: bool = Form(False, description="是否已签名"),
|
||
notes: Optional[str] = Form(None, description="更新说明,JSON 字符串数组"),
|
||
status: str = Form("已发布", description="状态:草稿/已发布/已下架/兼容"),
|
||
):
|
||
"""
|
||
上传固件文件并创建固件版本记录。
|
||
|
||
支持的固件类型:
|
||
- 采集板(CJB 子目录)
|
||
- 发射板(FSB 子目录)
|
||
- 主协板(XCL 子目录)
|
||
- 主机服务(根目录,如 package_arm_YYYYMMDD.tar.gz)
|
||
|
||
文件保存到 public/uploads/GD/firmware/ 对应子目录。
|
||
"""
|
||
# 根据固件类型确定保存子目录
|
||
folder = FIRMWARE_TYPE_FOLDER.get(firmware_type, "")
|
||
target_dir = os.path.join(FIRMWARE_PUBLIC_DIR, folder)
|
||
os.makedirs(target_dir, exist_ok=True)
|
||
|
||
original_name = file.filename or "unknown"
|
||
timestamp = int(datetime.now().timestamp())
|
||
safe_name = f"{timestamp}_{original_name}"
|
||
file_path = os.path.join(target_dir, safe_name)
|
||
|
||
file_size = 0
|
||
md5_hash = hashlib.md5()
|
||
with open(file_path, "wb") as f:
|
||
while chunk := file.file.read(8192):
|
||
f.write(chunk)
|
||
file_size += len(chunk)
|
||
md5_hash.update(chunk)
|
||
md5 = md5_hash.hexdigest()
|
||
|
||
# 相对路径用于下载
|
||
rel_folder = folder + "/" if folder else ""
|
||
primary_url = f"/uploads/GD/firmware/{rel_folder}{safe_name}"
|
||
|
||
with get_db() as db:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO firmware_versions
|
||
(version, firmware_type, board_model, device_model, file_name, file_path, file_size,
|
||
hw_range, upgrade_type, signed, notes, status, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
version, firmware_type, board_model or "", device_model or "",
|
||
safe_name, file_path, file_size,
|
||
hw_range or "", upgrade_type, int(signed), notes or "", status, now_str(),
|
||
),
|
||
)
|
||
fw_id = cur.lastrowid
|
||
db.commit()
|
||
|
||
return {
|
||
"success": True,
|
||
"id": fw_id,
|
||
"version": version,
|
||
"firmware_type": firmware_type,
|
||
"file_size": file_size,
|
||
"primary_url": primary_url,
|
||
"message": "固件上传成功",
|
||
}
|
||
|
||
|
||
@app.post("/api/firmware", summary="创建固件版本记录(不上传文件)")
|
||
def create_firmware_record(payload: FirmwareCreateRequest):
|
||
"""
|
||
仅创建固件版本元数据记录,不实际上传文件。
|
||
适用于外部托管或稍后上传文件的场景。
|
||
"""
|
||
with get_db() as db:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO firmware_versions
|
||
(version, firmware_type, board_model, device_model, hw_range, upgrade_type, signed, notes, status, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
payload.version, payload.firmware_type, payload.board_model or "",
|
||
payload.device_model or "", payload.hw_range or "", payload.upgrade_type,
|
||
int(payload.signed), json.dumps(payload.notes or [], ensure_ascii=False),
|
||
payload.status, now_str(),
|
||
),
|
||
)
|
||
fw_id = cur.lastrowid
|
||
db.commit()
|
||
|
||
return {"success": True, "id": fw_id, "message": "固件版本记录创建成功"}
|
||
|
||
|
||
@app.get("/api/firmware", summary="查看固件版本列表")
|
||
def list_firmware(
|
||
firmware_type: Optional[str] = Query(None, description="固件类型过滤"),
|
||
device_model: Optional[str] = Query(None, description="设备型号过滤"),
|
||
board_model: Optional[str] = Query(None, description="板卡型号过滤"),
|
||
status: Optional[str] = Query(None, description="状态过滤"),
|
||
discover: bool = Query(False, description="是否同时扫描实际目录发现未入库固件"),
|
||
):
|
||
"""获取固件版本列表,支持按固件类型、设备型号、板卡型号、状态过滤。
|
||
当 discover=true 时,会扫描 public/uploads/GD/firmware/ 目录,
|
||
将实际存在但数据库中无记录的文件也返回(id 为 0 表示仅存在于目录)。
|
||
"""
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM firmware_versions WHERE 1=1"
|
||
params = []
|
||
if firmware_type:
|
||
sql += " AND firmware_type = ?"
|
||
params.append(firmware_type)
|
||
if device_model:
|
||
sql += " AND device_model = ?"
|
||
params.append(device_model)
|
||
if board_model:
|
||
sql += " AND board_model = ?"
|
||
params.append(board_model)
|
||
if status:
|
||
sql += " AND status = ?"
|
||
params.append(status)
|
||
sql += " ORDER BY created_at DESC"
|
||
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = []
|
||
for r in rows:
|
||
row = dict(r)
|
||
if row.get("notes"):
|
||
try:
|
||
row["notes"] = json.loads(row["notes"])
|
||
except Exception:
|
||
row["notes"] = []
|
||
else:
|
||
row["notes"] = []
|
||
result.append(row)
|
||
|
||
# 如果开启发现模式,扫描目录并合并
|
||
if discover:
|
||
scanned = scan_firmware_directory()
|
||
db_file_names = {r.get("file_name", "") for r in result}
|
||
for item in scanned:
|
||
if item["file_name"] not in db_file_names:
|
||
if firmware_type and item["type"] != firmware_type:
|
||
continue
|
||
result.append({
|
||
"id": 0,
|
||
"version": item["version"],
|
||
"firmware_type": item["type"],
|
||
"board_model": "",
|
||
"device_model": "",
|
||
"file_name": item["file_name"],
|
||
"file_path": item["file_path"],
|
||
"file_size": item["file_size"],
|
||
"hw_range": "",
|
||
"upgrade_type": "可选",
|
||
"signed": 0,
|
||
"notes": [],
|
||
"status": "已发布",
|
||
"created_at": None,
|
||
"updated_at": None,
|
||
})
|
||
|
||
return {"success": True, "data": result, "total": len(result)}
|
||
|
||
|
||
@app.get("/api/firmware/{fw_id}", summary="查看单个固件版本详情")
|
||
def get_firmware_detail(fw_id: int):
|
||
"""获取指定 ID 的固件版本详细信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT * FROM firmware_versions WHERE id = ?", (fw_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="固件版本不存在")
|
||
|
||
data = dict(row)
|
||
if data.get("notes"):
|
||
try:
|
||
data["notes"] = json.loads(data["notes"])
|
||
except Exception:
|
||
data["notes"] = []
|
||
else:
|
||
data["notes"] = []
|
||
return {"success": True, "data": data}
|
||
|
||
|
||
@app.put("/api/firmware/{fw_id}", summary="更新固件版本信息")
|
||
def update_firmware(fw_id: int, payload: FirmwareUpdateRequest):
|
||
"""更新固件版本的元数据信息。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT id FROM firmware_versions WHERE id = ?", (fw_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="固件版本不存在")
|
||
|
||
updates = []
|
||
params = []
|
||
fields = [
|
||
("version", payload.version),
|
||
("firmware_type", payload.firmware_type),
|
||
("board_model", payload.board_model),
|
||
("device_model", payload.device_model),
|
||
("hw_range", payload.hw_range),
|
||
("upgrade_type", payload.upgrade_type),
|
||
("signed", int(payload.signed) if payload.signed is not None else None),
|
||
("status", payload.status),
|
||
]
|
||
for field, value in fields:
|
||
if value is not None:
|
||
updates.append(f"{field} = ?")
|
||
params.append(value)
|
||
|
||
if payload.notes is not None:
|
||
updates.append("notes = ?")
|
||
params.append(json.dumps(payload.notes, ensure_ascii=False))
|
||
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
|
||
updates.append("updated_at = ?")
|
||
params.append(now_str())
|
||
params.append(fw_id)
|
||
|
||
db.execute(
|
||
f"UPDATE firmware_versions SET {', '.join(updates)} WHERE id = ?",
|
||
params,
|
||
)
|
||
db.commit()
|
||
|
||
return {"success": True, "message": "固件版本更新成功"}
|
||
|
||
|
||
@app.delete("/api/firmware/{fw_id}", summary="删除固件版本")
|
||
def delete_firmware(fw_id: int):
|
||
"""删除指定固件版本记录,同时删除服务器上的固件文件。"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT file_path FROM firmware_versions WHERE id = ?", (fw_id,)
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="固件版本不存在")
|
||
|
||
file_path = row["file_path"]
|
||
db.execute("DELETE FROM firmware_versions WHERE id = ?", (fw_id,))
|
||
db.commit()
|
||
|
||
delete_file_if_exists(file_path)
|
||
return {"success": True, "message": "固件版本已删除"}
|
||
|
||
|
||
@app.get("/api/firmware/download/{file_name}", summary="下载固件文件")
|
||
def download_firmware(file_name: str):
|
||
"""
|
||
下载固件文件。
|
||
APP 或设备可通过此接口从平台下载固件进行升级。
|
||
优先从 public/uploads/GD/firmware/ 各子目录查找,回退到旧 uploads/firmware/。
|
||
"""
|
||
# 优先在新目录结构查找
|
||
for folder in ["", "CJB", "FSB", "XCL"]:
|
||
file_path = os.path.join(FIRMWARE_PUBLIC_DIR, folder, file_name)
|
||
if os.path.exists(file_path):
|
||
return FileResponse(
|
||
path=file_path,
|
||
filename=file_name,
|
||
media_type="application/octet-stream",
|
||
)
|
||
|
||
# 回退到旧路径
|
||
file_path = os.path.join(UPLOAD_DIR, "firmware", file_name)
|
||
if os.path.exists(file_path):
|
||
return FileResponse(
|
||
path=file_path,
|
||
filename=file_name,
|
||
media_type="application/octet-stream",
|
||
)
|
||
|
||
raise HTTPException(status_code=404, detail="文件不存在")
|
||
|
||
|
||
@app.get("/api/firmware/scan", summary="扫描固件目录")
|
||
def scan_firmware():
|
||
"""扫描 public/uploads/GD/firmware/ 目录,返回实际存在的固件文件列表。"""
|
||
return {"success": True, "data": scan_firmware_directory()}
|
||
|
||
|
||
@app.post("/api/firmware/check-update", response_model=FirmwareCheckUpdateResponse, summary="检查固件更新")
|
||
def check_firmware_update(payload: FirmwareCheckUpdateRequest):
|
||
"""
|
||
设备调用此接口检查固件是否有新版本。
|
||
|
||
根据固件类型、设备型号、板卡型号匹配最新已发布的固件版本,
|
||
返回是否有更新、是否强制升级、下载地址等信息。
|
||
"""
|
||
with get_db() as db:
|
||
sql = """
|
||
SELECT * FROM firmware_versions
|
||
WHERE firmware_type = ? AND status = '已发布'
|
||
"""
|
||
params = [payload.firmware_type]
|
||
if payload.device_model:
|
||
sql += " AND (device_model = ? OR device_model = '' OR device_model IS NULL)"
|
||
params.append(payload.device_model)
|
||
if payload.board_model:
|
||
sql += " AND (board_model = ? OR board_model = '' OR board_model IS NULL)"
|
||
params.append(payload.board_model)
|
||
sql += " ORDER BY created_at DESC LIMIT 1"
|
||
|
||
row = db.execute(sql, params).fetchone()
|
||
|
||
if not row:
|
||
return FirmwareCheckUpdateResponse(
|
||
has_update=False,
|
||
force_update=False,
|
||
message="没有找到匹配的固件版本",
|
||
)
|
||
|
||
latest = dict(row)
|
||
current_version = payload.current_version.strip("vV")
|
||
latest_version = latest.get("version", "").strip("vV")
|
||
|
||
# 简单版本号比较(假设版本格式为 x.y.z)
|
||
def parse_version(v: str):
|
||
parts = v.split(".")
|
||
return [int(p) if p.isdigit() else 0 for p in parts] + [0, 0, 0]
|
||
|
||
has_update = parse_version(latest_version) > parse_version(current_version)
|
||
|
||
if not has_update:
|
||
return FirmwareCheckUpdateResponse(
|
||
has_update=False,
|
||
force_update=False,
|
||
message="当前已是最新固件版本",
|
||
)
|
||
|
||
notes = []
|
||
if latest.get("notes"):
|
||
try:
|
||
notes = json.loads(latest["notes"])
|
||
except Exception:
|
||
notes = []
|
||
|
||
download_url = f"/api/firmware/download/{latest.get('file_name', '')}"
|
||
|
||
return FirmwareCheckUpdateResponse(
|
||
has_update=True,
|
||
force_update=latest.get("upgrade_type") == "强制",
|
||
version=latest.get("version"),
|
||
download_url=download_url,
|
||
notes=notes,
|
||
file_size=latest.get("file_size"),
|
||
message="发现新固件版本,请下载升级",
|
||
)
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 辅助接口(方便调试和管理)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
class ReferenceValueCreate(BaseModel):
|
||
category: str
|
||
code: str
|
||
label: str
|
||
sort_order: int = 0
|
||
status: str = "启用"
|
||
description: Optional[str] = ""
|
||
extra: Optional[dict] = None
|
||
|
||
|
||
@app.get("/api/reference-values", summary="获取字典值列表")
|
||
def list_reference_values(
|
||
category: Optional[str] = Query(None, description="字典分类过滤"),
|
||
status: str = Query("启用", description="状态过滤"),
|
||
):
|
||
"""获取通用字典值列表,支持按分类和状态过滤。"""
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM reference_values WHERE status = ?"
|
||
params = [status]
|
||
if category:
|
||
sql += " AND category = ?"
|
||
params.append(category)
|
||
sql += " ORDER BY category, sort_order, id"
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = []
|
||
for r in rows:
|
||
row = dict(r)
|
||
try:
|
||
row["extra"] = json.loads(row.get("extra", "{}")) if row.get("extra") else {}
|
||
except Exception:
|
||
row["extra"] = {}
|
||
result.append(row)
|
||
return {"success": True, "data": result, "total": len(result)}
|
||
|
||
|
||
@app.post("/api/reference-values", summary="创建字典值")
|
||
def create_reference_value(payload: ReferenceValueCreate):
|
||
"""创建新的字典值。"""
|
||
with get_db() as db:
|
||
try:
|
||
cur = db.execute(
|
||
"""
|
||
INSERT INTO reference_values (category, code, label, sort_order, status, description, extra)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
payload.category, payload.code, payload.label, payload.sort_order,
|
||
payload.status, payload.description or "", json.dumps(payload.extra or {}, ensure_ascii=False),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "id": cur.lastrowid, "message": "字典值创建成功"}
|
||
except Exception:
|
||
raise HTTPException(status_code=400, detail="字典值已存在(category + code 重复)")
|
||
|
||
|
||
@app.put("/api/reference-values/{ref_id}", summary="更新字典值")
|
||
def update_reference_value(ref_id: int, payload: ReferenceValueCreate):
|
||
"""更新字典值。"""
|
||
with get_db() as db:
|
||
row = db.execute("SELECT id FROM reference_values WHERE id = ?", (ref_id,)).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="字典值不存在")
|
||
db.execute(
|
||
"""
|
||
UPDATE reference_values
|
||
SET category = ?, code = ?, label = ?, sort_order = ?, status = ?, description = ?, extra = ?
|
||
WHERE id = ?
|
||
""",
|
||
(
|
||
payload.category, payload.code, payload.label, payload.sort_order,
|
||
payload.status, payload.description or "", json.dumps(payload.extra or {}, ensure_ascii=False), ref_id,
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "字典值更新成功"}
|
||
|
||
|
||
@app.delete("/api/reference-values/{ref_id}", summary="删除字典值")
|
||
def delete_reference_value(ref_id: int):
|
||
"""删除字典值。"""
|
||
with get_db() as db:
|
||
row = db.execute("SELECT id FROM reference_values WHERE id = ?", (ref_id,)).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="字典值不存在")
|
||
db.execute("DELETE FROM reference_values WHERE id = ?", (ref_id,))
|
||
db.commit()
|
||
return {"success": True, "message": "字典值已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 授权管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/auth-items", summary="获取授权项定义列表")
|
||
def list_auth_items():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM auth_items WHERE status = '启用' ORDER BY sort_order, id").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.get("/api/license-templates", summary="获取授权模板列表")
|
||
def list_license_templates(model_code: str = Query("", description="按型号筛选")):
|
||
with get_db() as db:
|
||
if model_code:
|
||
rows = db.execute("SELECT * FROM license_templates WHERE model_code = ? AND status = '启用' ORDER BY id", (model_code,)).fetchall()
|
||
else:
|
||
rows = db.execute("SELECT * FROM license_templates WHERE status = '启用' ORDER BY id").fetchall()
|
||
result = []
|
||
for r in rows:
|
||
item = dict(r)
|
||
try:
|
||
item["auth_items"] = json.loads(item.get("auth_items", "[]") or "[]")
|
||
except Exception:
|
||
item["auth_items"] = []
|
||
result.append(item)
|
||
return result
|
||
|
||
|
||
@app.get("/api/licenses", summary="获取授权记录列表")
|
||
def list_licenses():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM licenses ORDER BY id").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/licenses", summary="创建授权记录")
|
||
def create_license(body: dict = Body(...)):
|
||
model = body.get("model", "")
|
||
modules = body.get("modules", "")
|
||
expiry = body.get("expiry", "")
|
||
status = body.get("status", "生效")
|
||
config_id = body.get("config_id")
|
||
device_sn = body.get("device_sn", "")
|
||
license_file = body.get("license_file", "")
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
INSERT INTO licenses (model, modules, expiry, status, config_id, license_file, device_sn, created_at, updated_at)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(model, modules, expiry, status, config_id, license_file, device_sn, now_str(), now_str()),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "授权记录创建成功"}
|
||
|
||
|
||
@app.put("/api/licenses", summary="更新授权记录")
|
||
def update_license(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
updates = []
|
||
params = []
|
||
for field in ["model", "modules", "expiry", "status", "config_id", "license_file", "device_sn"]:
|
||
if field in body:
|
||
updates.append(f"{field} = ?")
|
||
params.append(body[field])
|
||
if not updates:
|
||
return {"success": True, "message": "没有要更新的字段"}
|
||
updates.append("updated_at = ?")
|
||
params.append(now_str())
|
||
params.append(id_)
|
||
with get_db() as db:
|
||
db.execute(f"UPDATE licenses SET {', '.join(updates)} WHERE id = ?", params)
|
||
db.commit()
|
||
return {"success": True, "message": "授权记录更新成功"}
|
||
|
||
|
||
@app.delete("/api/licenses", summary="删除授权记录")
|
||
def delete_license(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM licenses WHERE id = ?", (id_,))
|
||
db.commit()
|
||
return {"success": True, "message": "授权记录已删除"}
|
||
|
||
|
||
@app.post("/api/licenses/verify", summary="验证授权文件")
|
||
def verify_license(sn: str, encrypted_license: str):
|
||
"""
|
||
调试验证:传入加密授权文件和设备 SN,验证能否正确解密。
|
||
用于测试一机一密绑定是否生效。
|
||
"""
|
||
decrypted = decrypt_license(encrypted_license, sn)
|
||
if decrypted:
|
||
return {
|
||
"valid": True,
|
||
"decrypted": decrypted,
|
||
"message": "授权文件解密成功,SN 匹配"
|
||
}
|
||
return {
|
||
"valid": False,
|
||
"decrypted": None,
|
||
"message": "授权文件无效或 SN 不匹配"
|
||
}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 7. 仪表盘接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/dashboard", summary="获取首页统计数据")
|
||
def get_dashboard():
|
||
"""获取首页各类统计数据"""
|
||
with get_db() as db:
|
||
def q(sql, params=()):
|
||
return db.execute(sql, params).fetchone()[0]
|
||
|
||
devices = {
|
||
"total": q("SELECT COUNT(*) FROM devices"),
|
||
"assembling": q("SELECT COUNT(*) FROM devices WHERE status = '装配中'"),
|
||
"activated": q("SELECT COUNT(*) FROM devices WHERE status = '已激活'"),
|
||
"shipped": q("SELECT COUNT(*) FROM devices WHERE status = '已出厂'"),
|
||
}
|
||
materials = {
|
||
"total": q("SELECT COUNT(*) FROM materials"),
|
||
"inStock": q("SELECT COUNT(*) FROM materials WHERE status = '在库'"),
|
||
"assembled": q("SELECT COUNT(*) FROM materials WHERE status = '已装配'"),
|
||
"faulty": q("SELECT COUNT(*) FROM materials WHERE status = '故障'"),
|
||
}
|
||
repair = {
|
||
"total": q("SELECT COUNT(*) FROM repair_orders"),
|
||
"pending": q("SELECT COUNT(*) FROM repair_orders WHERE status = '待处理'"),
|
||
"processing": q("SELECT COUNT(*) FROM repair_orders WHERE status = '处理中'"),
|
||
"done": q("SELECT COUNT(*) FROM repair_orders WHERE status = '已处理'"),
|
||
}
|
||
scrap = {
|
||
"total": q("SELECT COUNT(*) FROM scrap_records"),
|
||
"pendingApproval": q("SELECT COUNT(*) FROM scrap_records WHERE status = '待审批'"),
|
||
}
|
||
firmware = {"total": q("SELECT COUNT(*) FROM firmware_versions")}
|
||
licenses = {"total": q("SELECT COUNT(*) FROM licenses")}
|
||
|
||
recentRepairs = [
|
||
dict(r) for r in db.execute(
|
||
"SELECT id, sn, fault_type, status, priority, create_date FROM repair_orders WHERE status != '已处理' ORDER BY create_date DESC LIMIT 4"
|
||
).fetchall()
|
||
]
|
||
recentScraps = [
|
||
dict(r) for r in db.execute(
|
||
"SELECT id, sn, model, status, date FROM scrap_records WHERE status = '待审批' OR status = '审批中' ORDER BY date DESC LIMIT 4"
|
||
).fetchall()
|
||
]
|
||
|
||
return {
|
||
"devices": devices,
|
||
"materials": materials,
|
||
"repair": repair,
|
||
"scrap": scrap,
|
||
"firmware": firmware,
|
||
"licenses": licenses,
|
||
"recentRepairs": recentRepairs,
|
||
"recentScraps": recentScraps,
|
||
}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 8. 设备详情接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/devices/detail", summary="获取设备详细信息")
|
||
def get_device_detail(sn: str = Query(..., description="设备SN")):
|
||
"""获取设备详细信息(含BOM、日志、授权、配置、Checklist)"""
|
||
with get_db() as db:
|
||
device = db.execute("SELECT * FROM devices WHERE sn = ?", (sn,)).fetchone()
|
||
if not device:
|
||
raise HTTPException(status_code=404, detail="设备不存在")
|
||
|
||
bom = [dict(r) for r in db.execute(
|
||
"SELECT * FROM device_bom_records WHERE device_sn = ?", (sn,)
|
||
).fetchall()]
|
||
logs = [dict(r) for r in db.execute(
|
||
"SELECT * FROM device_logs WHERE device_sn = ? ORDER BY date DESC", (sn,)
|
||
).fetchall()]
|
||
license_row = db.execute(
|
||
"SELECT * FROM licenses WHERE device_sn = ? OR model = ? ORDER BY id DESC LIMIT 1",
|
||
(sn, dict(device).get("model", ""))
|
||
).fetchone()
|
||
config_row = db.execute(
|
||
"SELECT * FROM config_files WHERE model = ? AND status = '生效' ORDER BY id DESC LIMIT 1",
|
||
(dict(device).get("model", ""),)
|
||
).fetchone()
|
||
|
||
# 获取该型号的 checklist 模板,并与设备实际检查记录合并
|
||
device_model = dict(device).get("model", "")
|
||
templates = db.execute(
|
||
"SELECT name, required FROM checklist_templates WHERE model_code = ? ORDER BY sort_order ASC",
|
||
(device_model,)
|
||
).fetchall()
|
||
records = db.execute(
|
||
"SELECT checklist_name, passed, photos, note FROM device_checklist_records WHERE device_sn = ?",
|
||
(sn,)
|
||
).fetchall()
|
||
record_map = {r["checklist_name"]: dict(r) for r in records}
|
||
checklist = []
|
||
for t in templates:
|
||
name = t["name"]
|
||
rec = record_map.get(name, {})
|
||
photos = rec.get("photos", "[]")
|
||
try:
|
||
photos = json.loads(photos) if photos else []
|
||
except Exception:
|
||
photos = []
|
||
checklist.append({
|
||
"name": name,
|
||
"required": bool(t["required"]),
|
||
"passed": bool(rec.get("passed", 0)),
|
||
"photos": photos,
|
||
"note": rec.get("note", ""),
|
||
})
|
||
|
||
return {
|
||
"device": dict(device),
|
||
"bom": bom,
|
||
"logs": logs,
|
||
"license": dict(license_row) if license_row else None,
|
||
"config": dict(config_row) if config_row else None,
|
||
"checklist": checklist,
|
||
}
|
||
|
||
|
||
@app.post("/api/devices/detail", summary="保存设备BOM记录和操作日志")
|
||
def save_device_detail(body: dict = Body(...)):
|
||
# 实际实现需要解析body
|
||
return {"success": True, "message": "设备详情保存成功"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 9. 设备型号接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/models", summary="获取所有设备型号")
|
||
def list_models():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM device_models ORDER BY id DESC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/models", summary="创建设备型号")
|
||
def create_model(body: dict = Body(...)):
|
||
name = body.get("name", "")
|
||
code = body.get("code", "")
|
||
status = body.get("status", "在产")
|
||
description = body.get("description", "")
|
||
with get_db() as db:
|
||
db.execute(
|
||
"INSERT INTO device_models (name, code, status, description, create_date) VALUES (?, ?, ?, ?, ?)",
|
||
(name, code, status, description, now_date_str()),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "设备型号创建成功"}
|
||
|
||
|
||
@app.put("/api/models", summary="更新设备型号状态")
|
||
def update_model(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
status = body.get("status")
|
||
with get_db() as db:
|
||
db.execute("UPDATE device_models SET status = ? WHERE id = ?", (status, id_))
|
||
db.commit()
|
||
return {"success": True, "message": "设备型号更新成功"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 10. BOM模板接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/models/bom", summary="获取BOM模板")
|
||
def list_bom(model: Optional[str] = Query(None, description="设备型号编码")):
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM bom_templates WHERE 1=1"
|
||
params = []
|
||
if model:
|
||
sql += " AND model_code = ?"
|
||
params.append(model)
|
||
rows = db.execute(sql, params).fetchall()
|
||
result = []
|
||
for r in rows:
|
||
item = dict(r)
|
||
try:
|
||
item["versions"] = json.loads(item.get("versions", "[]"))
|
||
except Exception:
|
||
item["versions"] = []
|
||
result.append(item)
|
||
return {"success": True, "data": result}
|
||
|
||
|
||
@app.post("/api/models/bom", summary="添加BOM模板项")
|
||
def create_bom(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
INSERT INTO bom_templates
|
||
(model_code, name, material_name, model, versions, qty, required, need_calibration, enforce_version_match)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
body.get("model_code", ""), body.get("name", ""), body.get("material_name", ""),
|
||
body.get("model", ""), json.dumps(body.get("versions", [])), body.get("qty", 1),
|
||
1 if body.get("required") else 0, 1 if body.get("need_calibration") else 0,
|
||
1 if body.get("enforce_version_match") else 0,
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "BOM模板项添加成功"}
|
||
|
||
|
||
@app.delete("/api/models/bom", summary="删除BOM模板项")
|
||
def delete_bom(id: int = Query(..., description="BOM项ID")):
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM bom_templates WHERE id = ?", (id,))
|
||
db.commit()
|
||
return {"success": True, "message": "BOM模板项已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 11. Checklist模板接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/models/checklist", summary="获取装配Checklist")
|
||
def list_checklist(model: Optional[str] = Query(None, description="设备型号编码")):
|
||
with get_db() as db:
|
||
sql = "SELECT * FROM checklist_templates WHERE 1=1"
|
||
params = []
|
||
if model:
|
||
sql += " AND model_code = ?"
|
||
params.append(model)
|
||
sql += " ORDER BY sort_order ASC"
|
||
rows = db.execute(sql, params).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/models/checklist", summary="批量保存Checklist模板")
|
||
def save_checklist(body: dict = Body(...)):
|
||
model_code = body.get("model_code", "")
|
||
items = body.get("items", [])
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM checklist_templates WHERE model_code = ?", (model_code,))
|
||
for idx, item in enumerate(items):
|
||
db.execute(
|
||
"INSERT INTO checklist_templates (model_code, name, required, standard, sort_order) VALUES (?, ?, ?, ?, ?)",
|
||
(model_code, item.get("name", ""), 1 if item.get("required") else 0, item.get("standard", ""), idx),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "Checklist模板保存成功"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 12. 物料管理接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/materials", summary="获取物料列表")
|
||
def list_materials():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM materials ORDER BY id DESC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/materials", summary="登记物料")
|
||
def create_material(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
INSERT INTO materials
|
||
(sn, name, category, type, device_model, version, description, firmware, status, device_sn, production_date, calib_status, calib_date)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
body.get("sn", ""), body.get("name", ""), body.get("category", ""), body.get("type", ""),
|
||
body.get("device_model", ""), body.get("version", ""), body.get("description", ""),
|
||
body.get("firmware", "-"), body.get("status", "在库"), body.get("device_sn", "-"),
|
||
body.get("production_date", ""), body.get("calib_status", "-"), body.get("calib_date", "-"),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "物料登记成功"}
|
||
|
||
|
||
@app.put("/api/materials", summary="更新物料信息")
|
||
def update_material(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
UPDATE materials SET sn=?, name=?, category=?, type=?, device_model=?, version=?,
|
||
description=?, firmware=?, status=?, device_sn=?, calib_status=?, calib_date=? WHERE id=?
|
||
""",
|
||
(
|
||
body.get("sn", ""), body.get("name", ""), body.get("category", ""), body.get("type", ""),
|
||
body.get("device_model", ""), body.get("version", ""), body.get("description", ""),
|
||
body.get("firmware", "-"), body.get("status", "在库"), body.get("device_sn", "-"),
|
||
body.get("calib_status", "-"), body.get("calib_date", "-"), id_,
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "物料更新成功"}
|
||
|
||
|
||
@app.delete("/api/materials", summary="删除物料")
|
||
def delete_material(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM materials WHERE id = ?", (id_,))
|
||
db.commit()
|
||
return {"success": True, "message": "物料已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 13. 物料分类接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/material-categories", summary="获取物料分类列表")
|
||
def list_material_categories():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM material_categories ORDER BY sort_order ASC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/material-categories", summary="创建物料分类")
|
||
def create_material_category(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"INSERT INTO material_categories (name, description, has_firmware, has_calibration, sort_order, status) VALUES (?, ?, ?, ?, ?, ?)",
|
||
(
|
||
body.get("name", ""), body.get("description", ""), 1 if body.get("has_firmware") else 0,
|
||
1 if body.get("has_calibration") else 0, body.get("sort_order", 0), body.get("status", "启用"),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "物料分类创建成功"}
|
||
|
||
|
||
@app.put("/api/material-categories", summary="更新物料分类")
|
||
def update_material_category(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
with get_db() as db:
|
||
db.execute(
|
||
"UPDATE material_categories SET name=?, description=?, has_firmware=?, has_calibration=?, sort_order=?, status=? WHERE id=?",
|
||
(
|
||
body.get("name", ""), body.get("description", ""), 1 if body.get("has_firmware") else 0,
|
||
1 if body.get("has_calibration") else 0, body.get("sort_order", 0), body.get("status", "启用"), id_,
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "物料分类更新成功"}
|
||
|
||
|
||
@app.delete("/api/material-categories", summary="删除物料分类")
|
||
def delete_material_category(id: int = Query(..., description="分类ID")):
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM material_categories WHERE id = ?", (id,))
|
||
db.commit()
|
||
return {"success": True, "message": "物料分类已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 14. 板卡类型接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/board-types", summary="获取板卡类型列表")
|
||
def list_board_types():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM board_types ORDER BY id DESC").fetchall()
|
||
result = []
|
||
for r in rows:
|
||
item = dict(r)
|
||
try:
|
||
item["deviceModels"] = json.loads(item.get("device_models", "[]"))
|
||
except Exception:
|
||
item["deviceModels"] = []
|
||
result.append(item)
|
||
return result
|
||
|
||
|
||
@app.post("/api/board-types", summary="创建板卡类型")
|
||
def create_board_type(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"INSERT INTO board_types (name, category, device_models, description, status) VALUES (?, ?, ?, ?, ?)",
|
||
(
|
||
body.get("name", ""), body.get("category", ""), json.dumps(body.get("deviceModels", [])),
|
||
body.get("description", ""), body.get("status", "启用"),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "板卡类型创建成功"}
|
||
|
||
|
||
@app.put("/api/board-types", summary="更新板卡类型")
|
||
def update_board_type(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
with get_db() as db:
|
||
db.execute(
|
||
"UPDATE board_types SET name=?, category=?, device_models=?, description=?, status=? WHERE id=?",
|
||
(
|
||
body.get("name", ""), body.get("category", ""), json.dumps(body.get("deviceModels", [])),
|
||
body.get("description", ""), body.get("status", "启用"), id_,
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "板卡类型更新成功"}
|
||
|
||
|
||
@app.delete("/api/board-types", summary="删除板卡类型")
|
||
def delete_board_type(id: int = Query(..., description="板卡类型ID")):
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM board_types WHERE id = ?", (id,))
|
||
db.commit()
|
||
return {"success": True, "message": "板卡类型已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 15. 板卡版本接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/boards", summary="获取板卡版本列表")
|
||
def list_boards():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM board_versions ORDER BY id DESC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/boards", summary="创建板卡版本")
|
||
def create_board(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"INSERT INTO board_versions (type, version, status) VALUES (?, ?, ?)",
|
||
(body.get("type", ""), body.get("version", ""), body.get("status", "在产")),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "板卡版本创建成功"}
|
||
|
||
|
||
@app.put("/api/boards", summary="更新板卡版本状态")
|
||
def update_board(body: dict = Body(...)):
|
||
id_ = body.get("id")
|
||
status = body.get("status")
|
||
with get_db() as db:
|
||
db.execute("UPDATE board_versions SET status = ? WHERE id = ?", (status, id_))
|
||
db.commit()
|
||
return {"success": True, "message": "板卡版本更新成功"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 16. 维修工单接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/repair", summary="获取维修工单列表")
|
||
def list_repair():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM repair_orders ORDER BY create_date DESC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/repair", summary="创建维修工单")
|
||
def create_repair(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
INSERT INTO repair_orders (id, sn, fault_type, status, priority, assignee, create_date, description)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
body.get("id", ""), body.get("sn", ""), body.get("fault_type", ""),
|
||
body.get("status", "待处理"), body.get("priority", "中"),
|
||
body.get("assignee", ""), body.get("create_date", now_date_str()), body.get("description", ""),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "维修工单创建成功"}
|
||
|
||
|
||
@app.get("/api/repair/{order_id}/process-records", summary="获取维修处理记录")
|
||
def list_repair_process_records(order_id: str):
|
||
with get_db() as db:
|
||
rows = db.execute(
|
||
"SELECT * FROM repair_process_records WHERE order_id = ? ORDER BY date DESC",
|
||
(order_id,)
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.get("/api/repair/{order_id}/board-replacements", summary="获取维修板卡更换记录")
|
||
def list_repair_board_replacements(order_id: str):
|
||
with get_db() as db:
|
||
rows = db.execute(
|
||
"SELECT * FROM repair_board_replacements WHERE order_id = ? ORDER BY date DESC",
|
||
(order_id,)
|
||
).fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 17. 报废回收接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/scrap", summary="获取报废记录列表")
|
||
def list_scrap():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM scrap_records ORDER BY date DESC").fetchall()
|
||
result = []
|
||
for r in rows:
|
||
item = dict(r)
|
||
try:
|
||
item["materials"] = json.loads(item.get("materials", "[]"))
|
||
except Exception:
|
||
item["materials"] = []
|
||
result.append(item)
|
||
return result
|
||
|
||
|
||
@app.post("/api/scrap", summary="创建报废记录")
|
||
def create_scrap(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"""
|
||
INSERT INTO scrap_records (sn, model, reason, applicant, status, order_id, date, value, materials)
|
||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
""",
|
||
(
|
||
body.get("sn", ""), body.get("model", ""), body.get("reason", ""),
|
||
body.get("applicant", ""), body.get("status", "待审批"), body.get("order_id", ""),
|
||
body.get("date", now_date_str()), body.get("value", 0), json.dumps(body.get("materials", [])),
|
||
),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "报废记录创建成功"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 18. 更新日志接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/update-logs", summary="获取更新日志列表")
|
||
def list_update_logs():
|
||
with get_db() as db:
|
||
rows = db.execute("SELECT * FROM update_logs ORDER BY created_at DESC").fetchall()
|
||
return [dict(r) for r in rows]
|
||
|
||
|
||
@app.post("/api/update-logs", summary="创建更新日志")
|
||
def create_update_log(body: dict = Body(...)):
|
||
with get_db() as db:
|
||
db.execute(
|
||
"INSERT INTO update_logs (title, content, category, version) VALUES (?, ?, ?, ?)",
|
||
(body.get("title", ""), body.get("content", ""), body.get("category", "feature"), body.get("version", "")),
|
||
)
|
||
db.commit()
|
||
return {"success": True, "message": "更新日志创建成功"}
|
||
|
||
|
||
@app.delete("/api/update-logs", summary="删除更新日志")
|
||
def delete_update_log(id: int = Query(..., description="日志ID")):
|
||
with get_db() as db:
|
||
db.execute("DELETE FROM update_logs WHERE id = ?", (id,))
|
||
db.commit()
|
||
return {"success": True, "message": "更新日志已删除"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 19. 通用文件上传接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/upload", summary="通用文件上传")
|
||
async def upload_file(
|
||
file: UploadFile = File(..., description="待上传文件"),
|
||
folder: str = Form("apps", description="存储子目录"),
|
||
):
|
||
"""通用文件上传接口,文件保存到 uploads/{folder}/ 目录"""
|
||
file_path, file_name, file_size, md5 = save_upload_file(file, folder)
|
||
return {
|
||
"url": f"/uploads/{folder}/{file_name}",
|
||
"fileName": file.filename or "unknown",
|
||
"fileSize": file_size,
|
||
"fileType": file.content_type or "application/octet-stream",
|
||
}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 20. 授权下载与预览接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.get("/api/licenses/download", summary="APP下载授权文件")
|
||
def download_license(sn: str = Query(..., description="设备SN")):
|
||
"""APP专用接口:根据设备SN获取授权文件JSON"""
|
||
with get_db() as db:
|
||
row = db.execute(
|
||
"SELECT * FROM licenses WHERE device_sn = ? OR model = ? ORDER BY id DESC LIMIT 1",
|
||
(sn, "")
|
||
).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="未找到授权记录")
|
||
license_data = dict(row)
|
||
|
||
return {
|
||
"success": True,
|
||
"license": license_data,
|
||
"message": "授权文件获取成功",
|
||
}
|
||
|
||
|
||
@app.get("/api/licenses/{license_id}/preview", summary="预览授权文件")
|
||
def preview_license(license_id: int):
|
||
"""预览指定授权文件的完整JSON内容(包含授权项+配置文件+数字签名)"""
|
||
with get_db() as db:
|
||
row = db.execute("SELECT * FROM licenses WHERE id = ?", (license_id,)).fetchone()
|
||
if not row:
|
||
raise HTTPException(status_code=404, detail="授权记录不存在")
|
||
license = dict(row)
|
||
|
||
# 授权项映射表
|
||
AUTH_ITEM_MAP = {
|
||
"一维自电/电阻率/激电测试模块": {"id": "1D", "category": "一维"},
|
||
"二维自电/电阻率/激电测试模块": {"id": "2D", "category": "二维"},
|
||
"三维自电/电阻率/激电测试模块": {"id": "3D", "category": "三维"},
|
||
"水上": {"id": "WATER", "category": "水上"},
|
||
"跨孔": {"id": "CROSS", "category": "跨孔"},
|
||
"电流场法": {"id": "CF", "category": "电流场法"},
|
||
}
|
||
|
||
# 解析授权模块列表
|
||
modules_raw = license.get("modules", "")
|
||
module_names = []
|
||
if modules_raw:
|
||
try:
|
||
parsed = json.loads(modules_raw)
|
||
if isinstance(parsed, list):
|
||
module_names = [str(m) for m in parsed]
|
||
except Exception:
|
||
module_names = [m.strip() for m in modules_raw.split(",") if m.strip()]
|
||
|
||
auth_modules = []
|
||
for name in module_names:
|
||
mapped = AUTH_ITEM_MAP.get(name, {})
|
||
auth_modules.append({
|
||
"id": mapped.get("id", ""),
|
||
"name": name,
|
||
"category": mapped.get("category", ""),
|
||
"enabled": True,
|
||
})
|
||
|
||
# 读取关联的配置文件
|
||
config = {}
|
||
config_id = license.get("config_id")
|
||
if config_id:
|
||
with get_db() as db:
|
||
cfg = db.execute("SELECT * FROM config_files WHERE id = ?", (config_id,)).fetchone()
|
||
if cfg:
|
||
cfg = dict(cfg)
|
||
config = {
|
||
"name": cfg.get("name", ""),
|
||
"version": cfg.get("version", ""),
|
||
"emissionParams": {
|
||
"maxVoltage": cfg.get("max_tx_voltage", ""),
|
||
"maxCurrent": cfg.get("max_tx_current", ""),
|
||
"waveform": cfg.get("tx_waveform", ""),
|
||
"pulseWidth": cfg.get("tx_pulse_width", ""),
|
||
},
|
||
"acquisitionParams": {
|
||
"channels": cfg.get("acq_channels", ""),
|
||
"sampleRate": cfg.get("acq_sample_rate", ""),
|
||
"voltageRange": cfg.get("acq_voltage_range", ""),
|
||
"fullWaveform": cfg.get("full_waveform_capture", ""),
|
||
},
|
||
"networkParams": {
|
||
"wifiSSIDPrefix": cfg.get("ssid_prefix", ""),
|
||
},
|
||
}
|
||
|
||
# 组装授权文件主体(不含签名)
|
||
now = datetime.utcnow().isoformat() + "Z"
|
||
expiry = license.get("expiry", "")
|
||
valid_until = expiry if expiry else (datetime.utcnow().replace(year=datetime.utcnow().year + 1)).strftime("%Y-%m-%d")
|
||
|
||
license_data = {
|
||
"version": "1.0",
|
||
"generatedAt": now,
|
||
"deviceModel": license.get("model", ""),
|
||
"deviceSN": license.get("device_sn", ""),
|
||
"validUntil": valid_until,
|
||
"status": "active" if license.get("status") == "生效" else "inactive",
|
||
"authModules": auth_modules,
|
||
}
|
||
if config:
|
||
license_data["config"] = config
|
||
|
||
# 计算 SHA256 签名
|
||
json_string = json.dumps(license_data, ensure_ascii=False, separators=(",", ":"))
|
||
signature_value = hashlib.sha256(json_string.encode("utf-8")).hexdigest()
|
||
|
||
final_license = {
|
||
**license_data,
|
||
"signature": {
|
||
"algorithm": "SHA256",
|
||
"value": signature_value,
|
||
"publicKey": "platform-public-key-placeholder",
|
||
},
|
||
}
|
||
|
||
return {"success": True, "data": final_license}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 21. 种子数据接口
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
@app.post("/api/seed", summary="一键填充演示数据")
|
||
def seed_data():
|
||
"""清空现有数据后重新生成演示数据(仅用于测试环境)"""
|
||
seed_demo_data()
|
||
return {"success": True, "message": "演示数据已填充"}
|
||
|
||
|
||
# ═══════════════════════════════════════════════════════════════
|
||
# 运行入口(直接 python main.py 启动)
|
||
# ═══════════════════════════════════════════════════════════════
|
||
|
||
if __name__ == "__main__":
|
||
import uvicorn
|
||
uvicorn.run("main:app", host="localhost", port=8000, reload=True)
|