351 lines
14 KiB
Python
351 lines
14 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
RADAR_TYPE_MALAMIRA 转换插件(本地原型)。
|
||
|
||
把 Mala Mira rSlicer 原始三件套(.rad + .rd3|.rd7 + _G01.pos)转换成客户端规范化
|
||
格式(.head + .data + .cor),并提供 probe 子命令出图核对 .rd3 数据体主序。
|
||
|
||
本工具实现的 convert 契约 = 未来"服务端下发插件"的接口:
|
||
plugin_id : RADAR_TYPE_MALAMIRA
|
||
supports(fileset) -> bool
|
||
convert(lineDir, prefix, outDir) -> {head, data, cor}
|
||
|
||
字段映射规则见客户《雷达业务开发说明》§3.3 / §2.2.2 / §3.5。与文档的少量偏差见 README.md。
|
||
"""
|
||
|
||
import argparse
|
||
import os
|
||
import shutil
|
||
import sys
|
||
|
||
import numpy as np
|
||
|
||
PLUGIN_ID = "RADAR_TYPE_MALAMIRA"
|
||
|
||
# 规范化 .head 字段顺序(三维雷达,文档 §1.2.2)。
|
||
HEAD_FIELD_ORDER = [
|
||
"DATE", "START_TIME", "STOP_TIME", "UNITS", "MODE", "ANTENNAS", "FREQUENCY",
|
||
"STACKS", "LAST_TRACE", "POSITIVE_DIRECTION", "SAMPLES", "TIME_INTERVAL",
|
||
"TIMEWINDOW", "DEPTH", "ZERO_POSITION", "DIELECTRIC", "SOIL_TYPE", "BITS",
|
||
"MARK", "DISTANCE_INTERVAL", "START_POSITION", "STOP_POSITION", "WHEEL_GPS",
|
||
"WHEEL_CALIBRATION", "SCAN_SECOND", "NUMBER_OF_CH", "CH_X_OFFSETS",
|
||
"RTK_X_OFFSET", "RTK_Y_OFFSET", "RTK_Z_OFFSET", "GAIN", "FILTER", "SMOOTH",
|
||
"ENDIAN_TYPE",
|
||
]
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 解析 .rad
|
||
# ---------------------------------------------------------------------------
|
||
def parse_rad(rad_path):
|
||
"""读 Mala .rad(ASCII,KEY:VALUE 行)→ dict(保留原始键,值去首尾空白)。"""
|
||
raw = {}
|
||
with open(rad_path, "r", encoding="utf-8", errors="replace") as f:
|
||
for line in f:
|
||
if ":" not in line:
|
||
continue
|
||
key, _, val = line.partition(":")
|
||
raw[key.strip()] = val.strip()
|
||
return raw
|
||
|
||
|
||
def _f(raw, key, default=None):
|
||
v = raw.get(key, "")
|
||
if v == "" or v is None:
|
||
return default
|
||
try:
|
||
return float(v)
|
||
except ValueError:
|
||
return default
|
||
|
||
|
||
def _i(raw, key, default=None):
|
||
v = _f(raw, key, None)
|
||
return int(round(v)) if v is not None else default
|
||
|
||
|
||
def compute_dims(raw, data_path):
|
||
"""从 .rad + 数据文件大小推导体维度并做一致性校验。
|
||
|
||
返回 dict:positions(K)=道/切片数, channels(M), samples(N),
|
||
last_trace(总扫描数=K*M), bits, bytes_per_sample。
|
||
"""
|
||
samples = _i(raw, "SAMPLES")
|
||
channels = _i(raw, "NUMBER_OF_CH")
|
||
last_trace = _i(raw, "LAST TRACE") # Mala 中 = 总扫描数(道 * 通道)
|
||
if not samples or not channels or not last_trace:
|
||
raise ValueError(
|
||
"缺少 SAMPLES / NUMBER_OF_CH / LAST TRACE,无法推导维度: %s" % data_path)
|
||
|
||
ext = os.path.splitext(data_path)[1].lower()
|
||
bytes_per_sample = 2 if ext == ".rd3" else 4 if ext == ".rd7" else None
|
||
if bytes_per_sample is None:
|
||
raise ValueError("未知数据扩展名(仅 .rd3/.rd7): %s" % data_path)
|
||
|
||
filesize = os.path.getsize(data_path)
|
||
expect = last_trace * samples * bytes_per_sample
|
||
if filesize != expect:
|
||
raise ValueError(
|
||
"数据体大小不符: %s 实际 %d 字节, 期望 LAST_TRACE(%d)*SAMPLES(%d)*%d = %d"
|
||
% (data_path, filesize, last_trace, samples, bytes_per_sample, expect))
|
||
|
||
if last_trace % channels != 0:
|
||
raise ValueError(
|
||
"LAST_TRACE(%d) 不能被 NUMBER_OF_CH(%d) 整除,无法切分道/通道"
|
||
% (last_trace, channels))
|
||
|
||
positions = last_trace // channels
|
||
return {
|
||
"positions": positions,
|
||
"channels": channels,
|
||
"samples": samples,
|
||
"last_trace": last_trace,
|
||
"bits": bytes_per_sample * 8,
|
||
"bytes_per_sample": bytes_per_sample,
|
||
"filesize": filesize,
|
||
}
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# .rad -> .head
|
||
# ---------------------------------------------------------------------------
|
||
def build_head(raw, dims):
|
||
"""按 §3.3 把 .rad 映射成规范化 .head 字段 dict。无对应字段留空。"""
|
||
ch_y = raw.get("CH_Y_OFFSETS", "").split()
|
||
head = {k: "" for k in HEAD_FIELD_ORDER}
|
||
head.update({
|
||
"DATE": raw.get("DATE", ""),
|
||
"START_TIME": raw.get("TIME", ""),
|
||
"UNITS": raw.get("UNITS", ""),
|
||
"MODE": "距离模式", # Mala 默认距离模式(§3.3 要点 4)
|
||
"ANTENNAS": raw.get("ANTENNAS", ""),
|
||
"FREQUENCY": raw.get("FREQUENCY", ""),
|
||
"STACKS": raw.get("STACKS", ""),
|
||
"LAST_TRACE": str(dims["last_trace"]),
|
||
"POSITIVE_DIRECTION": raw.get("POSITIVE DIRECTION", ""),
|
||
"SAMPLES": str(dims["samples"]),
|
||
"TIME_INTERVAL": raw.get("TIME INTERVAL", ""),
|
||
"TIMEWINDOW": raw.get("TIMEWINDOW", ""),
|
||
"BITS": str(dims["bits"]),
|
||
"DISTANCE_INTERVAL": raw.get("DISTANCE INTERVAL", ""),
|
||
"START_POSITION": raw.get("START POSITION", ""),
|
||
"STOP_POSITION": raw.get("STOP POSITION", ""),
|
||
"WHEEL_CALIBRATION": raw.get("WHEEL CALIBRATION", ""),
|
||
"NUMBER_OF_CH": str(dims["channels"]),
|
||
"CH_X_OFFSETS": raw.get("CH_X_OFFSETS", "").strip(),
|
||
"RTK_Y_OFFSET": ch_y[0] if ch_y else "", # §3.3:取 CH_Y_OFFSETS 首元素
|
||
"ENDIAN_TYPE": "1", # Mala rd3 小端(§3.3 要点 1)
|
||
})
|
||
return head
|
||
|
||
|
||
def write_head(head, out_path):
|
||
with open(out_path, "w", encoding="utf-8", newline="\n") as f:
|
||
for k in HEAD_FIELD_ORDER:
|
||
f.write("%s:%s\n" % (k, head.get(k, "")))
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# .pos -> .cor (§2.2.2 场景二)
|
||
# ---------------------------------------------------------------------------
|
||
def convert_pos_to_cor(pos_path, cor_path):
|
||
""".pos(本地坐标: 序号 北 东 高程) → .cor(序号 纬度 N 经度 E 高程 M 解状态=4)。
|
||
|
||
注:.pos 为本地投影坐标(米),按文档直接映射 北→纬度 / 东→经度;N/E/M 为占位标识,
|
||
解状态固定填 4(RTK Fixed)。单线渲染不依赖 .cor 做世界配准,多线阶段再用。
|
||
"""
|
||
rows = []
|
||
with open(pos_path, "r", encoding="utf-8", errors="replace") as f:
|
||
for line in f:
|
||
s = line.strip()
|
||
if not s or s.upper().startswith("UNITS"):
|
||
continue
|
||
parts = s.split()
|
||
if len(parts) < 4:
|
||
continue
|
||
idx = int(float(parts[0]))
|
||
north, east, elev = float(parts[1]), float(parts[2]), float(parts[3])
|
||
rows.append((idx, north, east, elev))
|
||
with open(cor_path, "w", encoding="utf-8", newline="\n") as f:
|
||
f.write("VERSION:1\n")
|
||
for idx, north, east, elev in rows:
|
||
f.write("%d\t%.6f\tN\t%.6f\tE\t%.6f\tM\t4\n" % (idx, north, east, elev))
|
||
return len(rows)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# 测线发现
|
||
# ---------------------------------------------------------------------------
|
||
def find_lines(line_dir):
|
||
"""遍历目录,返回有效测线 [(prefix, rad, data, pos|None)](§3.2 抽取规则)。"""
|
||
out = []
|
||
for name in sorted(os.listdir(line_dir)):
|
||
if not name.lower().endswith(".rad"):
|
||
continue
|
||
prefix = name[:-4]
|
||
rad = os.path.join(line_dir, name)
|
||
data = None
|
||
for ext in (".rd3", ".rd7"):
|
||
cand = os.path.join(line_dir, prefix + ext)
|
||
if os.path.exists(cand):
|
||
data = cand
|
||
break
|
||
if data is None:
|
||
print(" [跳过] %s 缺 .rd3/.rd7 数据文件" % prefix, file=sys.stderr)
|
||
continue
|
||
pos = os.path.join(line_dir, prefix + "_G01.pos")
|
||
out.append((prefix, rad, data, pos if os.path.exists(pos) else None))
|
||
return out
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# convert
|
||
# ---------------------------------------------------------------------------
|
||
def convert_line(prefix, rad, data, pos, out_dir):
|
||
raw = parse_rad(rad)
|
||
dims = compute_dims(raw, data)
|
||
os.makedirs(out_dir, exist_ok=True)
|
||
|
||
head = build_head(raw, dims)
|
||
write_head(head, os.path.join(out_dir, prefix + ".head"))
|
||
shutil.copyfile(data, os.path.join(out_dir, prefix + ".data")) # §3.5 原样拷贝
|
||
cor_n = convert_pos_to_cor(pos, os.path.join(out_dir, prefix + ".cor")) if pos else 0
|
||
|
||
print("[%s] 道(K)=%d 通道(M)=%d 采样(N)=%d bits=%d .data=%.1fMB .cor=%d点%s"
|
||
% (prefix, dims["positions"], dims["channels"], dims["samples"],
|
||
dims["bits"], dims["filesize"] / 1e6, cor_n,
|
||
"" if pos else " (无轨迹)"))
|
||
return dims
|
||
|
||
|
||
def cmd_convert(args):
|
||
if args.prefix:
|
||
rad = os.path.join(args.line_dir, args.prefix + ".rad")
|
||
data = None
|
||
for ext in (".rd3", ".rd7"):
|
||
if os.path.exists(os.path.join(args.line_dir, args.prefix + ext)):
|
||
data = os.path.join(args.line_dir, args.prefix + ext)
|
||
pos = os.path.join(args.line_dir, args.prefix + "_G01.pos")
|
||
convert_line(args.prefix, rad, data, pos if os.path.exists(pos) else None,
|
||
args.out)
|
||
else:
|
||
lines = find_lines(args.line_dir)
|
||
print("发现 %d 条测线,输出 → %s" % (len(lines), args.out))
|
||
for prefix, rad, data, pos in lines:
|
||
convert_line(prefix, rad, data, pos, args.out)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# probe:核对 .rd3 数据体主序
|
||
# ---------------------------------------------------------------------------
|
||
def load_flat(data_path, dims, endian="<"):
|
||
dt = np.dtype("%si%d" % (endian, dims["bytes_per_sample"]))
|
||
flat = np.fromfile(data_path, dtype=dt)
|
||
n = dims["last_trace"] * dims["samples"]
|
||
if flat.size != n:
|
||
raise ValueError("读到 %d 个样本,期望 %d" % (flat.size, n))
|
||
return flat.astype(np.float32)
|
||
|
||
|
||
def _clip(img):
|
||
"""按 99 分位绝对值裁剪对比度,返回 (img, vmax)。"""
|
||
v = np.percentile(np.abs(img), 99) or 1.0
|
||
return img, v
|
||
|
||
|
||
def cmd_probe(args):
|
||
import matplotlib
|
||
matplotlib.use("Agg")
|
||
import matplotlib.pyplot as plt
|
||
|
||
raw = parse_rad(os.path.join(args.line_dir, args.prefix + ".rad"))
|
||
data = None
|
||
for ext in (".rd3", ".rd7"):
|
||
cand = os.path.join(args.line_dir, args.prefix + ext)
|
||
if os.path.exists(cand):
|
||
data = cand
|
||
dims = compute_dims(raw, data)
|
||
K, M, N = dims["positions"], dims["channels"], dims["samples"]
|
||
flat = load_flat(data, dims, "<" if args.endian == "little" else ">")
|
||
os.makedirs(args.out, exist_ok=True)
|
||
|
||
print("[probe] %s K(道)=%d M(通道)=%d N(采样)=%d amp[min=%.0f max=%.0f mean|.|=%.1f]"
|
||
% (args.prefix, K, M, N, flat.min(), flat.max(), np.abs(flat).mean()))
|
||
|
||
ch = args.channel
|
||
# H1: position-major sweeps 顺序 = (pos0:ch0..chM-1)(pos1:..) → reshape(K,M,N)
|
||
h1 = flat.reshape(K, M, N)
|
||
bscan_h1 = h1[:, ch, :].T # (N 采样 × K 道)
|
||
# H2: channel-major sweeps 顺序 = (ch0:pos0..posK-1)(ch1:..) → reshape(M,K,N)
|
||
h2 = flat.reshape(M, K, N)
|
||
bscan_h2 = h2[ch, :, :].T # (N 采样 × K 道)
|
||
# C-scan(H1 主序下某采样深度的 道×通道 平面)
|
||
cscan_h1 = h1[:, :, args.depth] # (K × M)
|
||
|
||
panels = [
|
||
("H1 position-major B-scan ch%d" % ch, bscan_h1, "trace (K)", "sample (N)"),
|
||
("H2 channel-major B-scan ch%d" % ch, bscan_h2, "trace (K)", "sample (N)"),
|
||
("H1 C-scan @sample %d" % args.depth, cscan_h1, "channel (M)", "trace (K)"),
|
||
]
|
||
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
|
||
for axp, (title, img, xl, yl) in zip(axes, panels):
|
||
_, vmax = _clip(img)
|
||
axp.imshow(img, aspect="auto", cmap="gray", vmin=-vmax, vmax=vmax,
|
||
interpolation="nearest")
|
||
axp.set_title(title)
|
||
axp.set_xlabel(xl)
|
||
axp.set_ylabel(yl)
|
||
fig.suptitle("%s %s -- main-order check: the coherent B-scan (layers/hyperbolas) is correct"
|
||
% (PLUGIN_ID, args.prefix), fontsize=12)
|
||
fig.tight_layout()
|
||
out_png = os.path.join(args.out, "probe_%s.png" % args.prefix)
|
||
fig.savefig(out_png, dpi=110)
|
||
print("[probe] 出图 → %s" % out_png)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
def cmd_info(args):
|
||
lines = find_lines(args.line_dir)
|
||
print("目录 %s 发现 %d 条测线 (plugin=%s)" % (args.line_dir, len(lines), PLUGIN_ID))
|
||
for prefix, rad, data, pos in lines:
|
||
raw = parse_rad(rad)
|
||
dims = compute_dims(raw, data)
|
||
print(" %-18s K=%-5d M=%-3d N=%-4d bits=%d dx=%s tw=%sns ch_x=%d个 轨迹=%s"
|
||
% (prefix, dims["positions"], dims["channels"], dims["samples"],
|
||
dims["bits"], raw.get("DISTANCE INTERVAL", "?"),
|
||
raw.get("TIMEWINDOW", "?"),
|
||
len(raw.get("CH_X_OFFSETS", "").split()),
|
||
"有" if pos else "无"))
|
||
|
||
|
||
def main():
|
||
ap = argparse.ArgumentParser(description="RADAR_TYPE_MALAMIRA 转换插件(本地原型)")
|
||
sub = ap.add_subparsers(dest="cmd", required=True)
|
||
|
||
p = sub.add_parser("info", help="列出目录内测线 + 维度校验")
|
||
p.add_argument("line_dir")
|
||
p.set_defaults(func=cmd_info)
|
||
|
||
p = sub.add_parser("convert", help="转换为规范化 .head/.data/.cor")
|
||
p.add_argument("line_dir")
|
||
p.add_argument("--prefix", default=None, help="只转某条线(默认全部)")
|
||
p.add_argument("--out", required=True, help="输出目录")
|
||
p.set_defaults(func=cmd_convert)
|
||
|
||
p = sub.add_parser("probe", help="出图核对 .rd3 数据体主序")
|
||
p.add_argument("line_dir")
|
||
p.add_argument("--prefix", required=True)
|
||
p.add_argument("--out", required=True)
|
||
p.add_argument("--channel", type=int, default=0)
|
||
p.add_argument("--depth", type=int, default=200, help="C-scan 取的采样深度")
|
||
p.add_argument("--endian", choices=["little", "big"], default="little")
|
||
p.set_defaults(func=cmd_probe)
|
||
|
||
args = ap.parse_args()
|
||
args.func(args)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|