From 5ffc7847927edaaf2f7bf48246a678fc81b92f76 Mon Sep 17 00:00:00 2001 From: gaozheng Date: Tue, 23 Jun 2026 21:50:59 +0800 Subject: [PATCH] =?UTF-8?q?feat(store):=20ChunkedVolumeStore=20=E5=A2=9E?= =?UTF-8?q?=E9=87=8F=E5=86=99=20StreamingVolumeWriter?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 逐块增量写 level0 store,建体不必持有整卷。产出与非流式 write 逐 brick + meta 完全一致:data.bin 逐块 qCompress 追加(块按 finalize 时固定顺序 bz 最慢/bx 最快 索引),meta.json 结构同 write,故 ChunkedVolumeStore/readBrick 可照常读;偏移 64 位。 DRY 复用 write 的压缩/索引/meta 序列化逻辑(compressBrick/brickIndexJson/ writeMetaGeometry)。约定:每块只写一次(重复抛),缺块 finalize 抛,体素数不符抛。 核心测试 test_streaming_write.cpp:流式(乱序写)vs 非流式 write 逐块对拍 + meta 一致;含重复/缺块/尺寸不符三类错误用例。不破坏 write/readBrick/buildPyramid 现有行为。 --- src/data/store/ChunkedVolumeStore.cpp | 155 +++++++++++++++++++--- src/data/store/ChunkedVolumeStore.hpp | 37 ++++++ tests/CMakeLists.txt | 2 + tests/data/store/test_streaming_write.cpp | 152 +++++++++++++++++++++ 4 files changed, 328 insertions(+), 18 deletions(-) create mode 100644 tests/data/store/test_streaming_write.cpp diff --git a/src/data/store/ChunkedVolumeStore.cpp b/src/data/store/ChunkedVolumeStore.cpp index a47edb8..56fd191 100644 --- a/src/data/store/ChunkedVolumeStore.cpp +++ b/src/data/store/ChunkedVolumeStore.cpp @@ -54,6 +54,40 @@ int extent(int n, int b, int brick) { return got < brick ? got : brick; } +// 压缩一块 int16 体素 → qCompress 流(write 与 StreamingVolumeWriter 共用,DRY)。 +QByteArray compressBrick(const std::vector& blk) { + const int rawBytes = static_cast(blk.size() * sizeof(std::int16_t)); + return qCompress(reinterpret_cast(blk.data()), rawBytes); +} + +// 把几何/量化元信息写入 meta json(不含 bricks/levels 数组)——write 与 +// StreamingVolumeWriter::finalize 共用,确保 meta 结构逐字段一致(DRY)。 +void writeMetaGeometry(json& meta, int nx, int ny, int nz, int brick, + const std::array& origin, + const std::array& spacing, + const geopro::core::Quant& quant, double vminPhys, + double vmaxPhys) { + meta["nx"] = nx; + meta["ny"] = ny; + meta["nz"] = nz; + meta["brick"] = brick; + meta["origin"] = {origin[0], origin[1], origin[2]}; + meta["spacing"] = {spacing[0], spacing[1], spacing[2]}; + meta["quant"] = {{"scale", quant.scale}, {"offset", quant.offset}}; + meta["vminPhys"] = vminPhys; + meta["vmaxPhys"] = vmaxPhys; +} + +// 单块索引 json(write 与 StreamingVolumeWriter::finalize 共用,DRY)。 +json brickIndexJson(std::int64_t offset, std::int64_t clen, int bw, int bh, + int bd) { + return json{{"offset", offset}, + {"compressedLen", clen}, + {"bw", bw}, + {"bh", bh}, + {"bd", bd}}; +} + // 从体中拷出一块的 int16(块内 i 最快、k 最慢,与体一致)。 std::vector sliceBrick(const geopro::core::ScalarVolumeI16& vol, int bx, int by, int bz, int brick, @@ -101,20 +135,13 @@ void ChunkedVolumeStore::write(const std::string& dir, const int bd = extent(nz, bz, brick); auto raw = sliceBrick(vol, bx, by, bz, brick, bw, bh, bd); - const int rawBytes = - static_cast(raw.size() * sizeof(std::int16_t)); - const QByteArray compressed = qCompress( - reinterpret_cast(raw.data()), rawBytes); + const QByteArray compressed = compressBrick(raw); const std::int64_t clen = compressed.size(); data.write(compressed.constData(), clen); if (!data) throw std::runtime_error("ChunkedVolumeStore: data.bin write failed"); - bricks.push_back(json{{"offset", offset}, - {"compressedLen", clen}, - {"bw", bw}, - {"bh", bh}, - {"bd", bd}}); + bricks.push_back(brickIndexJson(offset, clen, bw, bh, bd)); offset += clen; } } @@ -122,15 +149,8 @@ void ChunkedVolumeStore::write(const std::string& dir, data.close(); json meta; - meta["nx"] = nx; - meta["ny"] = ny; - meta["nz"] = nz; - meta["brick"] = brick; - meta["origin"] = {b.origin[0], b.origin[1], b.origin[2]}; - meta["spacing"] = {b.spacing[0], b.spacing[1], b.spacing[2]}; - meta["quant"] = {{"scale", b.quant.scale}, {"offset", b.quant.offset}}; - meta["vminPhys"] = b.vminPhys; - meta["vmaxPhys"] = b.vmaxPhys; + writeMetaGeometry(meta, nx, ny, nz, brick, b.origin, b.spacing, b.quant, + b.vminPhys, b.vmaxPhys); meta["bricks"] = std::move(bricks); std::ofstream metaOut((fs::path(dir) / kMetaFile).string(), @@ -545,4 +565,103 @@ void ChunkedVolumeStore::buildPyramid(int levels) { if (!metaOut) throw std::runtime_error("ChunkedVolumeStore: meta.json write failed"); } +// ----------------------- StreamingVolumeWriter ----------------------- + +StreamingVolumeWriter::StreamingVolumeWriter(const std::string& dir, + const StoreMeta& meta) + : dir_(dir), meta_(meta) { + if (meta_.brick <= 0) + throw std::invalid_argument("StreamingVolumeWriter: brick must be > 0"); + + bricksX_ = ceilDiv(meta_.nx, meta_.brick); + bricksY_ = ceilDiv(meta_.ny, meta_.brick); + bricksZ_ = ceilDiv(meta_.nz, meta_.brick); + + // 预分配固定顺序索引(bz 最慢、bx 最快),与 write 的遍历顺序一致 → 同布局。 + entries_.assign(static_cast(bricksX_) * bricksY_ * bricksZ_, + Entry{}); + + fs::create_directories(fs::path(dir_)); + // 截断式打开一次以建立空 data.bin(writeBrick 用追加模式逐块写)。 + std::ofstream init((fs::path(dir_) / kDataFile).string(), + std::ios::binary | std::ios::trunc); + if (!init) + throw std::runtime_error( + "StreamingVolumeWriter: cannot open data.bin for write"); +} + +void StreamingVolumeWriter::writeBrick(int bx, int by, int bz, + const std::vector& voxels) { + if (finalized_) + throw std::runtime_error( + "StreamingVolumeWriter: writeBrick after finalize"); + if (bx < 0 || by < 0 || bz < 0 || bx >= bricksX_ || by >= bricksY_ || + bz >= bricksZ_) + throw std::runtime_error("StreamingVolumeWriter: brick index out of range"); + + const std::size_t idx = + (static_cast(bz) * bricksY_ + by) * bricksX_ + bx; + Entry& e = entries_[idx]; + if (e.written) + throw std::runtime_error("StreamingVolumeWriter: brick written twice"); + + const int bw = extent(meta_.nx, bx, meta_.brick); + const int bh = extent(meta_.ny, by, meta_.brick); + const int bd = extent(meta_.nz, bz, meta_.brick); + const std::size_t expect = static_cast(bw) * bh * bd; + if (voxels.size() != expect) + throw std::runtime_error("StreamingVolumeWriter: brick voxel count mismatch"); + + const QByteArray compressed = compressBrick(voxels); + const std::int64_t clen = compressed.size(); + + // 追加写 data.bin(块按 writeBrick 调用顺序物理排布,索引记录各自偏移; + // finalize 再按固定顺序写 meta → readBrick 凭索引偏移定位,物理顺序无关)。 + std::ofstream data((fs::path(dir_) / kDataFile).string(), + std::ios::binary | std::ios::app); + if (!data) + throw std::runtime_error( + "StreamingVolumeWriter: cannot open data.bin for append"); + data.write(compressed.constData(), clen); + if (!data) + throw std::runtime_error("StreamingVolumeWriter: data.bin write failed"); + + e.offset = offset_; + e.compressedLen = clen; + e.bw = bw; + e.bh = bh; + e.bd = bd; + e.written = true; + offset_ += clen; + ++written_; +} + +void StreamingVolumeWriter::finalize() { + if (finalized_) + throw std::runtime_error("StreamingVolumeWriter: already finalized"); + if (written_ != static_cast(entries_.size())) + throw std::runtime_error("StreamingVolumeWriter: missing bricks at finalize"); + + // 按固定顺序(bz 最慢、bx 最快)输出索引,结构与 write 的 bricks 数组一致。 + json bricks = json::array(); + for (const Entry& e : entries_) + bricks.push_back(brickIndexJson(e.offset, e.compressedLen, e.bw, e.bh, e.bd)); + + json meta; + writeMetaGeometry(meta, meta_.nx, meta_.ny, meta_.nz, meta_.brick, + meta_.origin, meta_.spacing, meta_.quant, meta_.vminPhys, + meta_.vmaxPhys); + meta["bricks"] = std::move(bricks); + + std::ofstream metaOut((fs::path(dir_) / kMetaFile).string(), + std::ios::trunc); + if (!metaOut) + throw std::runtime_error( + "StreamingVolumeWriter: cannot open meta.json for write"); + metaOut << meta.dump(2); + if (!metaOut) + throw std::runtime_error("StreamingVolumeWriter: meta.json write failed"); + finalized_ = true; +} + } // namespace geopro::data diff --git a/src/data/store/ChunkedVolumeStore.hpp b/src/data/store/ChunkedVolumeStore.hpp index 78c0beb..f155794 100644 --- a/src/data/store/ChunkedVolumeStore.hpp +++ b/src/data/store/ChunkedVolumeStore.hpp @@ -115,6 +115,43 @@ class ChunkedVolumeStore { int levelCount_ = 1; // mutable:brickRange 为 const,但惰性算出值域后需就地缓存(置 hasRange=true)。 mutable std::vector levels_; // levels_[0] 即 level 0(与 bricks_ 同源) + + friend class StreamingVolumeWriter; +}; + +// 逐块增量写 level0 store(不持整卷)。块写入顺序任意,但每块只写一次。 +// 产出与 ChunkedVolumeStore::write(整卷) 逐 brick + meta 完全一致:data.bin 为 +// 逐块 qCompress 流(按 bz 最慢、bx 最快的固定顺序排布),meta.json 结构同 write, +// 故 ChunkedVolumeStore(dir)/readBrick 能照常读。偏移/长度全程 64 位。 +class StreamingVolumeWriter { + public: + // 用 StoreMeta 定 dims/brick/origin/spacing/quant/vminmax(与 write 一致的元信息)。 + StreamingVolumeWriter(const std::string& dir, const StoreMeta& meta); + + // 写一块:voxels 为该块体素(大小=bw*bh*bd,块内 i 最快,与 write 同布局)。 + // bx/by/bz 为块索引;体素数不符或同块重复写 → 抛 std::runtime_error。 + void writeBrick(int bx, int by, int bz, + const std::vector& voxels); + + // 收尾:写 meta.json(含所有已写块的索引)。有缺块 → 抛 std::runtime_error。 + void finalize(); + + private: + // 单块在 data.bin 中的索引(与 ChunkedVolumeStore::BrickEntry 子集对应)。 + struct Entry { + std::int64_t offset = 0; + std::int64_t compressedLen = 0; + int bw = 0, bh = 0, bd = 0; + bool written = false; + }; + + std::string dir_; + StoreMeta meta_; + int bricksX_ = 0, bricksY_ = 0, bricksZ_ = 0; + std::vector entries_; // 固定顺序索引(bz 最慢、bx 最快) + std::int64_t offset_ = 0; // data.bin 当前追加偏移(64 位) + std::int64_t written_ = 0; // 已写块计数 + bool finalized_ = false; }; } // namespace geopro::data diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index be75041..86f1d82 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -58,6 +58,8 @@ target_link_libraries(geopro_tests PRIVATE geopro_data) target_sources(geopro_tests PRIVATE data/store/test_chunked_volume_store.cpp) # store 层:金字塔(多分辨率 LOD + 每块 min/max;不破坏 level0 与老 store 兼容)。 target_sources(geopro_tests PRIVATE data/store/test_pyramid.cpp) +# store 层:StreamingVolumeWriter(逐块增量写 level0;与非流式 write 逐块+meta 对拍一致)。 +target_sources(geopro_tests PRIVATE data/store/test_streaming_write.cpp) target_link_libraries(geopro_tests PRIVATE geopro_store) # net 层:RSA 加密器。测试需直接用 OpenSSL 生成/解密密钥,故显式 find_package diff --git a/tests/data/store/test_streaming_write.cpp b/tests/data/store/test_streaming_write.cpp new file mode 100644 index 0000000..c17f549 --- /dev/null +++ b/tests/data/store/test_streaming_write.cpp @@ -0,0 +1,152 @@ +#include "data/store/ChunkedVolumeStore.hpp" +#include "core/algo/GprVolumeBuilder.hpp" +#include +#include +#include +#include +using namespace geopro::data; + +namespace { +// 非常量体:体素随 (i,j,k) 变化,确保逐块对拍真正区分块内容(非全相同)。 +geopro::core::BuiltI16 makeBuilt(int nx, int ny, int nz) { + geopro::core::BuiltI16 b; + b.vol = geopro::core::ScalarVolumeI16(nx, ny, nz); + for (int k = 0; k < nz; ++k) + for (int j = 0; j < ny; ++j) + for (int i = 0; i < nx; ++i) + b.vol.at(i, j, k) = + static_cast((i * 131 + j * 17 + k * 7) % 251); + b.quant = {0.5, -3.0}; + b.origin = {{10.0, 20.0, 30.0}}; + b.spacing = {{2.0, 3.0, 4.0}}; + b.vminPhys = -3.0; + b.vmaxPhys = 122.0; + return b; +} + +int ceilDiv(int n, int brick) { return (n + brick - 1) / brick; } +int extent(int n, int b, int brick) { + const int got = n - b * brick; + return got < brick ? got : brick; +} + +// 从整卷切出 (bx,by,bz) 块体素(块内 i 最快、k 最慢,与 write 同布局)。 +std::vector sliceBrickFrom(const geopro::core::BuiltI16& b, int bx, + int by, int bz, int brick) { + const auto& vol = b.vol; + const int bw = extent(vol.nx(), bx, brick); + const int bh = extent(vol.ny(), by, brick); + const int bd = extent(vol.nz(), bz, brick); + std::vector out(static_cast(bw) * bh * bd); + const int i0 = bx * brick, j0 = by * brick, k0 = bz * brick; + std::size_t w = 0; + for (int kk = 0; kk < bd; ++kk) + for (int jj = 0; jj < bh; ++jj) + for (int ii = 0; ii < bw; ++ii) + out[w++] = vol.at(i0 + ii, j0 + jj, k0 + kk); + return out; +} + +std::string tmp(const char* name) { + return (std::filesystem::temp_directory_path() / name).string(); +} +} // namespace + +// 核心验收:流式逐块写 vs 非流式整卷 write,逐块 readBrick + meta 完全一致。 +TEST(StreamingVolumeWriter, MatchesNonStreamingWrite) { + auto b = makeBuilt(100, 40, 30); // 100/40/30 均非整除 64 → 含边缘块 + const int brick = 64; + auto dirA = tmp("swA"), dirB = tmp("swB"); + std::filesystem::remove_all(dirA); + std::filesystem::remove_all(dirB); + + ChunkedVolumeStore::write(dirA, b, brick); // 金标准 + StoreMeta m = ChunkedVolumeStore::readMeta(dirA); // 复用同一 meta 元信息 + + const int bX = ceilDiv(m.nx, brick); + const int bY = ceilDiv(m.ny, brick); + const int bZ = ceilDiv(m.nz, brick); + + { + StreamingVolumeWriter w(dirB, m); + // 故意打乱写入顺序(与 write 的固定遍历顺序不同),验证顺序无关。 + for (int bx = 0; bx < bX; ++bx) + for (int by = 0; by < bY; ++by) + for (int bz = 0; bz < bZ; ++bz) + w.writeBrick(bx, by, bz, sliceBrickFrom(b, bx, by, bz, brick)); + w.finalize(); + } + + ChunkedVolumeStore A(dirA), B(dirB); + for (int bz = 0; bz < bZ; ++bz) + for (int by = 0; by < bY; ++by) + for (int bx = 0; bx < bX; ++bx) + EXPECT_EQ(A.readBrick(bx, by, bz), B.readBrick(bx, by, bz)) + << "brick mismatch at " << bx << "," << by << "," << bz; + + EXPECT_EQ(B.meta().nx, A.meta().nx); + EXPECT_EQ(B.meta().ny, A.meta().ny); + EXPECT_EQ(B.meta().nz, A.meta().nz); + EXPECT_EQ(B.meta().brick, A.meta().brick); + EXPECT_EQ(B.meta().origin, A.meta().origin); + EXPECT_EQ(B.meta().spacing, A.meta().spacing); + EXPECT_EQ(B.meta().quant.scale, A.meta().quant.scale); + EXPECT_EQ(B.meta().quant.offset, A.meta().quant.offset); + EXPECT_EQ(B.meta().vminPhys, A.meta().vminPhys); + EXPECT_EQ(B.meta().vmaxPhys, A.meta().vmaxPhys); + + std::filesystem::remove_all(dirA); + std::filesystem::remove_all(dirB); +} + +// 同一块重复写 → 抛异常(约定:每块只写一次)。 +TEST(StreamingVolumeWriter, DuplicateBrickThrows) { + auto b = makeBuilt(70, 30, 20); + auto dirA = tmp("swDup"); + std::filesystem::remove_all(dirA); + ChunkedVolumeStore::write(dirA, b, 64); + StoreMeta m = ChunkedVolumeStore::readMeta(dirA); + + auto dirB = tmp("swDupB"); + std::filesystem::remove_all(dirB); + StreamingVolumeWriter w(dirB, m); + w.writeBrick(0, 0, 0, sliceBrickFrom(b, 0, 0, 0, 64)); + EXPECT_THROW(w.writeBrick(0, 0, 0, sliceBrickFrom(b, 0, 0, 0, 64)), + std::runtime_error); + std::filesystem::remove_all(dirA); + std::filesystem::remove_all(dirB); +} + +// 缺块 finalize → 抛异常(约定:所有块必须写齐)。 +TEST(StreamingVolumeWriter, MissingBrickFinalizeThrows) { + auto b = makeBuilt(70, 30, 20); // bX=2,bY=1,bZ=1 → 共 2 块 + auto dirA = tmp("swMiss"); + std::filesystem::remove_all(dirA); + ChunkedVolumeStore::write(dirA, b, 64); + StoreMeta m = ChunkedVolumeStore::readMeta(dirA); + + auto dirB = tmp("swMissB"); + std::filesystem::remove_all(dirB); + StreamingVolumeWriter w(dirB, m); + w.writeBrick(0, 0, 0, sliceBrickFrom(b, 0, 0, 0, 64)); // 只写 1 块,缺 (1,0,0) + EXPECT_THROW(w.finalize(), std::runtime_error); + std::filesystem::remove_all(dirA); + std::filesystem::remove_all(dirB); +} + +// 块体素大小不符(bw*bh*bd 不匹配)→ 抛异常。 +TEST(StreamingVolumeWriter, WrongVoxelCountThrows) { + auto b = makeBuilt(70, 30, 20); + auto dirA = tmp("swSize"); + std::filesystem::remove_all(dirA); + ChunkedVolumeStore::write(dirA, b, 64); + StoreMeta m = ChunkedVolumeStore::readMeta(dirA); + + auto dirB = tmp("swSizeB"); + std::filesystem::remove_all(dirB); + StreamingVolumeWriter w(dirB, m); + std::vector bad(10); // 远小于 64*30*20 + EXPECT_THROW(w.writeBrick(0, 0, 0, bad), std::runtime_error); + std::filesystem::remove_all(dirA); + std::filesystem::remove_all(dirB); +}