""" main.py FastAPI 主应用:设备管理平台后端服务。 启动方式: uvicorn main:app --reload --host 0.0.0.0 --port 8000 访问文档: http://localhost:8000/docs (Swagger UI) http://localhost:8000/redoc (ReDoc) """ import json import os import hashlib import shutil from datetime import datetime from typing import Optional, List from pydantic import BaseModel from fastapi import FastAPI, HTTPException, Depends, UploadFile, File, Form, Query, Request, Body from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import RedirectResponse, FileResponse, StreamingResponse from contextlib import asynccontextmanager from database import init_db, seed_demo_data, get_db, migrate_db, UPLOAD_DIR, scan_firmware_directory, FIRMWARE_TYPE_FOLDER, FIRMWARE_PUBLIC_DIR from models import ( DeviceCheckRequest, LicenseGenerateRequest, ActivateReportRequest, CheckResponse, LicenseResponse, ActivateResponse, DeviceInfo, ConfigFileCreateRequest, ConfigFileUpdateRequest, CalibrationFileInfo, CalibrationUpdateRequest, AppVersionCreateRequest, AppVersionUpdateRequest, AppVersionInfo, AppCheckUpdateRequest, AppCheckUpdateResponse, FirmwareCreateRequest, FirmwareUpdateRequest, FirmwareInfo, FirmwareCheckUpdateRequest, FirmwareCheckUpdateResponse, ) from services import build_license_data, encrypt_license, decrypt_license # ═══════════════════════════════════════════════════════════════ # 生命周期:启动时初始化数据库 # ═══════════════════════════════════════════════════════════════ @asynccontextmanager async def lifespan(app: FastAPI): """应用启动时自动创建表、执行迁移并插入演示数据""" init_db() migrate_db() seed_demo_data() yield # 关闭时可做清理(此处无需) app = FastAPI( title="设备管理平台 API", description="基于 FastAPI + SQLite 的轻量级设备管理后端,支持设备校验、授权文件生成、激活上报、校准文件管理、APP版本管理、固件版本管理。", version="2.0.0", lifespan=lifespan, ) # 允许跨域(方便前端/APP 调试) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"], ) def now_str() -> int: """返回当前时间的 Unix 时间戳(秒)""" return int(datetime.now().timestamp()) def now_date_str() -> int: """返回当前日期 
00:00:00 的 Unix 时间戳(秒)""" today = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0) return int(today.timestamp()) def save_upload_file(upload_file: UploadFile, subdir: str) -> tuple: """ 保存上传文件到指定子目录。 返回:(file_path, file_name, file_size, md5) """ target_dir = os.path.join(UPLOAD_DIR, subdir) os.makedirs(target_dir, exist_ok=True) original_name = upload_file.filename or "unknown" timestamp = int(datetime.now().timestamp()) safe_name = f"{timestamp}_{original_name}" file_path = os.path.join(target_dir, safe_name) file_size = 0 md5_hash = hashlib.md5() with open(file_path, "wb") as f: while chunk := upload_file.file.read(8192): f.write(chunk) file_size += len(chunk) md5_hash.update(chunk) md5 = md5_hash.hexdigest() return file_path, safe_name, file_size, md5 def delete_file_if_exists(file_path: Optional[str]): """如果文件存在则删除""" if file_path and os.path.exists(file_path): os.remove(file_path) # ═══════════════════════════════════════════════════════════════ # 根路由:自动跳转到 Swagger 文档 # ═══════════════════════════════════════════════════════════════ @app.get("/", include_in_schema=False) def root(): return RedirectResponse(url="/docs") # ═══════════════════════════════════════════════════════════════ # 1. 
# Device verification endpoint
# ═══════════════════════════════════════════════════════════════

@app.post("/api/devices/check", response_model=CheckResponse, summary="设备校验")
def device_check(payload: DeviceCheckRequest):
    """
    Called by the APP on start-up to check whether a device SN is known and
    whether it has been activated.

    - SN not found          -> invalid device, activation refused
    - SN found, status "已激活" -> normal use
    - SN found, status "已禁用" -> valid but not activated (disabled)
    - any other status      -> valid, may enter the activation flow
    """
    with get_db() as db:
        row = db.execute(
            "SELECT status FROM devices WHERE sn = ?", (payload.sn,)
        ).fetchone()

    if not row:
        return CheckResponse(
            valid=False,
            activated=False,
            message="设备 SN 不存在,请联系管理员注册"
        )

    status = row["status"]
    if status == "已激活":
        return CheckResponse(
            valid=True,
            activated=True,
            message="设备已激活,正常使用"
        )
    if status == "已禁用":
        return CheckResponse(
            valid=True,
            activated=False,
            message="设备已被禁用,请联系客服"
        )
    return CheckResponse(
        valid=True,
        activated=False,
        message="设备待激活,请获取授权文件并激活"
    )


# NOTE: the "/" -> "/docs" redirect route is registered once, near the top of
# this module; it is intentionally not declared a second time here.


# ═══════════════════════════════════════════════════════════════
# 0. Device management endpoints (CRUD)
# ═══════════════════════════════════════════════════════════════

@app.get("/api/devices", summary="获取设备列表")
def get_devices(
    sn: Optional[str] = Query(None, description="按 SN 搜索"),
    model: Optional[str] = Query(None, description="按型号过滤"),
    status: Optional[str] = Query(None, description="按状态过滤"),
):
    """
    Return all devices, newest first, optionally filtered.

    ``sn`` is a substring match; ``model`` and ``status`` are exact matches.
    Each item is a plain dict with the field names the front end expects;
    NULL text columns are returned as ``""`` (``"-"`` for ``customer``).
    """
    sql = "SELECT * FROM devices WHERE 1=1"
    params = []
    if sn:
        sql += " AND sn LIKE ?"
        params.append(f"%{sn}%")
    if model:
        sql += " AND model = ?"
        params.append(model)
    if status:
        sql += " AND status = ?"
        params.append(status)
    sql += " ORDER BY created_at DESC"

    with get_db() as db:
        rows = db.execute(sql, params).fetchall()

    return [
        {
            "id": row["id"],
            "sn": row["sn"],
            "model": row["model"] or "",
            "type": row["type"] or "",
            "status": row["status"],
            "firmware": row["firmware"] or "",
            "production_date": row["production_date"] or "",
            "customer": row["customer"] or "-",
            "batch": row["batch"] or ""
        }
        for row in rows
    ]


@app.post("/api/devices/register", summary="注册新设备")
def register_device(payload: dict):
    """
    Register a new device.

    Reads ``sn``, ``model``, ``type``, ``firmware``, ``production_date``,
    ``customer`` and ``batch`` from the JSON body and inserts a row with
    status "待激活".

    Raises:
        HTTPException 400: ``sn`` is missing or empty.
        HTTPException 409: a device with this ``sn`` already exists.
    """
    sn = payload.get("sn", "")
    model = payload.get("model", "")
    type_val = payload.get("type", "")
    firmware = payload.get("firmware", "")
    production_date = payload.get("production_date", "")
    customer = payload.get("customer", "-")
    batch = payload.get("batch", "")

    if not sn:
        raise HTTPException(status_code=400, detail="设备 SN 不能为空")

    with get_db() as db:
        # Reject duplicates before inserting.
        existing = db.execute("SELECT id FROM devices WHERE sn = ?", (sn,)).fetchone()
        if existing:
            raise HTTPException(status_code=409, detail=f"设备 SN {sn} 已存在")

        cur = db.execute(
            """
            INSERT INTO devices (sn, model, type, status, firmware, production_date, customer, batch, created_at)
            VALUES (?, ?, ?, '待激活', ?, ?, ?, ?, ?)
            """,
            (sn, model, type_val, firmware, production_date, customer, batch, now_str())
        )
        db.commit()
        device_id = cur.lastrowid

    return {
        "success": True,
        "id": device_id,
        "message": "设备注册成功"
    }


@app.put("/api/devices", summary="更新设备信息")
def update_device(payload: dict):
    """
    Update a device identified by ``id`` in the JSON body.

    Only the keys ``status``, ``firmware`` and ``activated_at`` are applied,
    and only when present in the body.

    Raises:
        HTTPException 400: ``id`` is missing.
        HTTPException 404: no device has this ``id``.
    """
    device_id = payload.get("id")
    if not device_id:
        raise HTTPException(status_code=400, detail="设备 ID 不能为空")

    with get_db() as db:
        existing = db.execute("SELECT id FROM devices WHERE id = ?", (device_id,)).fetchone()
        if not existing:
            raise HTTPException(status_code=404, detail="设备不存在")

        # Column names come from this fixed whitelist, never from the client,
        # so interpolating them into the SQL text below is safe.
        updates = []
        params = []
        for column in ("status", "firmware", "activated_at"):
            if column in payload:
                updates.append(f"{column} = ?")
                params.append(payload[column])

        if not updates:
            return {"success": True, "message": "没有要更新的字段"}

        params.append(device_id)
        db.execute(
            f"UPDATE devices SET {', '.join(updates)} WHERE id = ?",
            params
        )
        db.commit()

    return {"success": True, "message": "设备信息更新成功"}


@app.delete("/api/devices", summary="删除设备")
def delete_device(payload: dict):
    """
    Delete the device identified by ``id`` in the JSON body.

    Raises:
        HTTPException 400: ``id`` is missing.
        HTTPException 404: no device has this ``id``.
    """
    device_id = payload.get("id")
    if not device_id:
        raise HTTPException(status_code=400, detail="设备 ID 不能为空")

    with get_db() as db:
        existing = db.execute("SELECT id FROM devices WHERE id = ?", (device_id,)).fetchone()
        if not existing:
            raise HTTPException(status_code=404, detail="设备不存在")

        db.execute("DELETE FROM devices WHERE id = ?", (device_id,))
        db.commit()

    return {"success": True, "message": "设备已删除"}


# ═══════════════════════════════════════════════════════════════
# 2. Encrypted license generation endpoint
# ═══════════════════════════════════════════════════════════════

# Columns of ``config_files`` that are embedded into a license under "config".
_LICENSE_CONFIG_KEYS = (
    "name",
    "model",
    "version",
    "max_tx_voltage",
    "max_tx_current",
    "tx_waveform",
    "tx_pulse_width",
    "acq_channels",
    "acq_sample_rate",
    "acq_voltage_range",
    "full_waveform_capture",
    "ssid_prefix",
)


@app.post("/api/licenses/generate", response_model=LicenseResponse, summary="生成加密授权文件")
def generate_license(payload: LicenseGenerateRequest):
    """
    Called by the admin console to generate an encrypted license bound to a
    device SN.

    Steps:
    1. Check that the device SN exists.
    2. If ``config_id`` is given, load that configuration file row.
    3. Build the license data (modules + validity) and embed the config.
    4. XOR the JSON bytes with the key returned by ``_derive_key(sn)``.
    5. Return the result Base64-encoded, together with the plain license.

    Raises:
        HTTPException 404: the SN or the requested ``config_id`` does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT sn FROM devices WHERE sn = ?", (payload.sn,)
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="设备 SN 不存在")

    # Load the configuration file when one was requested.
    config = None
    if payload.config_id is not None:
        with get_db() as db:
            cfg_row = db.execute(
                "SELECT * FROM config_files WHERE id = ?", (payload.config_id,)
            ).fetchone()
        if not cfg_row:
            raise HTTPException(status_code=404, detail=f"配置文件 ID {payload.config_id} 不存在")
        config = dict(cfg_row)

    # Build the plain-text license data.
    license_data = build_license_data(
        device_sn=payload.sn,
        modules=payload.modules,
        valid_days=payload.valid_days,
    )

    # Embed the configuration into the license payload.
    raw_license = license_data.model_dump()
    if config:
        raw_license["config"] = {key: config.get(key) for key in _LICENSE_CONFIG_KEYS}

    # Encrypt the *complete* license (including any embedded config) here,
    # because the config is added after ``build_license_data``. The previous
    # extra ``encrypt_license(license_data)`` call was dropped: its result was
    # overwritten before use.
    from services import _derive_key
    import base64

    json_bytes = json.dumps(raw_license, ensure_ascii=False).encode("utf-8")
    key = _derive_key(payload.sn)
    encrypted = bytes(
        byte ^ key[i % len(key)] for i, byte in enumerate(json_bytes)
    )
    encrypted_b64 = base64.b64encode(encrypted).decode("ascii")

    return LicenseResponse(
        success=True,
        encrypted_license=encrypted_b64,
        raw_license=raw_license,  # for debugging; can be removed in production
        message="授权文件生成成功,已绑定设备 SN 并嵌入配置信息" if config else "授权文件生成成功,已绑定设备 SN"
    )


#
# ═══════════════════════════════════════════════════════════════
# 3. Activation report endpoint
# ═══════════════════════════════════════════════════════════════

@app.post("/api/devices/activate", response_model=ActivateResponse, summary="上报激活状态")
def report_activation(payload: ActivateReportRequest):
    """
    Called by the device after it first goes online to report the activation
    result.

    - reported status "已激活" -> status and activation time are updated
    - anything else            -> the stored status is left unchanged

    Raises:
        HTTPException 404: the SN does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT status FROM devices WHERE sn = ?", (payload.sn,)
        ).fetchone()

        if not row:
            raise HTTPException(status_code=404, detail="设备 SN 不存在")

        current_status = row["status"]

        # NOTE(review): the original comment here said only "待激活" or "已禁用"
        # devices may be (re)activated, but no check on ``current_status`` is
        # enforced - any existing device is activated. Confirm intended policy.
        if payload.status != "已激活":
            return ActivateResponse(
                success=False,
                sn=payload.sn,
                new_status=current_status,
                message="激活失败,未更新状态"
            )

        db.execute(
            """
            UPDATE devices
            SET status = '已激活', activated_at = ?
            WHERE sn = ?
            """,
            (now_str(), payload.sn),
        )
        db.commit()

        # Read back the stored activation time.
        updated = db.execute(
            "SELECT activated_at FROM devices WHERE sn = ?", (payload.sn,)
        ).fetchone()

        return ActivateResponse(
            success=True,
            sn=payload.sn,
            new_status="已激活",
            activated_at=updated["activated_at"],
            message="设备激活成功,已记录激活时间"
        )


# ═══════════════════════════════════════════════════════════════
# 4. Configuration file management endpoints
# ═══════════════════════════════════════════════════════════════

@app.post("/api/config-files", summary="创建配置文件")
def create_config_file(payload: ConfigFileCreateRequest):
    """
    Create a device technical-parameter configuration file.

    A configuration holds transmit, acquisition and network parameters. It can
    be referenced by ``config_id`` when generating a license so that it is
    embedded in the license sent to the device. Unset optional parameters are
    stored as empty strings.
    """
    with get_db() as db:
        cur = db.execute(
            """
            INSERT INTO config_files
            (name, model, version, status,
             max_tx_voltage, max_tx_current, tx_waveform, tx_pulse_width,
             acq_channels, acq_sample_rate, acq_voltage_range, full_waveform_capture,
             ssid_prefix, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                payload.name,
                payload.model,
                payload.version,
                payload.status,
                payload.max_tx_voltage or "",
                payload.max_tx_current or "",
                payload.tx_waveform or "",
                payload.tx_pulse_width or "",
                payload.acq_channels or "",
                payload.acq_sample_rate or "",
                payload.acq_voltage_range or "",
                payload.full_waveform_capture or "",
                payload.ssid_prefix or "",
                now_str(),
            ),
        )
        cfg_id = cur.lastrowid
        db.commit()

    return {"success": True, "id": cfg_id, "message": "配置文件创建成功"}


@app.get("/api/config-files", summary="查看配置文件列表")
def list_config_files(
    model: Optional[str] = Query(None, description="按设备型号过滤"),
    status: Optional[str] = Query(None, description="按状态过滤:生效/失效"),
):
    """Return configuration files, newest first, optionally filtered by model and status."""
    sql = "SELECT * FROM config_files WHERE 1=1"
    params = []
    if model:
        sql += " AND model = ?"
        params.append(model)
    if status:
        sql += " AND status = ?"
        params.append(status)
    sql += " ORDER BY created_at DESC"

    with get_db() as db:
        rows = db.execute(sql, params).fetchall()

    result = [dict(r) for r in rows]
    return {"success": True, "data": result, "total": len(result)}


@app.get("/api/config-files/{cfg_id}", summary="查看单个配置文件详情")
def get_config_file_detail(cfg_id: int):
    """Return the configuration file with the given ID, or 404 if it does not exist."""
    with get_db() as db:
        row = db.execute(
            "SELECT * FROM config_files WHERE id = ?", (cfg_id,)
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="配置文件不存在")

    return {"success": True, "data": dict(row)}


@app.put("/api/config-files/{cfg_id}", summary="更新配置文件")
def update_config_file(cfg_id: int, payload: ConfigFileUpdateRequest):
    """
    Update the parameters of a configuration file.

    Only fields that are not ``None`` in the request are written;
    ``updated_at`` is refreshed whenever at least one field changes.

    Raises:
        HTTPException 404: the configuration file does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT id FROM config_files WHERE id = ?", (cfg_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="配置文件不存在")

        # (column, new value) pairs; column names are fixed here, so building
        # the SET clause from them is safe.
        fields = [
            ("name", payload.name),
            ("model", payload.model),
            ("version", payload.version),
            ("status", payload.status),
            ("max_tx_voltage", payload.max_tx_voltage),
            ("max_tx_current", payload.max_tx_current),
            ("tx_waveform", payload.tx_waveform),
            ("tx_pulse_width", payload.tx_pulse_width),
            ("acq_channels", payload.acq_channels),
            ("acq_sample_rate", payload.acq_sample_rate),
            ("acq_voltage_range", payload.acq_voltage_range),
            ("full_waveform_capture", payload.full_waveform_capture),
            ("ssid_prefix", payload.ssid_prefix),
        ]
        changed = [(field, value) for field, value in fields if value is not None]

        if not changed:
            return {"success": True, "message": "没有要更新的字段"}

        updates = [f"{field} = ?" for field, _ in changed]
        params = [value for _, value in changed]
        updates.append("updated_at = ?")
        params.append(now_str())
        params.append(cfg_id)

        db.execute(
            f"UPDATE config_files SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        db.commit()

    return {"success": True, "message": "配置文件更新成功"}


@app.delete("/api/config-files/{cfg_id}", summary="删除配置文件")
def delete_config_file(cfg_id: int):
    """Delete the configuration file with the given ID, or 404 if it does not exist."""
    with get_db() as db:
        row = db.execute(
            "SELECT id FROM config_files WHERE id = ?", (cfg_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="配置文件不存在")

        db.execute("DELETE FROM config_files WHERE id = ?", (cfg_id,))
        db.commit()

    return {"success": True, "message": "配置文件已删除"}


# ═══════════════════════════════════════════════════════════════
# 5. Calibration file management endpoints
# ═══════════════════════════════════════════════════════════════

def _calibration_range(channel: dict, key: str) -> dict:
    """Return the mapping stored under ``key`` of a channel entry, or ``{}`` if absent or not an object."""
    block = channel.get(key, {})
    return block if isinstance(block, dict) else {}


def _is_positive_number(value) -> bool:
    """Return True when ``value`` is an int/float strictly greater than zero."""
    return isinstance(value, (int, float)) and value > 0


@app.post("/api/calibration/upload", summary="上传校准文件")
async def upload_calibration(
    file: UploadFile = File(..., description="校准 JSON 文件"),
    material_sn: str = Form(..., description="物料 SN(采集板)"),
    operator: Optional[str] = Form(None, description="操作员"),
    remark: Optional[str] = Form(None, description="备注"),
):
    """
    Called by the Windows calibration tool to upload the JSON file produced
    after calibrating an acquisition board.

    On upload this endpoint:
    1. Parses ``SN``, ``UID`` and ``CalibrateFactor`` from the JSON.
    2. Marks the file "合格" only if every channel has a positive ``factor``
       for both the ``80V`` and ``2.5V`` ranges, otherwise "不合格".
    3. Saves the original file on the server.
    4. Writes the ``calibration_files`` and ``calibration_channels`` rows.

    Calibration JSON example:
    ```json
    {
      "SN": "456",
      "UID": "FFFF589EFF0C50434D583530",
      "CalibrateFactor": [
        {
          "ChannelId": 0,
          "80V": {"factor": 7.709273, "offset": 0.055221},
          "2.5V": {"factor": 1.006537, "offset": 0.010941}
        }
      ]
    }
    ```

    Raises:
        HTTPException 400: the body is not UTF-8 JSON, the top level is not an
            object, ``CalibrateFactor`` is not an array, or one of its items
            is not an object.
    """
    content = await file.read()

    # Undecodable bytes are a client error just like malformed JSON.
    try:
        calib_data = json.loads(content.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        raise HTTPException(status_code=400, detail="无效的 JSON 格式")
    if not isinstance(calib_data, dict):
        raise HTTPException(status_code=400, detail="无效的 JSON 格式")

    sn = calib_data.get("SN", "")
    uid = calib_data.get("UID", "")
    factors = calib_data.get("CalibrateFactor", [])

    if not isinstance(factors, list):
        raise HTTPException(status_code=400, detail="CalibrateFactor 必须是数组")

    # Normalise every channel once so the verdict and the inserts below see
    # exactly the same data.
    channels = []
    for ch in factors:
        if not isinstance(ch, dict):
            raise HTTPException(status_code=400, detail="CalibrateFactor 的每一项必须是对象")
        channels.append(
            (ch, _calibration_range(ch, "80V"), _calibration_range(ch, "2.5V"))
        )

    # Overall verdict: every channel's factor must be greater than 0.
    all_reasonable = all(
        _is_positive_number(v80.get("factor", 0))
        and _is_positive_number(v25.get("factor", 0))
        for _, v80, v25 in channels
    )
    overall_result = "合格" if all_reasonable else "不合格"

    # ``file.read()`` above left the stream at EOF; rewind it, otherwise
    # ``save_upload_file`` would store an empty file (size 0, MD5 of nothing).
    await file.seek(0)
    file_path, file_name, file_size, md5 = save_upload_file(file, "calibration")

    with get_db() as db:
        cur = db.execute(
            """
            INSERT INTO calibration_files
            (material_sn, sn, uid, file_name, file_path, file_size, md5, result, operator, remark, channels_count)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (material_sn, sn, uid, file_name, file_path, file_size, md5,
             overall_result, operator or "", remark or "", len(factors)),
        )
        calib_id = cur.lastrowid

        # One row per channel.
        for ch, v80, v25 in channels:
            db.execute(
                """
                INSERT INTO calibration_channels
                (calibration_id, channel_id, factor_80v, offset_80v, factor_2_5v, offset_2_5v)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (
                    calib_id,
                    ch.get("ChannelId", 0),
                    v80.get("factor", 0),
                    v80.get("offset", 0),
                    v25.get("factor", 0),
                    v25.get("offset", 0),
                ),
            )
        db.commit()

    return {
        "success": True,
        "id": calib_id,
        "material_sn": material_sn,
        "sn": sn,
        "uid": uid,
        "file_name": file_name,
        "file_size": file_size,
        "md5": md5,
        "result": overall_result,
        "channels_count": len(factors),
        "message": "校准文件上传成功",
    }


@app.get("/api/calibration", summary="查看校准文件列表")
def list_calibration(
    material_sn: Optional[str] = Query(None, description="按物料 SN 过滤"),
    sn: Optional[str] = Query(None, description="按校准 SN 过滤"),
    uid: Optional[str] = Query(None, description="按 UID 过滤"),
):
    """
    Return calibration files, newest upload first, optionally filtered by
    material SN, calibration SN and UID (exact matches).

    Each item carries its channel rows under ``channels``.
    """
    sql = "SELECT * FROM calibration_files WHERE 1=1"
    params = []
    if material_sn:
        sql += " AND material_sn = ?"
        params.append(material_sn)
    if sn:
        sql += " AND sn = ?"
        params.append(sn)
    if uid:
        sql += " AND uid = ?"
        params.append(uid)
    sql += " ORDER BY upload_time DESC"

    with get_db() as db:
        rows = db.execute(sql, params).fetchall()

        result = []
        for r in rows:
            item = dict(r)
            # Attach this file's channel data.
            ch_rows = db.execute(
                "SELECT * FROM calibration_channels WHERE calibration_id = ?",
                (item["id"],),
            ).fetchall()
            item["channels"] = [dict(ch) for ch in ch_rows]
            result.append(item)

    return {"success": True, "data": result, "total": len(result)}


@app.get("/api/calibration/{calib_id}", summary="查看单个校准文件详情")
def get_calibration_detail(calib_id: int):
    """Return one calibration file with all of its channel rows, or 404 if it does not exist."""
    with get_db() as db:
        row = db.execute(
            "SELECT * FROM calibration_files WHERE id = ?", (calib_id,)
        ).fetchone()

        if not row:
            raise HTTPException(status_code=404, detail="校准文件不存在")

        result = dict(row)
        ch_rows = db.execute(
            "SELECT * FROM calibration_channels WHERE calibration_id = ?",
            (calib_id,),
        ).fetchall()
        result["channels"] = [dict(ch) for ch in ch_rows]

    return {"success": True, "data": result}


@app.put("/api/calibration/{calib_id}", summary="更新校准文件信息")
def update_calibration(calib_id: int, payload: CalibrationUpdateRequest):
    """
    Update calibration metadata (``operator``, ``remark``, ``result``).

    The stored JSON file itself is never modified; re-upload to change it.

    Raises:
        HTTPException 404: the calibration file does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT id FROM calibration_files WHERE id = ?", (calib_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="校准文件不存在")

        fields = [
            ("operator", payload.operator),
            ("remark", payload.remark),
            ("result", payload.result),
        ]
        changed = [(field, value) for field, value in fields if value is not None]

        if not changed:
            return {"success": True, "message": "没有要更新的字段"}

        updates = [f"{field} = ?" for field, _ in changed]
        params = [value for _, value in changed]
        params.append(calib_id)

        db.execute(
            f"UPDATE calibration_files SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        db.commit()

    return {"success": True, "message": "校准文件信息更新成功"}


@app.delete("/api/calibration/{calib_id}", summary="删除校准文件")
def delete_calibration(calib_id: int):
    """
    Delete a calibration file, its channel rows and the stored original file.

    Raises:
        HTTPException 404: the calibration file does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT file_path FROM calibration_files WHERE id = ?", (calib_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="校准文件不存在")

        file_path = row["file_path"]
        # Delete the channel rows explicitly so they are never orphaned,
        # whether or not the schema cascades the delete.
        db.execute(
            "DELETE FROM calibration_channels WHERE calibration_id = ?", (calib_id,)
        )
        db.execute("DELETE FROM calibration_files WHERE id = ?", (calib_id,))
        db.commit()

    delete_file_if_exists(file_path)
    return {"success": True, "message": "校准文件已删除"}


@app.get("/api/calibration/download/{calib_id}", summary="下载校准原始文件")
def download_calibration(calib_id: int):
    """
    Download the original calibration JSON file.

    Raises:
        HTTPException 404: the record does not exist, or the stored file is
            missing on disk.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT file_name, file_path FROM calibration_files WHERE id = ?", (calib_id,)
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="校准文件不存在")

    file_path = row["file_path"]
    if not os.path.exists(file_path):
        raise HTTPException(status_code=404, detail="服务器上文件已丢失")

    return FileResponse(
        path=file_path,
        filename=row["file_name"],
        media_type="application/json",
    )


# ═══════════════════════════════════════════════════════════════
# 5.
# APP version management endpoints
# ═══════════════════════════════════════════════════════════════

# Display names for the ``platform_type`` column of ``app_versions``.
_PLATFORM_NAMES = {
    1: 'iOS',
    2: 'Android',
    3: 'HarmonyOS',
    4: 'Windows',
    5: 'macOS',
    6: 'Linux',
    7: 'Web',
}


def _parse_changelog(raw):
    """Decode a stored changelog; return ``[]`` when it is empty or not valid JSON.

    ``upload_app`` stores the form value verbatim, so the column may hold
    plain text rather than a JSON array.
    """
    if not raw:
        return []
    try:
        return json.loads(raw)
    except (TypeError, ValueError):
        return []


@app.post("/api/apps/upload", summary="上传 APP 安装包并创建版本")
async def upload_app(
    file: UploadFile = File(..., description="APP 安装包文件(apk/ipa)"),
    app_name: str = Form(..., description="应用名称"),
    package_name: Optional[str] = Form(None, description="包名"),
    platform_type: int = Form(2, description="平台类型:1=iOS, 2=Android"),
    version_name: str = Form(..., description="版本号名称,如 1.2.0"),
    major_version: int = Form(1, ge=0),
    minor_version: int = Form(0, ge=0),
    patch_version: int = Form(0, ge=0),
    os_min_version: Optional[str] = Form(None, description="最低系统版本"),
    is_force_update: bool = Form(False, description="是否强制更新"),
    changelog: Optional[str] = Form(None, description="更新日志,JSON 字符串数组"),
    status: int = Form(1, description="状态:0=草稿, 1=已发布, 2=已下架"),
):
    """
    Upload an APP package (apk/ipa) and create its version record.

    - The file is saved under the ``apps`` sub-directory of ``UPLOAD_DIR``.
    - A row is written to ``app_versions``.
    - The download URL and version info are returned.
    """
    file_path, file_name, file_size, md5 = save_upload_file(file, "apps")

    # Anything that is not an .ipa is recorded as an apk.
    file_type = "ipa" if file.filename and file.filename.lower().endswith(".ipa") else "apk"
    primary_url = f"/api/apps/download/{file_name}"

    with get_db() as db:
        cur = db.execute(
            """
            INSERT INTO app_versions
            (app_name, package_name, platform_type, version_name,
             major_version, minor_version, patch_version,
             file_name, file_path, file_size, file_type,
             distribution_type, primary_url, os_min_version,
             is_force_update, changelog, status, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                app_name,
                package_name,
                platform_type,
                version_name,
                major_version,
                minor_version,
                patch_version,
                file_name,
                file_path,
                file_size,
                file_type,
                "direct",
                primary_url,
                os_min_version or "",
                int(is_force_update),
                changelog or "",
                status,
                now_str(),
            ),
        )
        app_id = cur.lastrowid
        db.commit()

    return {
        "success": True,
        "id": app_id,
        "app_name": app_name,
        "version_name": version_name,
        "file_size": file_size,
        "primary_url": primary_url,
        "message": "APP 上传成功",
    }


@app.post("/api/apps", summary="创建 APP/平台/版本(支持 action 模式)")
def create_app_record(payload: dict = Body(...)):
    """
    Create an application, platform or version depending on ``action``.

    Four modes are supported:
    1. ``action='create_app'``: create an application (stored as an
       ``app_versions`` row with version "1.0.0").
    2. ``action='add_platform'``: placeholder - nothing is stored.
    3. ``action='add_version'``: add a version row for an existing app.
    4. no/other ``action``: legacy mode, fields as in AppVersionCreateRequest.
    """
    action = payload.get('action')

    with get_db() as db:
        # Mode 1: create an application.
        if action == 'create_app':
            name = payload.get('name', '')
            package_name = payload.get('package_name', '')

            cur = db.execute(
                "INSERT INTO app_versions (app_name, package_name, platform_type, version_name, major_version, minor_version, patch_version, status, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)",
                (name, package_name, 2, '1.0.0', 1, 0, 0, 1, now_str())
            )
            app_id = cur.lastrowid
            db.commit()
            return {"success": True, "id": app_id}

        # Mode 2: add a platform. This is a stub: the schema has no separate
        # platforms table, so the platform type is echoed back as a fake ID.
        if action == 'add_platform':
            platform_type = payload.get('platform_type', 2)
            return {"success": True, "id": platform_type, "message": "平台添加成功(模拟)"}

        # Mode 3: add a version.
        if action == 'add_version':
            app_id = payload.get('app_id')
            platform_id = payload.get('platform_id')
            major_version = payload.get('major_version', 0)
            minor_version = payload.get('minor_version', 0)
            patch_version = payload.get('patch_version', 0)
            version_name = payload.get('version_name', f'{major_version}.{minor_version}.{patch_version}')
            file_type = payload.get('file_type', '')
            file_size = payload.get('file_size', 0)
            distribution_type = payload.get('distribution_type', 'direct')
            primary_url = payload.get('primary_url', '')
            os_min_version = payload.get('os_min_version', '')
            is_force_update = int(payload.get('is_force_update', False))
            changelog = json.dumps(payload.get('changelog', []), ensure_ascii=False)

            # Look up the application's name; unknown IDs fall back to 'Unknown'.
            app_row = db.execute("SELECT app_name FROM app_versions WHERE id = ?", (app_id,)).fetchone()
            app_name = app_row['app_name'] if app_row else 'Unknown'

            cur = db.execute(
                """
                INSERT INTO app_versions
                (app_name, package_name, platform_type, version_name,
                 major_version, minor_version, patch_version,
                 file_type, file_size, distribution_type, primary_url,
                 os_min_version, is_force_update, changelog, status, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    app_name, '', platform_id, version_name,
                    major_version, minor_version, patch_version,
                    file_type, file_size, distribution_type, primary_url,
                    os_min_version, is_force_update, changelog, 1, now_str()
                )
            )
            version_id = cur.lastrowid
            db.commit()
            return {"success": True, "id": version_id}

        # Mode 4: legacy AppVersionCreateRequest-style body.
        cur = db.execute(
            """
            INSERT INTO app_versions
            (app_name, package_name, platform_type, version_name,
             major_version, minor_version, patch_version,
             file_type, distribution_type, os_min_version,
             is_force_update, changelog, status, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                payload.get('app_name', ''),
                payload.get('package_name', ''),
                payload.get('platform_type', 2),
                payload.get('version_name', '1.0.0'),
                payload.get('major_version', 1),
                payload.get('minor_version', 0),
                payload.get('patch_version', 0),
                payload.get('file_type', ''),
                payload.get('distribution_type', 'direct'),
                payload.get('os_min_version', ''),
                int(payload.get('is_force_update', False)),
                json.dumps(payload.get('changelog', []), ensure_ascii=False),
                payload.get('status', 1),
                now_str(),
            ),
        )
        app_id = cur.lastrowid
        db.commit()

    return {"success": True, "id": app_id, "message": "APP 版本记录创建成功"}


@app.get("/api/apps", summary="查看 APP 列表(支持 action 模式)")
def list_apps(
    app_id: Optional[int] = Query(None, description="应用ID,获取单个应用详情"),
    app_name: Optional[str] = Query(None, description="应用名称"),
    platform_type: Optional[int] = Query(None, description="平台类型:1=iOS, 2=Android"),
    status: Optional[int] = Query(None, description="状态:0=草稿, 1=已发布, 2=已下架"),
):
    """
    Return either one application's details or the application list.

    With ``app_id``: a dict describing that application including its
    ``platforms`` and ``versions``. Without it: a list of applications.

    In this table, rows whose ``version_name`` is "1.0.0" act as the
    application records; all other rows with the same ``app_name`` are
    treated as that application's versions.

    Raises:
        HTTPException 404: ``app_id`` was given but no such row exists.
    """
    with get_db() as db:
        if app_id:
            # The row with this ID supplies the application's basic info.
            app_row = db.execute(
                "SELECT * FROM app_versions WHERE id = ? ORDER BY created_at ASC LIMIT 1",
                (app_id,)
            ).fetchone()

            if not app_row:
                raise HTTPException(status_code=404, detail="应用不存在")

            app_dict = dict(app_row)

            # Platform list: distinct platform types used by this application.
            platforms_rows = db.execute(
                "SELECT DISTINCT platform_type FROM app_versions WHERE id = ? OR (app_name = ? AND platform_type > 0)",
                (app_id, app_dict['app_name'])
            ).fetchall()

            platforms = []
            for p_row in platforms_rows:
                p_type = p_row['platform_type']
                platforms.append({
                    'id': p_type,
                    'app_id': app_id,
                    'platform_type': p_type,
                    'platform_name': _PLATFORM_NAMES.get(p_type, f'Platform {p_type}'),
                    'description': '',
                    'file_types': [],
                    'dist_types': []
                })

            # Fall back to a default Android platform when none is recorded.
            if not platforms:
                platforms.append({
                    'id': 2,
                    'app_id': app_id,
                    'platform_type': 2,
                    'platform_name': 'Android',
                    'description': '',
                    'file_types': [],
                    'dist_types': []
                })

            versions_rows = db.execute(
                "SELECT * FROM app_versions WHERE (id = ? OR app_name = ?) AND version_name != '1.0.0' ORDER BY created_at DESC",
                (app_id, app_dict['app_name'])
            ).fetchall()

            versions = []
            for v_row in versions_rows:
                v_dict = dict(v_row)
                versions.append({
                    'id': v_dict['id'],
                    'app_id': app_id,
                    'platform_id': v_dict.get('platform_type', 2),
                    'major_version': v_dict.get('major_version', 0),
                    'minor_version': v_dict.get('minor_version', 0),
                    'patch_version': v_dict.get('patch_version', 0),
                    'version_name': v_dict.get('version_name', ''),
                    'description': '',
                    'file_type': v_dict.get('file_type', ''),
                    'file_url': v_dict.get('primary_url', ''),
                    'file_size': v_dict.get('file_size', 0),
                    'distribution_type': v_dict.get('distribution_type', 'direct'),
                    'primary_url': v_dict.get('primary_url', ''),
                    'min_support_version': '',
                    'os_min_version': v_dict.get('os_min_version', ''),
                    'status': v_dict.get('status', 1),
                    'is_force_update': v_dict.get('is_force_update', 0),
                    'release_date': str(v_dict.get('created_at', '')),
                    # Tolerant parse: uploaded changelogs may be plain text.
                    'changelog': _parse_changelog(v_dict.get('changelog')),
                    'platform_name': _PLATFORM_NAMES.get(v_dict.get('platform_type', 2), 'Unknown')
                })

            return {
                'id': app_id,
                'name': app_dict.get('app_name', ''),
                'package_name': app_dict.get('package_name', ''),
                'description': '',
                'status': app_dict.get('status', 1),
                'create_time': str(app_dict.get('created_at', '')),
                'platforms': platforms,
                'versions': versions
            }

        # Application list.
        sql = "SELECT * FROM app_versions WHERE version_name = '1.0.0'"
        params = []
        if app_name:
            sql += " AND app_name LIKE ?"
            params.append(f"%{app_name}%")
        if platform_type is not None:
            sql += " AND platform_type = ?"
            params.append(platform_type)
        if status is not None:
            sql += " AND status = ?"
            params.append(status)
        sql += " ORDER BY created_at DESC"

        rows = db.execute(sql, params).fetchall()

        result = []
        for row in rows:
            row_dict = dict(row)

            # Number of version rows belonging to this application.
            version_count = db.execute(
                "SELECT COUNT(*) as count FROM app_versions WHERE app_name = ? AND version_name != '1.0.0'",
                (row_dict['app_name'],)
            ).fetchone()['count']

            # Distinct platforms used by this application.
            platform_rows = db.execute(
                "SELECT DISTINCT platform_type FROM app_versions WHERE app_name = ?",
                (row_dict['app_name'],)
            ).fetchall()

            platforms = []
            for p_row in platform_rows:
                p_type = p_row['platform_type']
                platforms.append({
                    'id': p_type,
                    'platform_type': p_type,
                    'platform_name': _PLATFORM_NAMES.get(p_type, f'Platform {p_type}')
                })

            result.append({
                'id': row_dict['id'],
                'name': row_dict.get('app_name', ''),
                'package_name': row_dict.get('package_name', ''),
                'description': '',
                'status': row_dict.get('status', 1),
                'create_time': str(row_dict.get('created_at', '')),
                'platforms': platforms,
                'versionCount': version_count
            })

        return result


@app.get("/api/apps/{app_id}", summary="查看单个 APP 版本详情")
def get_app_detail(app_id: int):
    """Return the ``app_versions`` row with the given ID, or 404 if it does not exist."""
    with get_db() as db:
        row = db.execute(
            "SELECT * FROM app_versions WHERE id = ?", (app_id,)
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="APP 版本不存在")

    return {"success": True, "data": dict(row)}


@app.put("/api/apps/{app_id}", summary="更新 APP 版本信息")
def update_app(app_id: int, payload: AppVersionUpdateRequest):
    """
    Update the metadata of an APP version.

    Only fields that are not ``None`` are written; ``changelog`` is stored as
    a JSON string and ``updated_at`` is refreshed on any change.

    Raises:
        HTTPException 404: the APP version does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT id FROM app_versions WHERE id = ?", (app_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="APP 版本不存在")

        fields = [
            ("app_name", payload.app_name),
            ("package_name", payload.package_name),
            ("version_name", payload.version_name),
            ("platform_type", payload.platform_type),
            ("os_min_version", payload.os_min_version),
            ("is_force_update", int(payload.is_force_update) if payload.is_force_update is not None else None),
            ("status", payload.status),
        ]
        updates = []
        params = []
        for field, value in fields:
            if value is not None:
                updates.append(f"{field} = ?")
                params.append(value)

        if payload.changelog is not None:
            updates.append("changelog = ?")
            params.append(json.dumps(payload.changelog, ensure_ascii=False))

        if not updates:
            return {"success": True, "message": "没有要更新的字段"}

        updates.append("updated_at = ?")
        params.append(now_str())
        params.append(app_id)

        db.execute(
            f"UPDATE app_versions SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        db.commit()

    return {"success": True, "message": "APP 版本更新成功"}


@app.put("/api/apps", summary="更新 APP 版本信息(兼容前端 body 传 id)")
def update_app_compat(payload: dict):
    """
    Compatibility variant of ``update_app`` for front ends that send the
    ``id`` inside the JSON body instead of the URL.

    Raises:
        HTTPException 400: ``id`` is missing.
        HTTPException 404: the APP version does not exist.
    """
    app_id = payload.get("id")
    if not app_id:
        raise HTTPException(status_code=400, detail="APP 版本 ID 不能为空")

    with get_db() as db:
        row = db.execute(
            "SELECT id FROM app_versions WHERE id = ?", (app_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="APP 版本不存在")

        fields = [
            ("app_name", payload.get("app_name")),
            ("package_name", payload.get("package_name")),
            ("version_name", payload.get("version_name")),
            ("platform_type", payload.get("platform_type")),
            ("os_min_version", payload.get("os_min_version")),
            ("is_force_update", int(payload["is_force_update"]) if payload.get("is_force_update") is not None else None),
            ("status", payload.get("status")),
        ]
        updates = []
        params = []
        for field, value in fields:
            if value is not None:
                updates.append(f"{field} = ?")
                params.append(value)

        if payload.get("changelog") is not None:
            updates.append("changelog = ?")
            params.append(json.dumps(payload["changelog"], ensure_ascii=False))

        if not updates:
            return {"success": True, "message": "没有要更新的字段"}

        updates.append("updated_at = ?")
        params.append(now_str())
        params.append(app_id)

        db.execute(
            f"UPDATE app_versions SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        db.commit()

    return {"success": True, "message": "APP 版本更新成功"}


@app.delete("/api/apps", summary="删除 APP 版本(兼容前端 query 传 id)")
def delete_app_compat(id: int = Query(..., description="APP 版本 ID"), type: str = Query("", description="类型标记,暂不使用")):
    """
    Compatibility variant of ``delete_app`` taking the ID as a query parameter.

    ``type`` is accepted but currently ignored.

    Raises:
        HTTPException 404: the APP version does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT file_path FROM app_versions WHERE id = ?", (id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="APP 版本不存在")

        file_path = row["file_path"]
        db.execute("DELETE FROM app_versions WHERE id = ?", (id,))
        db.commit()

    delete_file_if_exists(file_path)
    return {"success": True, "message": "APP 版本已删除"}


@app.delete("/api/apps/{app_id}", summary="删除 APP 版本")
def delete_app(app_id: int):
    """
    Delete an APP version record and its package file on the server.

    Raises:
        HTTPException 404: the APP version does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT file_path FROM app_versions WHERE id = ?", (app_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="APP 版本不存在")

        file_path = row["file_path"]
        db.execute("DELETE FROM app_versions WHERE id = ?", (app_id,))
        db.commit()

    delete_file_if_exists(file_path)
    return {"success": True, "message": "APP 版本已删除"}


@app.get("/api/apps/download/{file_name}", summary="下载 APP 安装包")
def download_app(file_name: str):
    """
    Download an APP package file (apk/ipa) from the ``apps`` upload directory.

    Raises:
        HTTPException 404: ``file_name`` is not a plain file name, or no such
            regular file exists.
    """
    # ``file_name`` comes straight from the URL: accept only a bare file name
    # so the lookup can never leave the apps directory.
    if file_name in ("", ".", "..") or os.path.basename(file_name) != file_name:
        raise HTTPException(status_code=404, detail="文件不存在")

    file_path = os.path.join(UPLOAD_DIR, "apps", file_name)
    # ``isfile`` (not ``exists``) so a directory is a 404 instead of a server error.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="文件不存在")

    media_type = "application/vnd.android.package-archive" if file_name.endswith(".apk") else "application/octet-stream"
    return FileResponse(path=file_path, filename=file_name, media_type=media_type)


@app.post("/api/apps/check-update", response_model=AppCheckUpdateResponse, summary="APP 检查更新")
def check_app_update(payload: AppCheckUpdateRequest):
    """
    Called by the APP on start-up to check for a newer version.

    The highest published (``status = 1``) version for the given app name and
    platform is compared with ``current_version`` ("major.minor.patch", an
    optional leading/trailing "v"/"V" is ignored). If any of the first three
    parts is not an integer the current version is treated as 0.0.0.
    """
    with get_db() as db:
        row = db.execute(
            """
            SELECT * FROM app_versions
            WHERE app_name = ? AND platform_type = ? AND status = 1
            ORDER BY major_version DESC, minor_version DESC, patch_version DESC
            LIMIT 1
            """,
            (payload.app_name, payload.platform_type),
        ).fetchone()

    if not row:
        return AppCheckUpdateResponse(
            has_update=False,
            force_update=False,
            message="没有找到该应用的版本记录",
        )

    latest = dict(row)

    current_parts = payload.current_version.strip("vV").split(".")
    try:
        parsed = tuple(int(part) for part in current_parts[:3])
    except ValueError:
        parsed = ()
    # Pad missing components with zeros: "1.2" -> (1, 2, 0).
    current_version = parsed + (0,) * (3 - len(parsed))

    # NULL version columns count as 0 so the comparison cannot raise.
    latest_version = (
        latest.get("major_version") or 0,
        latest.get("minor_version") or 0,
        latest.get("patch_version") or 0,
    )

    if not latest_version > current_version:
        return AppCheckUpdateResponse(
            has_update=False,
            force_update=False,
            message="当前已是最新版本",
        )

    changelog = _parse_changelog(latest.get("changelog"))
    download_url = latest.get("primary_url") or f"/api/apps/download/{latest.get('file_name', '')}"

    return AppCheckUpdateResponse(
        has_update=True,
        force_update=bool(latest.get("is_force_update", 0)),
        version_name=latest.get("version_name"),
        download_url=download_url,
        changelog=changelog,
        file_size=latest.get("file_size"),
        message="发现新版本,请下载更新",
    )


#
# ═══════════════════════════════════════════════════════════════
# 6. Firmware version management endpoints
# ═══════════════════════════════════════════════════════════════

@app.post("/api/firmware/upload", summary="上传固件文件并创建版本")
async def upload_firmware(
    file: UploadFile = File(..., description="固件文件(bin/hex/tar.gz 等)"),
    version: str = Form(..., description="固件版本号,如 v3.2.1"),
    firmware_type: str = Form(..., description="固件类型:采集板/发射板/主协板/主机服务"),
    board_model: Optional[str] = Form(None, description="板卡型号,如 GD30-ACQ-01"),
    device_model: Optional[str] = Form(None, description="设备型号,如 GD30-2024"),
    hw_range: Optional[str] = Form(None, description="硬件适用范围"),
    upgrade_type: str = Form("可选", description="升级类型:强制/可选"),
    signed: bool = Form(False, description="是否已签名"),
    notes: Optional[str] = Form(None, description="更新说明,JSON 字符串数组"),
    status: str = Form("已发布", description="状态:草稿/已发布/已下架/兼容"),
):
    """Store an uploaded firmware file and create its version record.

    The file is written below ``FIRMWARE_PUBLIC_DIR`` in the sub-directory
    that ``FIRMWARE_TYPE_FOLDER`` maps ``firmware_type`` to (the root of that
    directory when the type is not in the mapping). The stored name is
    ``<unix-timestamp>_<original name>``.

    Returns:
        dict with the new row id, version, type, size in bytes and the
        relative ``primary_url`` of the stored file.
    """
    # Pick the target sub-directory from the firmware type.
    folder = FIRMWARE_TYPE_FOLDER.get(firmware_type, "")
    target_dir = os.path.join(FIRMWARE_PUBLIC_DIR, folder)
    os.makedirs(target_dir, exist_ok=True)

    # The client controls ``file.filename``; keep only its last path component
    # so it can neither escape target_dir nor point into a missing directory.
    original_name = os.path.basename((file.filename or "unknown").replace("\\", "/")) or "unknown"
    timestamp = int(datetime.now().timestamp())
    safe_name = f"{timestamp}_{original_name}"
    file_path = os.path.join(target_dir, safe_name)

    file_size = 0
    md5_hash = hashlib.md5()
    with open(file_path, "wb") as f:
        while chunk := file.file.read(8192):
            f.write(chunk)
            file_size += len(chunk)
            md5_hash.update(chunk)
    # NOTE(review): the digest is computed but not stored or returned.
    md5 = md5_hash.hexdigest()

    # Relative URL used for downloading.
    rel_folder = folder + "/" if folder else ""
    primary_url = f"/uploads/GD/firmware/{rel_folder}{safe_name}"

    with get_db() as db:
        cur = db.execute(
            """
            INSERT INTO firmware_versions
            (version, firmware_type, board_model, device_model, file_name, file_path, file_size,
             hw_range, upgrade_type, signed, notes, status, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                version,
                firmware_type,
                board_model or "",
                device_model or "",
                safe_name,
                file_path,
                file_size,
                hw_range or "",
                upgrade_type,
                int(signed),
                notes or "",
                status,
                now_str(),
            ),
        )
        fw_id = cur.lastrowid
        db.commit()

    return {
        "success": True,
        "id": fw_id,
        "version": version,
        "firmware_type": firmware_type,
        "file_size": file_size,
        "primary_url": primary_url,
        "message": "固件上传成功",
    }


@app.post("/api/firmware", summary="创建固件版本记录(不上传文件)")
def create_firmware_record(payload: FirmwareCreateRequest):
    """Create a firmware version row without uploading a file.

    ``notes`` is stored as a JSON array string; no file columns are written.
    """
    with get_db() as db:
        cur = db.execute(
            """
            INSERT INTO firmware_versions
            (version, firmware_type, board_model, device_model, hw_range, upgrade_type,
             signed, notes, status, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                payload.version,
                payload.firmware_type,
                payload.board_model or "",
                payload.device_model or "",
                payload.hw_range or "",
                payload.upgrade_type,
                int(payload.signed),
                json.dumps(payload.notes or [], ensure_ascii=False),
                payload.status,
                now_str(),
            ),
        )
        fw_id = cur.lastrowid
        db.commit()

    return {"success": True, "id": fw_id, "message": "固件版本记录创建成功"}


@app.get("/api/firmware", summary="查看固件版本列表")
def list_firmware(
    firmware_type: Optional[str] = Query(None, description="固件类型过滤"),
    device_model: Optional[str] = Query(None, description="设备型号过滤"),
    board_model: Optional[str] = Query(None, description="板卡型号过滤"),
    status: Optional[str] = Query(None, description="状态过滤"),
    discover: bool = Query(False, description="是否同时扫描实际目录发现未入库固件"),
):
    """List firmware versions, optionally filtered.

    With ``discover=true`` the result additionally contains files returned by
    ``scan_firmware_directory()`` whose name is not among the listed rows;
    those entries carry ``id`` 0. Only the ``firmware_type`` filter is applied
    to discovered files.
    """
    with get_db() as db:
        sql = "SELECT * FROM firmware_versions WHERE 1=1"
        params = []
        if firmware_type:
            sql += " AND firmware_type = ?"
            params.append(firmware_type)
        if device_model:
            sql += " AND device_model = ?"
            params.append(device_model)
        if board_model:
            sql += " AND board_model = ?"
            params.append(board_model)
        if status:
            sql += " AND status = ?"
            params.append(status)
        sql += " ORDER BY created_at DESC"
        rows = db.execute(sql, params).fetchall()

    result = []
    for r in rows:
        row = dict(r)
        if row.get("notes"):
            try:
                row["notes"] = json.loads(row["notes"])
            except Exception:
                # Notes that are not valid JSON are reported as empty.
                row["notes"] = []
        else:
            row["notes"] = []
        result.append(row)

    # Discovery mode: merge files found on disk that have no listed row.
    if discover:
        scanned = scan_firmware_directory()
        db_file_names = {r.get("file_name", "") for r in result}
        for item in scanned:
            if item["file_name"] not in db_file_names:
                if firmware_type and item["type"] != firmware_type:
                    continue
                result.append({
                    "id": 0,
                    "version": item["version"],
                    "firmware_type": item["type"],
                    "board_model": "",
                    "device_model": "",
                    "file_name": item["file_name"],
                    "file_path": item["file_path"],
                    "file_size": item["file_size"],
                    "hw_range": "",
                    "upgrade_type": "可选",
                    "signed": 0,
                    "notes": [],
                    "status": "已发布",
                    "created_at": None,
                    "updated_at": None,
                })

    return {"success": True, "data": result, "total": len(result)}


# Registered BEFORE "/api/firmware/{fw_id}": routes are matched in declaration
# order, so declaring it later lets the int path parameter capture "scan" and
# answer 422 instead of reaching this handler.
@app.get("/api/firmware/scan", summary="扫描固件目录")
def scan_firmware():
    """Return whatever ``scan_firmware_directory()`` finds on disk."""
    return {"success": True, "data": scan_firmware_directory()}


@app.get("/api/firmware/{fw_id}", summary="查看单个固件版本详情")
def get_firmware_detail(fw_id: int):
    """Return one firmware version row with ``notes`` decoded from JSON.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT * FROM firmware_versions WHERE id = ?", (fw_id,)
        ).fetchone()

    if not row:
        raise HTTPException(status_code=404, detail="固件版本不存在")

    data = dict(row)
    if data.get("notes"):
        try:
            data["notes"] = json.loads(data["notes"])
        except Exception:
            data["notes"] = []
    else:
        data["notes"] = []
    return {"success": True, "data": data}


@app.put("/api/firmware/{fw_id}", summary="更新固件版本信息")
def update_firmware(fw_id: int, payload: FirmwareUpdateRequest):
    """Update the metadata of one firmware version row.

    Only payload fields that are not ``None`` are written; ``notes`` is stored
    as a JSON string and ``updated_at`` is refreshed.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT id FROM firmware_versions WHERE id = ?", (fw_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="固件版本不存在")

        updates = []
        params = []
        fields = [
            ("version", payload.version),
            ("firmware_type", payload.firmware_type),
            ("board_model", payload.board_model),
            ("device_model", payload.device_model),
            ("hw_range", payload.hw_range),
            ("upgrade_type", payload.upgrade_type),
            ("signed", int(payload.signed) if payload.signed is not None else None),
            ("status", payload.status),
        ]
        for field, value in fields:
            if value is not None:
                updates.append(f"{field} = ?")
                params.append(value)

        if payload.notes is not None:
            updates.append("notes = ?")
            params.append(json.dumps(payload.notes, ensure_ascii=False))

        if not updates:
            return {"success": True, "message": "没有要更新的字段"}

        updates.append("updated_at = ?")
        params.append(now_str())
        params.append(fw_id)
        # Column names come from the fixed list above, not from the request.
        db.execute(
            f"UPDATE firmware_versions SET {', '.join(updates)} WHERE id = ?",
            params,
        )
        db.commit()
        return {"success": True, "message": "固件版本更新成功"}


@app.delete("/api/firmware/{fw_id}", summary="删除固件版本")
def delete_firmware(fw_id: int):
    """Delete a firmware version row and its file on the server.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    with get_db() as db:
        row = db.execute(
            "SELECT file_path FROM firmware_versions WHERE id = ?", (fw_id,)
        ).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="固件版本不存在")

        file_path = row["file_path"]
        db.execute("DELETE FROM firmware_versions WHERE id = ?", (fw_id,))
        db.commit()

    delete_file_if_exists(file_path)
    return {"success": True, "message": "固件版本已删除"}


@app.get("/api/firmware/download/{file_name}", summary="下载固件文件")
def download_firmware(file_name: str):
    """Serve a firmware file by name.

    The file is looked up in ``FIRMWARE_PUBLIC_DIR`` and its ``CJB``, ``FSB``
    and ``XCL`` sub-directories first, then in the legacy
    ``UPLOAD_DIR/firmware`` directory. ``file_name`` must be a plain file
    name; names with path separators or equal to ``.`` / ``..`` get 404.

    Raises:
        HTTPException: 404 when the name is not acceptable or no such file
            exists.
    """
    plain_name = os.path.basename(file_name.replace("\\", "/"))
    if plain_name != file_name or plain_name in ("", ".", ".."):
        raise HTTPException(status_code=404, detail="文件不存在")

    # New directory layout first, legacy upload directory as the fallback.
    candidates = [
        os.path.join(FIRMWARE_PUBLIC_DIR, folder, file_name)
        for folder in ["", "CJB", "FSB", "XCL"]
    ]
    candidates.append(os.path.join(UPLOAD_DIR, "firmware", file_name))

    for file_path in candidates:
        # isfile (not exists): a directory would make FileResponse fail.
        if os.path.isfile(file_path):
            return FileResponse(
                path=file_path,
                filename=file_name,
                media_type="application/octet-stream",
            )

    raise HTTPException(status_code=404, detail="文件不存在")
def _firmware_version_key(version: str) -> list:
    """Turn ``"x.y.z"`` into a comparable list of ints.

    Non-numeric components count as 0 and trailing zeros are dropped, so
    ``"3.2"`` and ``"3.2.0"`` produce the same key.
    """
    parts = [int(p) if p.isdecimal() else 0 for p in version.split(".")]
    while parts and parts[-1] == 0:
        parts.pop()
    return parts


@app.post("/api/firmware/check-update", response_model=FirmwareCheckUpdateResponse, summary="检查固件更新")
def check_firmware_update(payload: FirmwareCheckUpdateRequest):
    """Tell a device whether a newer published firmware exists.

    The most recently created row with status ``已发布`` for the requested
    firmware type is selected; when a device/board model is given, rows for
    that model or with an empty/NULL model qualify. Its version is then
    compared numerically with ``payload.current_version``.
    """
    with get_db() as db:
        sql = """
            SELECT * FROM firmware_versions
            WHERE firmware_type = ? AND status = '已发布'
        """
        params = [payload.firmware_type]

        if payload.device_model:
            sql += " AND (device_model = ? OR device_model = '' OR device_model IS NULL)"
            params.append(payload.device_model)
        if payload.board_model:
            sql += " AND (board_model = ? OR board_model = '' OR board_model IS NULL)"
            params.append(payload.board_model)

        sql += " ORDER BY created_at DESC LIMIT 1"
        row = db.execute(sql, params).fetchone()

    if not row:
        return FirmwareCheckUpdateResponse(
            has_update=False,
            force_update=False,
            message="没有找到匹配的固件版本",
        )

    latest = dict(row)
    current_version = payload.current_version.strip("vV")
    # ``or ""`` keeps a NULL version column from crashing ``.strip``.
    latest_version = (latest.get("version") or "").strip("vV")

    # Keys are normalised so versions of different length compare correctly
    # (a plain list compare would rank "3.2.0" above "3.2").
    has_update = _firmware_version_key(latest_version) > _firmware_version_key(current_version)

    if not has_update:
        return FirmwareCheckUpdateResponse(
            has_update=False,
            force_update=False,
            message="当前已是最新固件版本",
        )

    notes = []
    if latest.get("notes"):
        try:
            notes = json.loads(latest["notes"])
        except Exception:
            # Notes that are not valid JSON are reported as empty.
            notes = []

    # ``or ''`` avoids the literal "None" in the URL when file_name is NULL.
    download_url = f"/api/firmware/download/{latest.get('file_name') or ''}"

    return FirmwareCheckUpdateResponse(
        has_update=True,
        force_update=latest.get("upgrade_type") == "强制",
        version=latest.get("version"),
        download_url=download_url,
        notes=notes,
        file_size=latest.get("file_size"),
        message="发现新固件版本,请下载升级",
    )


# ═══════════════════════════════════════════════════════════════
# Auxiliary endpoints (for debugging and administration)
# ═══════════════════════════════════════════════════════════════

class ReferenceValueCreate(BaseModel):
    """Request body for creating or replacing a reference (dictionary) value."""

    category: str
    code: str
    label: str
    sort_order: int = 0
    status: str = "启用"
    description: Optional[str] = ""
    extra: Optional[dict] = None


@app.get("/api/reference-values", summary="获取字典值列表")
def list_reference_values(
    category: Optional[str] = Query(None, description="字典分类过滤"),
    status: str = Query("启用", description="状态过滤"),
):
    """List reference values for a status, optionally limited to a category.

    ``extra`` is decoded from JSON; empty or undecodable values become ``{}``.
    """
    with get_db() as db:
        sql = "SELECT * FROM reference_values WHERE status = ?"
        params = [status]
        if category:
            sql += " AND category = ?"
            params.append(category)
        sql += " ORDER BY category, sort_order, id"
        rows = db.execute(sql, params).fetchall()

    result = []
    for r in rows:
        row = dict(r)
        try:
            row["extra"] = json.loads(row.get("extra", "{}")) if row.get("extra") else {}
        except Exception:
            row["extra"] = {}
        result.append(row)
    return {"success": True, "data": result, "total": len(result)}


@app.post("/api/reference-values", summary="创建字典值")
def create_reference_value(payload: ReferenceValueCreate):
    """Insert a new reference value.

    Raises:
        HTTPException: 400 when the insert fails.
    """
    with get_db() as db:
        try:
            cur = db.execute(
                """
                INSERT INTO reference_values (category, code, label, sort_order, status, description, extra)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    payload.category,
                    payload.code,
                    payload.label,
                    payload.sort_order,
                    payload.status,
                    payload.description or "",
                    json.dumps(payload.extra or {}, ensure_ascii=False),
                ),
            )
            db.commit()
            return {"success": True, "id": cur.lastrowid, "message": "字典值创建成功"}
        except Exception as exc:
            # NOTE(review): every failure is reported as a duplicate; presumably
            # only the uniqueness violation is expected here — confirm.
            raise HTTPException(status_code=400, detail="字典值已存在(category + code 重复)") from exc


@app.put("/api/reference-values/{ref_id}", summary="更新字典值")
def update_reference_value(ref_id: int, payload: ReferenceValueCreate):
    """Replace every editable column of one reference value.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    with get_db() as db:
        row = db.execute("SELECT id FROM reference_values WHERE id = ?", (ref_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="字典值不存在")

        db.execute(
            """
            UPDATE reference_values
            SET category = ?, code = ?, label = ?, sort_order = ?, status = ?, description = ?, extra = ?
            WHERE id = ?
            """,
            (
                payload.category,
                payload.code,
                payload.label,
                payload.sort_order,
                payload.status,
                payload.description or "",
                json.dumps(payload.extra or {}, ensure_ascii=False),
                ref_id,
            ),
        )
        db.commit()
    return {"success": True, "message": "字典值更新成功"}


@app.delete("/api/reference-values/{ref_id}", summary="删除字典值")
def delete_reference_value(ref_id: int):
    """Delete one reference value.

    Raises:
        HTTPException: 404 when the row does not exist.
    """
    with get_db() as db:
        row = db.execute("SELECT id FROM reference_values WHERE id = ?", (ref_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="字典值不存在")
        db.execute("DELETE FROM reference_values WHERE id = ?", (ref_id,))
        db.commit()
    return {"success": True, "message": "字典值已删除"}


# ═══════════════════════════════════════════════════════════════
# License management endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/auth-items", summary="获取授权项定义列表")
def list_auth_items():
    """Return all enabled authorization item definitions in sort order."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM auth_items WHERE status = '启用' ORDER BY sort_order, id").fetchall()
    return [dict(r) for r in rows]


@app.get("/api/license-templates", summary="获取授权模板列表")
def list_license_templates(model_code: str = Query("", description="按型号筛选")):
    """Return enabled license templates, optionally for one model code.

    ``auth_items`` is decoded from JSON; undecodable values become ``[]``.
    """
    with get_db() as db:
        if model_code:
            rows = db.execute(
                "SELECT * FROM license_templates WHERE model_code = ? AND status = '启用' ORDER BY id",
                (model_code,),
            ).fetchall()
        else:
            rows = db.execute("SELECT * FROM license_templates WHERE status = '启用' ORDER BY id").fetchall()

    result = []
    for r in rows:
        item = dict(r)
        try:
            item["auth_items"] = json.loads(item.get("auth_items", "[]") or "[]")
        except Exception:
            item["auth_items"] = []
        result.append(item)
    return result


@app.get("/api/licenses", summary="获取授权记录列表")
def list_licenses():
    """Return all license records ordered by id."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM licenses ORDER BY id").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/licenses", summary="创建授权记录")
def create_license(body: dict = Body(...)):
    """Insert a license record from the JSON body.

    Missing text fields default to ``""``; ``status`` defaults to ``生效``.
    """
    model = body.get("model", "")
    modules = body.get("modules", "")
    expiry = body.get("expiry", "")
    status = body.get("status", "生效")
    config_id = body.get("config_id")
    device_sn = body.get("device_sn", "")
    license_file = body.get("license_file", "")
    # One timestamp for both columns so they cannot differ by a second.
    timestamp = now_str()

    with get_db() as db:
        db.execute(
            """
            INSERT INTO licenses (model, modules, expiry, status, config_id, license_file, device_sn, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (model, modules, expiry, status, config_id, license_file, device_sn, timestamp, timestamp),
        )
        db.commit()
    return {"success": True, "message": "授权记录创建成功"}


@app.put("/api/licenses", summary="更新授权记录")
def update_license(body: dict = Body(...)):
    """Update the fields of a license record that are present in the body.

    Raises:
        HTTPException: 400 when the body carries no ``id``.
    """
    id_ = body.get("id")
    if id_ is None:
        # Without an id the UPDATE matches nothing yet used to report success.
        raise HTTPException(status_code=400, detail="授权记录 ID 不能为空")

    updates = []
    params = []
    # Only these whitelisted column names ever reach the SQL text.
    for field in ["model", "modules", "expiry", "status", "config_id", "license_file", "device_sn"]:
        if field in body:
            updates.append(f"{field} = ?")
            params.append(body[field])

    if not updates:
        return {"success": True, "message": "没有要更新的字段"}

    updates.append("updated_at = ?")
    params.append(now_str())
    params.append(id_)

    with get_db() as db:
        db.execute(f"UPDATE licenses SET {', '.join(updates)} WHERE id = ?", params)
        db.commit()
    return {"success": True, "message": "授权记录更新成功"}


@app.delete("/api/licenses", summary="删除授权记录")
def delete_license(body: dict = Body(...)):
    """Delete the license record whose id is given in the body.

    Raises:
        HTTPException: 400 when the body carries no ``id``.
    """
    id_ = body.get("id")
    if id_ is None:
        raise HTTPException(status_code=400, detail="授权记录 ID 不能为空")

    with get_db() as db:
        db.execute("DELETE FROM licenses WHERE id = ?", (id_,))
        db.commit()
    return {"success": True, "message": "授权记录已删除"}


@app.post("/api/licenses/verify", summary="验证授权文件")
def verify_license(sn: str, encrypted_license: str):
    """Debug helper: try to decrypt a license with the given device SN.

    Returns ``valid=True`` with the decrypted payload when
    ``decrypt_license`` yields a truthy result, otherwise ``valid=False``.
    """
    decrypted = decrypt_license(encrypted_license, sn)
    if decrypted:
        return {
            "valid": True,
            "decrypted": decrypted,
            "message": "授权文件解密成功,SN 匹配"
        }
    return {
        "valid": False,
        "decrypted": None,
        "message": "授权文件无效或 SN 不匹配"
    }


# ═══════════════════════════════════════════════════════════════
# 7.
# Dashboard endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/dashboard", summary="获取首页统计数据")
def get_dashboard():
    """Collect the counters and recent items shown on the home page.

    Returns per-status counts for devices, materials, repair orders and scrap
    records, total counts for firmware versions and licenses, plus the four
    latest unfinished repair orders and the four latest scrap records that
    are waiting for or undergoing approval.
    """
    with get_db() as db:

        def scalar(statement, arguments=()):
            # First column of the first row; every statement here is a COUNT.
            first_row = db.execute(statement, arguments).fetchone()
            return first_row[0]

        device_stats = {}
        device_stats["total"] = scalar("SELECT COUNT(*) FROM devices")
        device_stats["assembling"] = scalar("SELECT COUNT(*) FROM devices WHERE status = '装配中'")
        device_stats["activated"] = scalar("SELECT COUNT(*) FROM devices WHERE status = '已激活'")
        device_stats["shipped"] = scalar("SELECT COUNT(*) FROM devices WHERE status = '已出厂'")

        material_stats = {}
        material_stats["total"] = scalar("SELECT COUNT(*) FROM materials")
        material_stats["inStock"] = scalar("SELECT COUNT(*) FROM materials WHERE status = '在库'")
        material_stats["assembled"] = scalar("SELECT COUNT(*) FROM materials WHERE status = '已装配'")
        material_stats["faulty"] = scalar("SELECT COUNT(*) FROM materials WHERE status = '故障'")

        repair_stats = {}
        repair_stats["total"] = scalar("SELECT COUNT(*) FROM repair_orders")
        repair_stats["pending"] = scalar("SELECT COUNT(*) FROM repair_orders WHERE status = '待处理'")
        repair_stats["processing"] = scalar("SELECT COUNT(*) FROM repair_orders WHERE status = '处理中'")
        repair_stats["done"] = scalar("SELECT COUNT(*) FROM repair_orders WHERE status = '已处理'")

        scrap_stats = {}
        scrap_stats["total"] = scalar("SELECT COUNT(*) FROM scrap_records")
        scrap_stats["pendingApproval"] = scalar("SELECT COUNT(*) FROM scrap_records WHERE status = '待审批'")

        firmware_stats = {"total": scalar("SELECT COUNT(*) FROM firmware_versions")}
        license_stats = {"total": scalar("SELECT COUNT(*) FROM licenses")}

        repair_rows = db.execute(
            "SELECT id, sn, fault_type, status, priority, create_date FROM repair_orders WHERE status != '已处理' ORDER BY create_date DESC LIMIT 4"
        ).fetchall()
        scrap_rows = db.execute(
            "SELECT id, sn, model, status, date FROM scrap_records WHERE status = '待审批' OR status = '审批中' ORDER BY date DESC LIMIT 4"
        ).fetchall()

        recent_repairs = list(map(dict, repair_rows))
        recent_scraps = list(map(dict, scrap_rows))

    summary = {}
    summary["devices"] = device_stats
    summary["materials"] = material_stats
    summary["repair"] = repair_stats
    summary["scrap"] = scrap_stats
    summary["firmware"] = firmware_stats
    summary["licenses"] = license_stats
    summary["recentRepairs"] = recent_repairs
    summary["recentScraps"] = recent_scraps
    return summary


# ═══════════════════════════════════════════════════════════════
# 8.
# Device detail endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/devices/detail", summary="获取设备详细信息")
def get_device_detail(sn: str = Query(..., description="设备SN")):
    """Return one device with its BOM, logs, license, config and checklist.

    The checklist is built from the templates of the device's model; each
    template entry is merged with the matching check record of this device
    (matched by name), defaulting to "not passed, no photos" when no record
    exists.

    Raises:
        HTTPException: 404 when no device has the given SN.
    """
    with get_db() as db:
        device = db.execute("SELECT * FROM devices WHERE sn = ?", (sn,)).fetchone()
        if not device:
            raise HTTPException(status_code=404, detail="设备不存在")

        device_info = dict(device)
        device_model = device_info.get("model", "")

        bom = [dict(r) for r in db.execute(
            "SELECT * FROM device_bom_records WHERE device_sn = ?", (sn,)
        ).fetchall()]

        logs = [dict(r) for r in db.execute(
            "SELECT * FROM device_logs WHERE device_sn = ? ORDER BY date DESC", (sn,)
        ).fetchall()]

        license_row = db.execute(
            "SELECT * FROM licenses WHERE device_sn = ? OR model = ? ORDER BY id DESC LIMIT 1",
            (sn, device_model)
        ).fetchone()

        config_row = db.execute(
            "SELECT * FROM config_files WHERE model = ? AND status = '生效' ORDER BY id DESC LIMIT 1",
            (device_model,)
        ).fetchone()

        # Checklist templates of this model, merged with the device's records.
        templates = db.execute(
            "SELECT name, required FROM checklist_templates WHERE model_code = ? ORDER BY sort_order ASC",
            (device_model,)
        ).fetchall()
        records = db.execute(
            "SELECT checklist_name, passed, photos, note FROM device_checklist_records WHERE device_sn = ?",
            (sn,)
        ).fetchall()

    record_map = {r["checklist_name"]: dict(r) for r in records}
    checklist = []
    for t in templates:
        name = t["name"]
        rec = record_map.get(name, {})
        photos = rec.get("photos", "[]")
        try:
            photos = json.loads(photos) if photos else []
        except Exception:
            # Photo lists that are not valid JSON are reported as empty.
            photos = []
        checklist.append({
            "name": name,
            "required": bool(t["required"]),
            "passed": bool(rec.get("passed", 0)),
            "photos": photos,
            "note": rec.get("note", ""),
        })

    return {
        "device": device_info,
        "bom": bom,
        "logs": logs,
        "license": dict(license_row) if license_row else None,
        "config": dict(config_row) if config_row else None,
        "checklist": checklist,
    }


@app.post("/api/devices/detail", summary="保存设备BOM记录和操作日志")
def save_device_detail(body: dict = Body(...)):
    """Placeholder: acknowledges the request without persisting anything."""
    # TODO: a real implementation has to parse ``body`` and store it.
    return {"success": True, "message": "设备详情保存成功"}


# ═══════════════════════════════════════════════════════════════
# 9. Device model endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/models", summary="获取所有设备型号")
def list_models():
    """Return all device models, newest id first."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM device_models ORDER BY id DESC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/models", summary="创建设备型号")
def create_model(body: dict = Body(...)):
    """Insert a device model; ``create_date`` is today's midnight timestamp."""
    name = body.get("name", "")
    code = body.get("code", "")
    status = body.get("status", "在产")
    description = body.get("description", "")
    with get_db() as db:
        db.execute(
            "INSERT INTO device_models (name, code, status, description, create_date) VALUES (?, ?, ?, ?, ?)",
            (name, code, status, description, now_date_str()),
        )
        db.commit()
    return {"success": True, "message": "设备型号创建成功"}


@app.put("/api/models", summary="更新设备型号状态")
def update_model(body: dict = Body(...)):
    """Set the status of the device model whose id is given in the body.

    Raises:
        HTTPException: 400 when ``id`` or ``status`` is missing. Previously a
            missing id reported success without touching any row and a
            missing status wrote NULL.
    """
    id_ = body.get("id")
    status = body.get("status")
    if id_ is None:
        raise HTTPException(status_code=400, detail="设备型号 ID 不能为空")
    if status is None:
        raise HTTPException(status_code=400, detail="状态不能为空")

    with get_db() as db:
        db.execute("UPDATE device_models SET status = ? WHERE id = ?", (status, id_))
        db.commit()
    return {"success": True, "message": "设备型号更新成功"}


# ═══════════════════════════════════════════════════════════════
# 10. BOM template endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/models/bom", summary="获取BOM模板")
def list_bom(model: Optional[str] = Query(None, description="设备型号编码")):
    """List BOM template items, optionally for one model code.

    ``versions`` is decoded from JSON; undecodable values become ``[]``.
    """
    with get_db() as db:
        sql = "SELECT * FROM bom_templates WHERE 1=1"
        params = []
        if model:
            sql += " AND model_code = ?"
            params.append(model)
        rows = db.execute(sql, params).fetchall()

    result = []
    for r in rows:
        item = dict(r)
        try:
            item["versions"] = json.loads(item.get("versions", "[]"))
        except Exception:
            item["versions"] = []
        result.append(item)
    return {"success": True, "data": result}


@app.post("/api/models/bom", summary="添加BOM模板项")
def create_bom(body: dict = Body(...)):
    """Insert one BOM template item; boolean flags are stored as 0/1."""
    with get_db() as db:
        db.execute(
            """
            INSERT INTO bom_templates (model_code, name, material_name, model, versions, qty, required, need_calibration, enforce_version_match)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                body.get("model_code", ""),
                body.get("name", ""),
                body.get("material_name", ""),
                body.get("model", ""),
                json.dumps(body.get("versions", [])),
                body.get("qty", 1),
                1 if body.get("required") else 0,
                1 if body.get("need_calibration") else 0,
                1 if body.get("enforce_version_match") else 0,
            ),
        )
        db.commit()
    return {"success": True, "message": "BOM模板项添加成功"}


@app.delete("/api/models/bom", summary="删除BOM模板项")
def delete_bom(id: int = Query(..., description="BOM项ID")):
    """Delete one BOM template item (``id`` is the public query name)."""
    with get_db() as db:
        db.execute("DELETE FROM bom_templates WHERE id = ?", (id,))
        db.commit()
    return {"success": True, "message": "BOM模板项已删除"}


# ═══════════════════════════════════════════════════════════════
# 11.
# Checklist template endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/models/checklist", summary="获取装配Checklist")
def list_checklist(model: Optional[str] = Query(None, description="设备型号编码")):
    """List checklist templates in sort order, optionally for one model."""
    with get_db() as db:
        sql = "SELECT * FROM checklist_templates WHERE 1=1"
        params = []
        if model:
            sql += " AND model_code = ?"
            params.append(model)
        sql += " ORDER BY sort_order ASC"
        rows = db.execute(sql, params).fetchall()
    return [dict(r) for r in rows]


@app.post("/api/models/checklist", summary="批量保存Checklist模板")
def save_checklist(body: dict = Body(...)):
    """Replace all checklist templates of a model with the given items.

    Existing rows for ``model_code`` are deleted first; each item is then
    inserted with its list position as ``sort_order``.
    """
    model_code = body.get("model_code", "")
    items = body.get("items", [])
    with get_db() as db:
        db.execute("DELETE FROM checklist_templates WHERE model_code = ?", (model_code,))
        for idx, item in enumerate(items):
            db.execute(
                "INSERT INTO checklist_templates (model_code, name, required, standard, sort_order) VALUES (?, ?, ?, ?, ?)",
                (model_code, item.get("name", ""), 1 if item.get("required") else 0, item.get("standard", ""), idx),
            )
        db.commit()
    return {"success": True, "message": "Checklist模板保存成功"}


# ═══════════════════════════════════════════════════════════════
# 12. Material management endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/materials", summary="获取物料列表")
def list_materials():
    """Return all materials, newest id first."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM materials ORDER BY id DESC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/materials", summary="登记物料")
def create_material(body: dict = Body(...)):
    """Register a material; missing fields fall back to the defaults below."""
    with get_db() as db:
        db.execute(
            """
            INSERT INTO materials (sn, name, category, type, device_model, version, description, firmware, status, device_sn, production_date, calib_status, calib_date)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                body.get("sn", ""),
                body.get("name", ""),
                body.get("category", ""),
                body.get("type", ""),
                body.get("device_model", ""),
                body.get("version", ""),
                body.get("description", ""),
                body.get("firmware", "-"),
                body.get("status", "在库"),
                body.get("device_sn", "-"),
                body.get("production_date", ""),
                body.get("calib_status", "-"),
                body.get("calib_date", "-"),
            ),
        )
        db.commit()
    return {"success": True, "message": "物料登记成功"}


@app.put("/api/materials", summary="更新物料信息")
def update_material(body: dict = Body(...)):
    """Overwrite the editable columns of the material given by ``id``.

    Every listed column is written; a field absent from the body is reset to
    its default, so callers must send the complete record.

    Raises:
        HTTPException: 400 when the body carries no ``id`` (previously this
            reported success without updating anything).
    """
    id_ = body.get("id")
    if id_ is None:
        raise HTTPException(status_code=400, detail="物料 ID 不能为空")

    with get_db() as db:
        db.execute(
            """
            UPDATE materials SET sn=?, name=?, category=?, type=?, device_model=?, version=?, description=?, firmware=?, status=?, device_sn=?, calib_status=?, calib_date=?
            WHERE id=?
            """,
            (
                body.get("sn", ""),
                body.get("name", ""),
                body.get("category", ""),
                body.get("type", ""),
                body.get("device_model", ""),
                body.get("version", ""),
                body.get("description", ""),
                body.get("firmware", "-"),
                body.get("status", "在库"),
                body.get("device_sn", "-"),
                body.get("calib_status", "-"),
                body.get("calib_date", "-"),
                id_,
            ),
        )
        db.commit()
    return {"success": True, "message": "物料更新成功"}


@app.delete("/api/materials", summary="删除物料")
def delete_material(body: dict = Body(...)):
    """Delete the material whose id is given in the body.

    Raises:
        HTTPException: 400 when the body carries no ``id``.
    """
    id_ = body.get("id")
    if id_ is None:
        raise HTTPException(status_code=400, detail="物料 ID 不能为空")

    with get_db() as db:
        db.execute("DELETE FROM materials WHERE id = ?", (id_,))
        db.commit()
    return {"success": True, "message": "物料已删除"}


# ═══════════════════════════════════════════════════════════════
# 13.
# Material category endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/material-categories", summary="获取物料分类列表")
def list_material_categories():
    """Return all material categories in sort order."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM material_categories ORDER BY sort_order ASC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/material-categories", summary="创建物料分类")
def create_material_category(body: dict = Body(...)):
    """Insert a material category; boolean flags are stored as 0/1."""
    with get_db() as db:
        db.execute(
            "INSERT INTO material_categories (name, description, has_firmware, has_calibration, sort_order, status) VALUES (?, ?, ?, ?, ?, ?)",
            (
                body.get("name", ""),
                body.get("description", ""),
                1 if body.get("has_firmware") else 0,
                1 if body.get("has_calibration") else 0,
                body.get("sort_order", 0),
                body.get("status", "启用"),
            ),
        )
        db.commit()
    return {"success": True, "message": "物料分类创建成功"}


@app.put("/api/material-categories", summary="更新物料分类")
def update_material_category(body: dict = Body(...)):
    """Overwrite all editable columns of the category given by ``id``.

    A field absent from the body is reset to its default, so callers must
    send the complete record.

    Raises:
        HTTPException: 400 when the body carries no ``id`` (previously this
            reported success without updating anything).
    """
    id_ = body.get("id")
    if id_ is None:
        raise HTTPException(status_code=400, detail="物料分类 ID 不能为空")

    with get_db() as db:
        db.execute(
            "UPDATE material_categories SET name=?, description=?, has_firmware=?, has_calibration=?, sort_order=?, status=? WHERE id=?",
            (
                body.get("name", ""),
                body.get("description", ""),
                1 if body.get("has_firmware") else 0,
                1 if body.get("has_calibration") else 0,
                body.get("sort_order", 0),
                body.get("status", "启用"),
                id_,
            ),
        )
        db.commit()
    return {"success": True, "message": "物料分类更新成功"}


@app.delete("/api/material-categories", summary="删除物料分类")
def delete_material_category(id: int = Query(..., description="分类ID")):
    """Delete one material category (``id`` is the public query name)."""
    with get_db() as db:
        db.execute("DELETE FROM material_categories WHERE id = ?", (id,))
        db.commit()
    return {"success": True, "message": "物料分类已删除"}


# ═══════════════════════════════════════════════════════════════
# 14.
# Board type endpoints
# ═══════════════════════════════════════════════════════════════

@app.get("/api/board-types", summary="获取板卡类型列表")
def list_board_types():
    """Return all board types, newest id first.

    Each item gets a ``deviceModels`` key decoded from the ``device_models``
    JSON column; undecodable values become ``[]``.
    """
    with get_db() as db:
        rows = db.execute("SELECT * FROM board_types ORDER BY id DESC").fetchall()

    result = []
    for r in rows:
        item = dict(r)
        try:
            item["deviceModels"] = json.loads(item.get("device_models", "[]"))
        except Exception:
            item["deviceModels"] = []
        result.append(item)
    return result


@app.post("/api/board-types", summary="创建板卡类型")
def create_board_type(body: dict = Body(...)):
    """Insert a board type; ``deviceModels`` is stored as a JSON string."""
    with get_db() as db:
        db.execute(
            "INSERT INTO board_types (name, category, device_models, description, status) VALUES (?, ?, ?, ?, ?)",
            (
                body.get("name", ""),
                body.get("category", ""),
                json.dumps(body.get("deviceModels", [])),
                body.get("description", ""),
                body.get("status", "启用"),
            ),
        )
        db.commit()
    return {"success": True, "message": "板卡类型创建成功"}


@app.put("/api/board-types", summary="更新板卡类型")
def update_board_type(body: dict = Body(...)):
    """Overwrite all editable columns of the board type given by ``id``.

    A field absent from the body is reset to its default, so callers must
    send the complete record.

    Raises:
        HTTPException: 400 when the body carries no ``id`` (previously this
            reported success without updating anything).
    """
    id_ = body.get("id")
    if id_ is None:
        raise HTTPException(status_code=400, detail="板卡类型 ID 不能为空")

    with get_db() as db:
        db.execute(
            "UPDATE board_types SET name=?, category=?, device_models=?, description=?, status=? WHERE id=?",
            (
                body.get("name", ""),
                body.get("category", ""),
                json.dumps(body.get("deviceModels", [])),
                body.get("description", ""),
                body.get("status", "启用"),
                id_,
            ),
        )
        db.commit()
    return {"success": True, "message": "板卡类型更新成功"}


@app.delete("/api/board-types", summary="删除板卡类型")
def delete_board_type(id: int = Query(..., description="板卡类型ID")):
    """Delete one board type (``id`` is the public query name)."""
    with get_db() as db:
        db.execute("DELETE FROM board_types WHERE id = ?", (id,))
        db.commit()
    return {"success": True, "message": "板卡类型已删除"}


# ═══════════════════════════════════════════════════════════════
# 15.
# Board version endpoints
# ═══════════════════════════════════════════════════════════════


@app.get("/api/boards", summary="获取板卡版本列表")
def list_boards():
    """Return all board versions as dicts, newest id first."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM board_versions ORDER BY id DESC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/boards", summary="创建板卡版本")
def create_board(body: dict = Body(...)):
    """Insert a new board version (``type``, ``version``, ``status``) from the request body."""
    with get_db() as db:
        db.execute(
            "INSERT INTO board_versions (type, version, status) VALUES (?, ?, ?)",
            (body.get("type", ""), body.get("version", ""), body.get("status", "在产")),
        )
        db.commit()
    return {"success": True, "message": "板卡版本创建成功"}


@app.put("/api/boards", summary="更新板卡版本状态")
def update_board(body: dict = Body(...)):
    """Change the status of the board version identified by ``body["id"]``.

    Raises:
        HTTPException: 400 when ``id`` or ``status`` is missing from the body,
            404 when no row has that id.
    """
    id_ = body.get("id")
    status = body.get("status")
    if id_ is None or status is None:
        # A missing status would be written as NULL; a missing id matches nothing.
        raise HTTPException(status_code=400, detail="缺少板卡版本ID或状态")
    with get_db() as db:
        updated = db.execute(
            "UPDATE board_versions SET status = ? WHERE id = ?", (status, id_)
        ).rowcount
        db.commit()
    if updated == 0:
        raise HTTPException(status_code=404, detail="板卡版本不存在")
    return {"success": True, "message": "板卡版本更新成功"}


# ═══════════════════════════════════════════════════════════════
# 16. Repair order endpoints
# ═══════════════════════════════════════════════════════════════


@app.get("/api/repair", summary="获取维修工单列表")
def list_repair():
    """Return all repair orders as dicts, most recent ``create_date`` first."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM repair_orders ORDER BY create_date DESC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/repair", summary="创建维修工单")
def create_repair(body: dict = Body(...)):
    """Insert a repair order built from the request body.

    ``create_date`` defaults to today's midnight timestamp from ``now_date_str()``.
    """
    # NOTE(review): a missing ``id`` is stored as an empty string — confirm
    # whether the order id should be required or generated here.
    with get_db() as db:
        db.execute(
            """
            INSERT INTO repair_orders
            (id, sn, fault_type, status, priority, assignee, create_date, description)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                body.get("id", ""),
                body.get("sn", ""),
                body.get("fault_type", ""),
                body.get("status", "待处理"),
                body.get("priority", "中"),
                body.get("assignee", ""),
                body.get("create_date", now_date_str()),
                body.get("description", ""),
            ),
        )
        db.commit()
    return {"success": True, "message": "维修工单创建成功"}


@app.get("/api/repair/{order_id}/process-records", summary="获取维修处理记录")
def list_repair_process_records(order_id: str):
    """Return the process records of one repair order, most recent ``date`` first."""
    with get_db() as db:
        rows = db.execute(
            "SELECT * FROM repair_process_records WHERE order_id = ? ORDER BY date DESC",
            (order_id,),
        ).fetchall()
    return [dict(r) for r in rows]


@app.get("/api/repair/{order_id}/board-replacements", summary="获取维修板卡更换记录")
def list_repair_board_replacements(order_id: str):
    """Return the board-replacement records of one repair order, most recent ``date`` first."""
    with get_db() as db:
        rows = db.execute(
            "SELECT * FROM repair_board_replacements WHERE order_id = ? ORDER BY date DESC",
            (order_id,),
        ).fetchall()
    return [dict(r) for r in rows]


# ═══════════════════════════════════════════════════════════════
# 17. Scrap and recycling endpoints
# ═══════════════════════════════════════════════════════════════


@app.get("/api/scrap", summary="获取报废记录列表")
def list_scrap():
    """Return all scrap records, most recent ``date`` first.

    The ``materials`` column is replaced by its decoded JSON value (``[]``
    when the column cannot be decoded).
    """
    with get_db() as db:
        rows = db.execute("SELECT * FROM scrap_records ORDER BY date DESC").fetchall()
    result = []
    for r in rows:
        item = dict(r)
        try:
            item["materials"] = json.loads(item.get("materials", "[]"))
        except (TypeError, ValueError):
            # TypeError: column is NULL / not a string; ValueError: malformed JSON.
            item["materials"] = []
        result.append(item)
    return result


@app.post("/api/scrap", summary="创建报废记录")
def create_scrap(body: dict = Body(...)):
    """Insert a scrap record built from the request body.

    ``materials`` is serialised to JSON before storage; ``date`` defaults to
    today's midnight timestamp from ``now_date_str()``.
    """
    with get_db() as db:
        db.execute(
            """
            INSERT INTO scrap_records
            (sn, model, reason, applicant, status, order_id, date, value, materials)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (
                body.get("sn", ""),
                body.get("model", ""),
                body.get("reason", ""),
                body.get("applicant", ""),
                body.get("status", "待审批"),
                body.get("order_id", ""),
                body.get("date", now_date_str()),
                body.get("value", 0),
                json.dumps(body.get("materials", [])),
            ),
        )
        db.commit()
    return {"success": True, "message": "报废记录创建成功"}


# ═══════════════════════════════════════════════════════════════
# 18.
# Update log endpoints
# ═══════════════════════════════════════════════════════════════


@app.get("/api/update-logs", summary="获取更新日志列表")
def list_update_logs():
    """Return all update-log entries as dicts, most recent ``created_at`` first."""
    with get_db() as db:
        rows = db.execute("SELECT * FROM update_logs ORDER BY created_at DESC").fetchall()
    return [dict(r) for r in rows]


@app.post("/api/update-logs", summary="创建更新日志")
def create_update_log(body: dict = Body(...)):
    """Insert an update-log entry (``title``, ``content``, ``category``, ``version``)."""
    with get_db() as db:
        db.execute(
            "INSERT INTO update_logs (title, content, category, version) VALUES (?, ?, ?, ?)",
            (
                body.get("title", ""),
                body.get("content", ""),
                body.get("category", "feature"),
                body.get("version", ""),
            ),
        )
        db.commit()
    return {"success": True, "message": "更新日志创建成功"}


@app.delete("/api/update-logs", summary="删除更新日志")
def delete_update_log(id: int = Query(..., description="日志ID")):
    """Delete the update-log entry with the given id.

    The call succeeds even when no row matches the id. The parameter keeps the
    name ``id`` because it is the public query-string key.
    """
    with get_db() as db:
        db.execute("DELETE FROM update_logs WHERE id = ?", (id,))
        db.commit()
    return {"success": True, "message": "更新日志已删除"}


# ═══════════════════════════════════════════════════════════════
# 19. Generic file upload endpoint
# ═══════════════════════════════════════════════════════════════


def _is_safe_upload_folder(folder: str) -> bool:
    """Return True when *folder* is a relative sub-path that cannot escape the upload root."""
    normalized = folder.replace("\\", "/")
    if "\x00" in normalized or normalized.startswith("/") or os.path.isabs(folder):
        return False
    parts = normalized.split("/")
    if ":" in parts[0]:
        # Rejects Windows drive prefixes such as "C:".
        return False
    return ".." not in parts


@app.post("/api/upload", summary="通用文件上传")
async def upload_file(
    file: UploadFile = File(..., description="待上传文件"),
    folder: str = Form("apps", description="存储子目录"),
):
    """Save an uploaded file under ``uploads/{folder}/`` and describe the result.

    Returns a dict with the public ``url``, the original ``fileName``, the
    ``fileSize`` in bytes and the reported ``fileType``.

    Raises:
        HTTPException: 400 when ``folder`` is absolute or contains ``..``;
            the value is client-supplied and is joined into a filesystem path.
    """
    if not _is_safe_upload_folder(folder):
        raise HTTPException(status_code=400, detail="非法的存储子目录")
    _, file_name, file_size, _ = save_upload_file(file, folder)
    return {
        "url": f"/uploads/{folder}/{file_name}",
        "fileName": file.filename or "unknown",
        "fileSize": file_size,
        "fileType": file.content_type or "application/octet-stream",
    }


# ═══════════════════════════════════════════════════════════════
# 20. License download and preview endpoints
# ═══════════════════════════════════════════════════════════════


@app.get("/api/licenses/download", summary="APP下载授权文件")
def download_license(sn: str = Query(..., description="设备SN")):
    """Return the newest license row for a device SN (endpoint used by the app).

    Raises:
        HTTPException: 404 when no license row matches.
    """
    # NOTE(review): the ``OR model = ''`` clause also matches any license whose
    # model is empty, regardless of SN, and such a row wins if it has a higher
    # id — confirm this is intended.
    with get_db() as db:
        row = db.execute(
            "SELECT * FROM licenses WHERE device_sn = ? OR model = ? ORDER BY id DESC LIMIT 1",
            (sn, ""),
        ).fetchone()
    if not row:
        raise HTTPException(status_code=404, detail="未找到授权记录")
    return {
        "success": True,
        "license": dict(row),
        "message": "授权文件获取成功",
    }


# Maps a stored module display name to its authorization item id and category.
_AUTH_ITEM_MAP = {
    "一维自电/电阻率/激电测试模块": {"id": "1D", "category": "一维"},
    "二维自电/电阻率/激电测试模块": {"id": "2D", "category": "二维"},
    "三维自电/电阻率/激电测试模块": {"id": "3D", "category": "三维"},
    "水上": {"id": "WATER", "category": "水上"},
    "跨孔": {"id": "CROSS", "category": "跨孔"},
    "电流场法": {"id": "CF", "category": "电流场法"},
}


def _parse_module_names(modules_raw) -> list:
    """Decode the ``modules`` column into a list of module names.

    A JSON list is used as-is (items stringified); any other valid JSON value
    yields no modules; text that is not JSON is treated as a comma-separated list.
    """
    if not modules_raw:
        return []
    try:
        parsed = json.loads(modules_raw)
    except (TypeError, ValueError):
        return [m.strip() for m in modules_raw.split(",") if m.strip()]
    return [str(m) for m in parsed] if isinstance(parsed, list) else []


def _one_year_after(moment: datetime) -> datetime:
    """Return *moment* shifted one calendar year ahead (Feb 29 becomes Feb 28)."""
    try:
        return moment.replace(year=moment.year + 1)
    except ValueError:
        # Feb 29 has no counterpart in the following, non-leap year.
        return moment.replace(year=moment.year + 1, day=28)


@app.get("/api/licenses/{license_id}/preview", summary="预览授权文件")
def preview_license(license_id: int):
    """Build the full license document for one license row.

    The document contains the authorized modules, the linked configuration
    file (when the row has a ``config_id`` that resolves) and a ``signature``
    section whose value is the SHA-256 hex digest of the compact JSON body.

    Raises:
        HTTPException: 404 when the license id does not exist.
    """
    from datetime import timezone  # local import: the file only imports ``datetime``

    with get_db() as db:
        row = db.execute("SELECT * FROM licenses WHERE id = ?", (license_id,)).fetchone()
        if not row:
            raise HTTPException(status_code=404, detail="授权记录不存在")
        record = dict(row)
        config_id = record.get("config_id")
        cfg_row = None
        if config_id:
            cfg_row = db.execute(
                "SELECT * FROM config_files WHERE id = ?", (config_id,)
            ).fetchone()

    auth_modules = []
    for name in _parse_module_names(record.get("modules", "")):
        mapped = _AUTH_ITEM_MAP.get(name, {})
        auth_modules.append({
            "id": mapped.get("id", ""),
            "name": name,
            "category": mapped.get("category", ""),
            "enabled": True,
        })

    config = {}
    if cfg_row:
        cfg = dict(cfg_row)
        config = {
            "name": cfg.get("name", ""),
            "version": cfg.get("version", ""),
            "emissionParams": {
                "maxVoltage": cfg.get("max_tx_voltage", ""),
                "maxCurrent": cfg.get("max_tx_current", ""),
                "waveform": cfg.get("tx_waveform", ""),
                "pulseWidth": cfg.get("tx_pulse_width", ""),
            },
            "acquisitionParams": {
                "channels": cfg.get("acq_channels", ""),
                "sampleRate": cfg.get("acq_sample_rate", ""),
                "voltageRange": cfg.get("acq_voltage_range", ""),
                "fullWaveform": cfg.get("full_waveform_capture", ""),
            },
            "networkParams": {
                "wifiSSIDPrefix": cfg.get("ssid_prefix", ""),
            },
        }

    # One naive-UTC reading is used for both the timestamp and the default expiry.
    utc_now = datetime.now(timezone.utc).replace(tzinfo=None)
    expiry = record.get("expiry", "")
    valid_until = expiry if expiry else _one_year_after(utc_now).strftime("%Y-%m-%d")

    # Key order matters: the digest below is computed over this exact serialisation.
    license_data = {
        "version": "1.0",
        "generatedAt": utc_now.isoformat() + "Z",
        "deviceModel": record.get("model", ""),
        "deviceSN": record.get("device_sn", ""),
        "validUntil": valid_until,
        "status": "active" if record.get("status") == "生效" else "inactive",
        "authModules": auth_modules,
    }
    if config:
        license_data["config"] = config

    # NOTE(review): this is an unkeyed SHA-256 digest with a placeholder public
    # key, so it only detects accidental changes — confirm whether a real
    # signature is planned.
    json_string = json.dumps(license_data, ensure_ascii=False, separators=(",", ":"))
    signature_value = hashlib.sha256(json_string.encode("utf-8")).hexdigest()

    final_license = {
        **license_data,
        "signature": {
            "algorithm": "SHA256",
            "value": signature_value,
            "publicKey": "platform-public-key-placeholder",
        },
    }
    return {"success": True, "data": final_license}


# ═══════════════════════════════════════════════════════════════
# 21. Seed data endpoint
# ═══════════════════════════════════════════════════════════════


@app.post("/api/seed", summary="一键填充演示数据")
def seed_data():
    """Call ``seed_demo_data()`` to populate demo data (intended for test environments).

    NOTE(review): the original docstring says existing data is cleared first;
    that depends on ``seed_demo_data`` — confirm before exposing this endpoint.
    """
    seed_demo_data()
    return {"success": True, "message": "演示数据已填充"}


# ═══════════════════════════════════════════════════════════════
# Entry point (run directly with `python main.py`)
# ═══════════════════════════════════════════════════════════════

if __name__ == "__main__":
    import uvicorn

    uvicorn.run("main:app", host="localhost", port=8000, reload=True)