geopro/tools/radar_convert/malamira.py

351 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
RADAR_TYPE_MALAMIRA 转换插件(本地原型)。
把 Mala Mira rSlicer 原始三件套(.rad + .rd3|.rd7 + _G01.pos转换成客户端规范化
格式(.head + .data + .cor并提供 probe 子命令出图核对 .rd3 数据体主序。
本工具实现的 convert 契约 = 未来"服务端下发插件"的接口:
plugin_id : RADAR_TYPE_MALAMIRA
supports(fileset) -> bool
convert(lineDir, prefix, outDir) -> {head, data, cor}
字段映射规则见客户《雷达业务开发说明》§3.3 / §2.2.2 / §3.5。与文档的少量偏差见 README.md。
"""
import argparse
import os
import shutil
import sys
import numpy as np
PLUGIN_ID = "RADAR_TYPE_MALAMIRA"
# 规范化 .head 字段顺序(三维雷达,文档 §1.2.2)。
HEAD_FIELD_ORDER = [
"DATE", "START_TIME", "STOP_TIME", "UNITS", "MODE", "ANTENNAS", "FREQUENCY",
"STACKS", "LAST_TRACE", "POSITIVE_DIRECTION", "SAMPLES", "TIME_INTERVAL",
"TIMEWINDOW", "DEPTH", "ZERO_POSITION", "DIELECTRIC", "SOIL_TYPE", "BITS",
"MARK", "DISTANCE_INTERVAL", "START_POSITION", "STOP_POSITION", "WHEEL_GPS",
"WHEEL_CALIBRATION", "SCAN_SECOND", "NUMBER_OF_CH", "CH_X_OFFSETS",
"RTK_X_OFFSET", "RTK_Y_OFFSET", "RTK_Z_OFFSET", "GAIN", "FILTER", "SMOOTH",
"ENDIAN_TYPE",
]
# ---------------------------------------------------------------------------
# 解析 .rad
# ---------------------------------------------------------------------------
def parse_rad(rad_path):
"""读 Mala .radASCIIKEY:VALUE 行)→ dict保留原始键值去首尾空白"""
raw = {}
with open(rad_path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
if ":" not in line:
continue
key, _, val = line.partition(":")
raw[key.strip()] = val.strip()
return raw
def _f(raw, key, default=None):
v = raw.get(key, "")
if v == "" or v is None:
return default
try:
return float(v)
except ValueError:
return default
def _i(raw, key, default=None):
v = _f(raw, key, None)
return int(round(v)) if v is not None else default
def compute_dims(raw, data_path):
"""从 .rad + 数据文件大小推导体维度并做一致性校验。
返回 dictpositions(K)=道/切片数, channels(M), samples(N),
last_trace(总扫描数=K*M), bits, bytes_per_sample。
"""
samples = _i(raw, "SAMPLES")
channels = _i(raw, "NUMBER_OF_CH")
last_trace = _i(raw, "LAST TRACE") # Mala 中 = 总扫描数(道 * 通道)
if not samples or not channels or not last_trace:
raise ValueError(
"缺少 SAMPLES / NUMBER_OF_CH / LAST TRACE无法推导维度: %s" % data_path)
ext = os.path.splitext(data_path)[1].lower()
bytes_per_sample = 2 if ext == ".rd3" else 4 if ext == ".rd7" else None
if bytes_per_sample is None:
raise ValueError("未知数据扩展名(仅 .rd3/.rd7): %s" % data_path)
filesize = os.path.getsize(data_path)
expect = last_trace * samples * bytes_per_sample
if filesize != expect:
raise ValueError(
"数据体大小不符: %s 实际 %d 字节, 期望 LAST_TRACE(%d)*SAMPLES(%d)*%d = %d"
% (data_path, filesize, last_trace, samples, bytes_per_sample, expect))
if last_trace % channels != 0:
raise ValueError(
"LAST_TRACE(%d) 不能被 NUMBER_OF_CH(%d) 整除,无法切分道/通道"
% (last_trace, channels))
positions = last_trace // channels
return {
"positions": positions,
"channels": channels,
"samples": samples,
"last_trace": last_trace,
"bits": bytes_per_sample * 8,
"bytes_per_sample": bytes_per_sample,
"filesize": filesize,
}
# ---------------------------------------------------------------------------
# .rad -> .head
# ---------------------------------------------------------------------------
def build_head(raw, dims):
"""按 §3.3 把 .rad 映射成规范化 .head 字段 dict。无对应字段留空。"""
ch_y = raw.get("CH_Y_OFFSETS", "").split()
head = {k: "" for k in HEAD_FIELD_ORDER}
head.update({
"DATE": raw.get("DATE", ""),
"START_TIME": raw.get("TIME", ""),
"UNITS": raw.get("UNITS", ""),
"MODE": "距离模式", # Mala 默认距离模式(§3.3 要点 4)
"ANTENNAS": raw.get("ANTENNAS", ""),
"FREQUENCY": raw.get("FREQUENCY", ""),
"STACKS": raw.get("STACKS", ""),
"LAST_TRACE": str(dims["last_trace"]),
"POSITIVE_DIRECTION": raw.get("POSITIVE DIRECTION", ""),
"SAMPLES": str(dims["samples"]),
"TIME_INTERVAL": raw.get("TIME INTERVAL", ""),
"TIMEWINDOW": raw.get("TIMEWINDOW", ""),
"BITS": str(dims["bits"]),
"DISTANCE_INTERVAL": raw.get("DISTANCE INTERVAL", ""),
"START_POSITION": raw.get("START POSITION", ""),
"STOP_POSITION": raw.get("STOP POSITION", ""),
"WHEEL_CALIBRATION": raw.get("WHEEL CALIBRATION", ""),
"NUMBER_OF_CH": str(dims["channels"]),
"CH_X_OFFSETS": raw.get("CH_X_OFFSETS", "").strip(),
"RTK_Y_OFFSET": ch_y[0] if ch_y else "", # §3.3:取 CH_Y_OFFSETS 首元素
"ENDIAN_TYPE": "1", # Mala rd3 小端(§3.3 要点 1)
})
return head
def write_head(head, out_path):
with open(out_path, "w", encoding="utf-8", newline="\n") as f:
for k in HEAD_FIELD_ORDER:
f.write("%s:%s\n" % (k, head.get(k, "")))
# ---------------------------------------------------------------------------
# .pos -> .cor (§2.2.2 场景二)
# ---------------------------------------------------------------------------
def convert_pos_to_cor(pos_path, cor_path):
""".pos(本地坐标: 序号 北 东 高程) → .cor(序号 纬度 N 经度 E 高程 M 解状态=4)。
注:.pos 为本地投影坐标(米),按文档直接映射 北→纬度 / 东→经度N/E/M 为占位标识,
解状态固定填 4(RTK Fixed)。单线渲染不依赖 .cor 做世界配准,多线阶段再用。
"""
rows = []
with open(pos_path, "r", encoding="utf-8", errors="replace") as f:
for line in f:
s = line.strip()
if not s or s.upper().startswith("UNITS"):
continue
parts = s.split()
if len(parts) < 4:
continue
idx = int(float(parts[0]))
north, east, elev = float(parts[1]), float(parts[2]), float(parts[3])
rows.append((idx, north, east, elev))
with open(cor_path, "w", encoding="utf-8", newline="\n") as f:
f.write("VERSION:1\n")
for idx, north, east, elev in rows:
f.write("%d\t%.6f\tN\t%.6f\tE\t%.6f\tM\t4\n" % (idx, north, east, elev))
return len(rows)
# ---------------------------------------------------------------------------
# 测线发现
# ---------------------------------------------------------------------------
def find_lines(line_dir):
"""遍历目录,返回有效测线 [(prefix, rad, data, pos|None)]§3.2 抽取规则)。"""
out = []
for name in sorted(os.listdir(line_dir)):
if not name.lower().endswith(".rad"):
continue
prefix = name[:-4]
rad = os.path.join(line_dir, name)
data = None
for ext in (".rd3", ".rd7"):
cand = os.path.join(line_dir, prefix + ext)
if os.path.exists(cand):
data = cand
break
if data is None:
print(" [跳过] %s 缺 .rd3/.rd7 数据文件" % prefix, file=sys.stderr)
continue
pos = os.path.join(line_dir, prefix + "_G01.pos")
out.append((prefix, rad, data, pos if os.path.exists(pos) else None))
return out
# ---------------------------------------------------------------------------
# convert
# ---------------------------------------------------------------------------
def convert_line(prefix, rad, data, pos, out_dir):
raw = parse_rad(rad)
dims = compute_dims(raw, data)
os.makedirs(out_dir, exist_ok=True)
head = build_head(raw, dims)
write_head(head, os.path.join(out_dir, prefix + ".head"))
shutil.copyfile(data, os.path.join(out_dir, prefix + ".data")) # §3.5 原样拷贝
cor_n = convert_pos_to_cor(pos, os.path.join(out_dir, prefix + ".cor")) if pos else 0
print("[%s] 道(K)=%d 通道(M)=%d 采样(N)=%d bits=%d .data=%.1fMB .cor=%d%s"
% (prefix, dims["positions"], dims["channels"], dims["samples"],
dims["bits"], dims["filesize"] / 1e6, cor_n,
"" if pos else " (无轨迹)"))
return dims
def cmd_convert(args):
if args.prefix:
rad = os.path.join(args.line_dir, args.prefix + ".rad")
data = None
for ext in (".rd3", ".rd7"):
if os.path.exists(os.path.join(args.line_dir, args.prefix + ext)):
data = os.path.join(args.line_dir, args.prefix + ext)
pos = os.path.join(args.line_dir, args.prefix + "_G01.pos")
convert_line(args.prefix, rad, data, pos if os.path.exists(pos) else None,
args.out)
else:
lines = find_lines(args.line_dir)
print("发现 %d 条测线,输出 → %s" % (len(lines), args.out))
for prefix, rad, data, pos in lines:
convert_line(prefix, rad, data, pos, args.out)
# ---------------------------------------------------------------------------
# probe核对 .rd3 数据体主序
# ---------------------------------------------------------------------------
def load_flat(data_path, dims, endian="<"):
dt = np.dtype("%si%d" % (endian, dims["bytes_per_sample"]))
flat = np.fromfile(data_path, dtype=dt)
n = dims["last_trace"] * dims["samples"]
if flat.size != n:
raise ValueError("读到 %d 个样本,期望 %d" % (flat.size, n))
return flat.astype(np.float32)
def _clip(img):
"""按 99 分位绝对值裁剪对比度,返回 (img, vmax)。"""
v = np.percentile(np.abs(img), 99) or 1.0
return img, v
def cmd_probe(args):
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
raw = parse_rad(os.path.join(args.line_dir, args.prefix + ".rad"))
data = None
for ext in (".rd3", ".rd7"):
cand = os.path.join(args.line_dir, args.prefix + ext)
if os.path.exists(cand):
data = cand
dims = compute_dims(raw, data)
K, M, N = dims["positions"], dims["channels"], dims["samples"]
flat = load_flat(data, dims, "<" if args.endian == "little" else ">")
os.makedirs(args.out, exist_ok=True)
print("[probe] %s K(道)=%d M(通道)=%d N(采样)=%d amp[min=%.0f max=%.0f mean|.|=%.1f]"
% (args.prefix, K, M, N, flat.min(), flat.max(), np.abs(flat).mean()))
ch = args.channel
# H1: position-major sweeps 顺序 = (pos0:ch0..chM-1)(pos1:..) → reshape(K,M,N)
h1 = flat.reshape(K, M, N)
bscan_h1 = h1[:, ch, :].T # (N 采样 × K 道)
# H2: channel-major sweeps 顺序 = (ch0:pos0..posK-1)(ch1:..) → reshape(M,K,N)
h2 = flat.reshape(M, K, N)
bscan_h2 = h2[ch, :, :].T # (N 采样 × K 道)
# C-scanH1 主序下某采样深度的 道×通道 平面)
cscan_h1 = h1[:, :, args.depth] # (K × M)
panels = [
("H1 position-major B-scan ch%d" % ch, bscan_h1, "trace (K)", "sample (N)"),
("H2 channel-major B-scan ch%d" % ch, bscan_h2, "trace (K)", "sample (N)"),
("H1 C-scan @sample %d" % args.depth, cscan_h1, "channel (M)", "trace (K)"),
]
fig, axes = plt.subplots(1, 3, figsize=(18, 6))
for axp, (title, img, xl, yl) in zip(axes, panels):
_, vmax = _clip(img)
axp.imshow(img, aspect="auto", cmap="gray", vmin=-vmax, vmax=vmax,
interpolation="nearest")
axp.set_title(title)
axp.set_xlabel(xl)
axp.set_ylabel(yl)
fig.suptitle("%s %s -- main-order check: the coherent B-scan (layers/hyperbolas) is correct"
% (PLUGIN_ID, args.prefix), fontsize=12)
fig.tight_layout()
out_png = os.path.join(args.out, "probe_%s.png" % args.prefix)
fig.savefig(out_png, dpi=110)
print("[probe] 出图 → %s" % out_png)
# ---------------------------------------------------------------------------
def cmd_info(args):
lines = find_lines(args.line_dir)
print("目录 %s 发现 %d 条测线 (plugin=%s)" % (args.line_dir, len(lines), PLUGIN_ID))
for prefix, rad, data, pos in lines:
raw = parse_rad(rad)
dims = compute_dims(raw, data)
print(" %-18s K=%-5d M=%-3d N=%-4d bits=%d dx=%s tw=%sns ch_x=%d个 轨迹=%s"
% (prefix, dims["positions"], dims["channels"], dims["samples"],
dims["bits"], raw.get("DISTANCE INTERVAL", "?"),
raw.get("TIMEWINDOW", "?"),
len(raw.get("CH_X_OFFSETS", "").split()),
"" if pos else ""))
def main():
ap = argparse.ArgumentParser(description="RADAR_TYPE_MALAMIRA 转换插件(本地原型)")
sub = ap.add_subparsers(dest="cmd", required=True)
p = sub.add_parser("info", help="列出目录内测线 + 维度校验")
p.add_argument("line_dir")
p.set_defaults(func=cmd_info)
p = sub.add_parser("convert", help="转换为规范化 .head/.data/.cor")
p.add_argument("line_dir")
p.add_argument("--prefix", default=None, help="只转某条线(默认全部)")
p.add_argument("--out", required=True, help="输出目录")
p.set_defaults(func=cmd_convert)
p = sub.add_parser("probe", help="出图核对 .rd3 数据体主序")
p.add_argument("line_dir")
p.add_argument("--prefix", required=True)
p.add_argument("--out", required=True)
p.add_argument("--channel", type=int, default=0)
p.add_argument("--depth", type=int, default=200, help="C-scan 取的采样深度")
p.add_argument("--endian", choices=["little", "big"], default="little")
p.set_defaults(func=cmd_probe)
args = ap.parse_args()
args.func(args)
if __name__ == "__main__":
main()