diff --git a/pp/BUILD b/pp/BUILD index 7db1eac54..a7695adcf 100644 --- a/pp/BUILD +++ b/pp/BUILD @@ -198,6 +198,19 @@ cc_library( ], ) +cc_test( + name = "entrypoint_types_test", + srcs = glob(["entrypoint/types/**/*_tests.cpp"]), + deps = [ + ":bare_bones", + ":entrypoint_types", + ":metrics", + ":prometheus", + ":wal", + "@gtest//:gtest_main", + ], +) + cc_library( name = "entrypoint_bridge", srcs = glob( diff --git a/pp/entrypoint/bridge/go_constants.cpp b/pp/entrypoint/bridge/go_constants.cpp deleted file mode 100644 index d6b50d223..000000000 --- a/pp/entrypoint/bridge/go_constants.cpp +++ /dev/null @@ -1,24 +0,0 @@ -#include "entrypoint/types/go_constants.h" -#include "entrypoint/types/serialized_data.h" -#include "metrics/storage.h" -#include "prometheus/relabeler.h" -#include "wal/output_decoder.h" -#include "wal/segment_samples_storage.h" - -namespace { - -static_assert(sizeof(std::vector) == Sizeof_StdVector); -static_assert(sizeof(BareBones::Vector) == Sizeof_BareBonesVector); -static_assert(sizeof(roaring::Roaring) == Sizeof_RoaringBitset); - -static_assert(sizeof(PromPP::Prometheus::Relabel::InnerSeries) == Sizeof_InnerSeries); - -static_assert(sizeof(entrypoint::types::SerializedDataIterator) == Sizeof_SerializedDataIterator); - -static_assert(sizeof(metrics::Storage::Iterator) == Sizeof_MetricsIterator); - -static_assert(sizeof(PromPP::WAL::SegmentSamplesStorage) == Sizeof_SegmentSamplesStorage); -static_assert(sizeof(PromPP::WAL::ProtobufEncoder) == Sizeof_RemoteWriteMessageEncoder); -static_assert(sizeof(PromPP::WAL::SegmentSamplesStorageList::Iterator) == Sizeof_SegmentSamplesStorageListIterator); - -} // namespace \ No newline at end of file diff --git a/pp/entrypoint/bridge/series_data_data_storage.cpp b/pp/entrypoint/bridge/series_data_data_storage.cpp index b550c3a4b..55fe3eae1 100644 --- a/pp/entrypoint/bridge/series_data_data_storage.cpp +++ b/pp/entrypoint/bridge/series_data_data_storage.cpp @@ -16,7 +16,6 @@ #include "series_data/querier/querier.h" #include "series_data/unloading/loader.h" #include "series_data/unloading/unloader.h" -#include "series_index/querier/selector_querier.h" using entrypoint::types::DataStoragePtr; using entrypoint::types::QueryableEncodingBimap; diff --git a/pp/entrypoint/types/go_constants_tests.cpp b/pp/entrypoint/types/go_constants_tests.cpp new file mode 100644 index 000000000..73d81dc88 --- /dev/null +++ b/pp/entrypoint/types/go_constants_tests.cpp @@ -0,0 +1,33 @@ +#include + +#include + +#include "bare_bones/vector.h" +#include "entrypoint/types/go_constants.h" +#include "entrypoint/types/serialized_data.h" +#include "metrics/storage.h" +#include "prometheus/relabeler.h" +#include "wal/output_decoder.h" +#include "wal/segment_samples_storage.h" + +namespace { + +TEST(GoConstantsTest, CompileTimeSizesMatchConstants) { + static_assert(sizeof(std::vector) == Sizeof_StdVector); + static_assert(sizeof(BareBones::Vector) == Sizeof_BareBonesVector); + static_assert(sizeof(roaring::Roaring) == Sizeof_RoaringBitset); + + static_assert(sizeof(PromPP::Prometheus::Relabel::InnerSeries) == Sizeof_InnerSeries); + + static_assert(sizeof(entrypoint::types::SerializedDataIterator) == Sizeof_SerializedDataIterator); + + static_assert(sizeof(metrics::Storage::Iterator) == Sizeof_MetricsIterator); + + static_assert(sizeof(PromPP::WAL::SegmentSamplesStorage) == Sizeof_SegmentSamplesStorage); + static_assert(sizeof(PromPP::WAL::ProtobufEncoder) == Sizeof_RemoteWriteMessageEncoder); + static_assert(sizeof(PromPP::WAL::SegmentSamplesStorageList::Iterator) == Sizeof_SegmentSamplesStorageListIterator); + + SUCCEED(); +} + +} // namespace diff --git a/pp/entrypoint/types/loader_tests.cpp b/pp/entrypoint/types/loader_tests.cpp new file mode 100644 index 000000000..32c4cce5c --- /dev/null +++ b/pp/entrypoint/types/loader_tests.cpp @@ -0,0 +1,164 @@ +#include + +#include "bare_bones/streams.h" +#include "entrypoint/types/loader.h" +#include "primitives/label_set.h" +#include "series_data/decoder.h" +#include "series_data/encoder.h" +#include "series_data/encoder/sample.h" +#include "series_data/unloading/unloader.h" + +namespace { + +using entrypoint::types::QueryableEncodingBimap; +using PromPP::Primitives::LabelViewSet; +using series_data::DataStorage; +using series_data::Decoder; +using series_data::Encoder; +using series_data::chunk::DataChunk; +using series_data::encoder::SampleList; +using series_data::unloading::Unloader; + +class RevertableLoaderFixture : public testing::Test { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + Unloader unloader_{storage_}; + BareBones::ShrinkedToFitOStringStream stream_; + QueryableEncodingBimap lss_; + + void SetUp() override { + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + encoder_.encode(0, 4, 4.0); + encoder_.encode(0, 5, 5.0); + + lss_.find_or_emplace(LabelViewSet{{"job", "a"}}); + lss_.build_deferred_indexes(); + } + + void encode_more(uint32_t ls_id, const LabelViewSet& label_set, const SampleList& samples) { + for (const auto& sample : samples) { + encoder_.encode(ls_id, sample.timestamp, sample.value); + } + + lss_.find_or_emplace(label_set); + lss_.build_deferred_indexes(); + } + + [[nodiscard]] auto open_chunk_stream(uint32_t ls_id) const { + return storage_.get_asc_integer_stream(storage_.open_chunks[ls_id].encoder.external_index); + } + + [[nodiscard]] SampleList decode_open_chunk(uint32_t ls_id) const { + return Decoder::decode_chunk(storage_, storage_.open_chunks[ls_id]); + } +}; + +TEST_F(RevertableLoaderFixture, LoadFinalizeRestoresUnloadedOpenChunk) { + // Arrange + unloader_.create_snapshot(stream_); + unloader_.unload(); + + // Act + entrypoint::types::RevertableLoader loader{storage_, lss_.ls_id_set().begin(), lss_.ls_id_set().end(), 1}; + loader.load_next(stream_.span()); + loader.load_finalize(); + + // Assert + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}}), decode_open_chunk(0)); + EXPECT_FALSE(storage_.unloaded_series_bitmap.is_set(0)); +} + +TEST_F(RevertableLoaderFixture, RevertRestoresUnloadedOpenChunk) { + // Arrange + unloader_.create_snapshot(stream_); + unloader_.unload(); + const auto trimmed_stream = open_chunk_stream(0); + + entrypoint::types::RevertableLoader loader{storage_, lss_.ls_id_set().begin(), lss_.ls_id_set().end(), 1}; + loader.load_next(stream_.span()); + loader.load_finalize(); + + // Act + loader.revert(); + const auto restored_stream = open_chunk_stream(0); + + // Assert + EXPECT_EQ(trimmed_stream, restored_stream); + EXPECT_TRUE(storage_.unloaded_series_bitmap.is_set(0)); +} + +TEST_F(RevertableLoaderFixture, LoadFinalizeLoadsSeriesByBatch) { + // Arrange + encode_more(1, LabelViewSet{{"job", "b"}}, SampleList{{1, 11.0}, {2, 12.0}, {3, 13.0}, {4, 14.0}, {5, 15.0}}); + encode_more(2, LabelViewSet{{"job", "c"}}, SampleList{{1, 21.0}, {2, 22.0}, {3, 23.0}, {4, 24.0}, {5, 25.0}}); + + unloader_.create_snapshot(stream_); + unloader_.unload(); + + entrypoint::types::RevertableLoader loader{storage_, lss_.ls_id_set().begin(), lss_.ls_id_set().end(), 2}; + + // Act + loader.load_next(stream_.span()); + loader.load_finalize(); + + const auto has_second_batch = loader.next_batch(); + loader.load_next(stream_.span()); + loader.load_finalize(); + + const auto has_third_batch = loader.next_batch(); + + // Assert + EXPECT_TRUE(has_second_batch); + EXPECT_FALSE(has_third_batch); + + EXPECT_FALSE(storage_.unloaded_series_bitmap.is_set(0)); + EXPECT_FALSE(storage_.unloaded_series_bitmap.is_set(1)); + EXPECT_FALSE(storage_.unloaded_series_bitmap.is_set(2)); + + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}}), decode_open_chunk(0)); + EXPECT_EQ((SampleList{{1, 11.0}, {2, 12.0}, {3, 13.0}, {4, 14.0}, {5, 15.0}}), decode_open_chunk(1)); + EXPECT_EQ((SampleList{{1, 21.0}, {2, 22.0}, {3, 23.0}, {4, 24.0}, {5, 25.0}}), decode_open_chunk(2)); +} + +TEST_F(RevertableLoaderFixture, RevertRestoresSeriesLoadedAcrossBatches) { + // Arrange + encode_more(1, LabelViewSet{{"job", "b"}}, SampleList{{1, 11.0}, {2, 12.0}, {3, 13.0}, {4, 14.0}, {5, 15.0}}); + encode_more(2, LabelViewSet{{"job", "c"}}, SampleList{{1, 21.0}, {2, 22.0}, {3, 23.0}, {4, 24.0}, {5, 25.0}}); + + unloader_.create_snapshot(stream_); + unloader_.unload(); + const auto trimmed_stream0 = open_chunk_stream(0); + const auto trimmed_stream1 = open_chunk_stream(1); + const auto trimmed_stream2 = open_chunk_stream(2); + + entrypoint::types::RevertableLoader loader{storage_, lss_.ls_id_set().begin(), lss_.ls_id_set().end(), 2}; + + loader.load_next(stream_.span()); + loader.load_finalize(); + + const auto has_second_batch = loader.next_batch(); + loader.load_next(stream_.span()); + loader.load_finalize(); + + // Act + loader.revert(); + const auto restored_stream0 = open_chunk_stream(0); + const auto restored_stream1 = open_chunk_stream(1); + const auto restored_stream2 = open_chunk_stream(2); + + // Assert + ASSERT_TRUE(has_second_batch); + + EXPECT_EQ(trimmed_stream0, restored_stream0); + EXPECT_EQ(trimmed_stream1, restored_stream1); + EXPECT_EQ(trimmed_stream2, restored_stream2); + + EXPECT_TRUE(storage_.unloaded_series_bitmap.is_set(0)); + EXPECT_TRUE(storage_.unloaded_series_bitmap.is_set(1)); + EXPECT_TRUE(storage_.unloaded_series_bitmap.is_set(2)); +} + +} // namespace diff --git a/pp/entrypoint/types/lss_tests.cpp b/pp/entrypoint/types/lss_tests.cpp new file mode 100644 index 000000000..fde5bd689 --- /dev/null +++ b/pp/entrypoint/types/lss_tests.cpp @@ -0,0 +1,221 @@ +#include + +#include +#include +#include + +#include "bare_bones/exception.h" +#include "bare_bones/vector.h" +#include "entrypoint/types/lss.h" +#include "primitives/label_set.h" +#include "series_index/queryable_encoding_bimap.h" + +namespace { + +using entrypoint::types::create_lss; +using entrypoint::types::create_snapshot_lss; +using entrypoint::types::EncodingBimap; +using entrypoint::types::LssType; +using entrypoint::types::QueryableEncodingBimap; +using entrypoint::types::ReallocationsDetector; +using entrypoint::types::ShrinkAwareSnapshotLSS; +using entrypoint::types::SnapshotLSS; +using PromPP::Primitives::LabelViewSet; + +TEST(LssTest, CreateLssEncodingBimapSelectsExpectedAlternative) { + // Arrange + + // Act + const auto lss = create_lss(LssType::kEncodingBimap); + + // Assert + EXPECT_TRUE(std::holds_alternative(*lss)); +} + +TEST(LssTest, CreateLssQueryableEncodingBimapSelectsExpectedAlternative) { + // Arrange + + // Act + const auto lss = create_lss(LssType::kQueryableEncodingBimap); + + // Assert + EXPECT_TRUE(std::holds_alternative(*lss)); +} + +TEST(LssTest, CreateLssRejectsUnknownType) { + // Arrange + const auto unknown_type = static_cast(-1); + + // Act + + // Assert + EXPECT_THROW((void)create_lss(unknown_type), BareBones::Exception); +} + +TEST(LssTest, CreateSnapshotFromEncodingBimapProducesPlainSnapshot) { + // Arrange + auto lss = create_lss(LssType::kEncodingBimap); + std::get(*lss).find_or_emplace(LabelViewSet{{"job", "a"}}); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + EXPECT_TRUE(std::holds_alternative(*snapshot)); +} + +template +using QueryableEncodingBimapCopier = series_index::QueryableEncodingBimapCopier; + +class SnapshotLssFixture : public testing::Test { + protected: + static constexpr uint32_t kShrinkBoundary = 3U; + + const LabelViewSet ls0_{{"job", "a"}}; + const LabelViewSet ls1_{{"job", "b"}}; + const LabelViewSet ls2_{{"job", "c"}}; + const LabelViewSet ls3_{{"job", "d"}}; + const LabelViewSet ls4_{{"job", "e"}}; + + entrypoint::types::LssVariantPtr create_queryable_lss() const { + auto lss = create_lss(LssType::kQueryableEncodingBimap); + auto& bimap = std::get(*lss); + + bimap.find_or_emplace(ls0_); + bimap.find_or_emplace(ls1_); + bimap.find_or_emplace(ls2_); + bimap.find_or_emplace(ls3_); + bimap.find_or_emplace(ls4_); + + bimap.build_deferred_indexes(); + + return lss; + } + + entrypoint::types::LssVariantPtr create_fixed_lss() const { + auto lss = create_queryable_lss(); + std::get(*lss).set_pending_shrink_boundary(kShrinkBoundary); + + return lss; + } + + entrypoint::types::LssVariantPtr create_shrunk_lss() const { + QueryableEncodingBimap seeded_lss; + seeded_lss.find_or_emplace(ls0_); + seeded_lss.find_or_emplace(ls1_); + seeded_lss.find_or_emplace(ls2_); + seeded_lss.find_or_emplace(ls3_); + seeded_lss.find_or_emplace(ls4_); + + seeded_lss.build_deferred_indexes(); + + auto lss = create_lss(LssType::kQueryableEncodingBimap); + auto& bimap = std::get(*lss); + BareBones::Vector dst_src_ids_mapping; + QueryableEncodingBimapCopier copier(seeded_lss, seeded_lss.sorting_index(), seeded_lss.added_series(), bimap, dst_src_ids_mapping); + copier.copy_added_series_and_build_indexes(); + + std::ignore = bimap.find_or_emplace(ls1_); + std::ignore = bimap.find_or_emplace(ls3_); + std::ignore = bimap.find_or_emplace(ls4_); + bimap.build_deferred_indexes(); + + dst_src_ids_mapping.clear(); + QueryableEncodingBimap lss_copy; + QueryableEncodingBimapCopier shrink_copier(bimap, bimap.sorting_index(), bimap.added_series(), lss_copy, dst_src_ids_mapping); + shrink_copier.copy_added_series_and_build_indexes(); + bimap.set_pending_shrink_boundary(kShrinkBoundary); + const SnapshotLSS resolve_snapshot(lss_copy); + bimap.finalize_copy_and_shrink(resolve_snapshot, dst_src_ids_mapping); + return lss; + } +}; + +TEST_F(SnapshotLssFixture, ResolvesNormalQueryableLss) { + // Arrange + auto lss = create_queryable_lss(); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + ASSERT_TRUE(std::holds_alternative(*snapshot)); + EXPECT_EQ(ls0_, std::get(*snapshot)[0]); + EXPECT_EQ(ls4_, std::get(*snapshot)[4]); +} + +TEST_F(SnapshotLssFixture, FromFixedQueryableLssIsShrinkAware) { + // Arrange + auto lss = create_fixed_lss(); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + EXPECT_TRUE(std::holds_alternative(*snapshot)); +} + +TEST_F(SnapshotLssFixture, ShrinkAwareResolvesSurvivingPreBoundarySeries) { + // Arrange + auto lss = create_shrunk_lss(); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + ASSERT_TRUE(std::holds_alternative(*snapshot)); + EXPECT_EQ(ls1_, std::get(*snapshot)[1]); +} + +TEST_F(SnapshotLssFixture, ShrinkAwareHidesDroppedPreBoundarySeries) { + // Arrange + auto lss = create_shrunk_lss(); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + ASSERT_TRUE(std::holds_alternative(*snapshot)); + EXPECT_EQ(0U, std::get(*snapshot)[0].size()); + EXPECT_EQ(0U, std::get(*snapshot)[2].size()); +} + +TEST_F(SnapshotLssFixture, ShrinkAwareResolvesPostBoundarySeries) { + // Arrange + auto lss = create_shrunk_lss(); + + // Act + const auto snapshot = create_snapshot_lss(*lss); + + // Assert + ASSERT_TRUE(std::holds_alternative(*snapshot)); + EXPECT_EQ(ls3_, std::get(*snapshot)[3]); + EXPECT_EQ(ls4_, std::get(*snapshot)[4]); +} + +TEST(ReallocationsDetectorTest, ReportsReallocOnEmplace) { + // Arrange + QueryableEncodingBimap lss; + ReallocationsDetector detector(lss); + + // Act + lss.find_or_emplace(LabelViewSet{{"job", "a"}}); + + // Assert + EXPECT_TRUE(detector.has_reallocations()); +} + +TEST(ReallocationsDetectorTest, StaysQuietWithoutChanges) { + // Arrange + QueryableEncodingBimap lss; + lss.find_or_emplace(LabelViewSet{{"job", "a"}}); + lss.build_deferred_indexes(); + + // Act + ReallocationsDetector detector(lss); + + // Assert + EXPECT_FALSE(detector.has_reallocations()); +} + +} // namespace diff --git a/pp/entrypoint/types/querier_tests.cpp b/pp/entrypoint/types/querier_tests.cpp new file mode 100644 index 000000000..8ee230f34 --- /dev/null +++ b/pp/entrypoint/types/querier_tests.cpp @@ -0,0 +1,314 @@ +#include + +#include +#include +#include +#include +#include +#include + +#include "bare_bones/streams.h" +#include "entrypoint/types/querier.h" +#include "series_data/decoder.h" +#include "series_data/encoder.h" +#include "series_data/unloading/loader.h" +#include "series_data/unloading/unloader.h" + +namespace { + +using BareBones::Encoding::Gorilla::STALE_NAN; +using PromPP::Primitives::LabelSetID; +using PromPP::Primitives::Go::Slice; +using series_data::DataStorage; +using series_data::Decoder; +using series_data::Encoder; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::encoder::Sample; +using series_data::encoder::SampleList; +using series_data::unloading::Loader; +using series_data::unloading::Unloader; +using InstantQuerierWrapper = entrypoint::types::InstantQuerierWithArgumentsWrapper, std::span>; +using RangeQuery = series_data::querier::Query>; + +template +class UninitializedMemory { + public: + UninitializedMemory() { std::ranges::fill(storage_, kDefaultValue); } + + ~UninitializedMemory() { + if (!has_default_value()) { + std::destroy_at(ptr()); + } + } + + [[nodiscard]] T* ptr() noexcept { return reinterpret_cast(storage_); } + [[nodiscard]] const T* ptr() const noexcept { return reinterpret_cast(storage_); } + [[nodiscard]] T& value() noexcept { return *ptr(); } + [[nodiscard]] const T& value() const noexcept { return *ptr(); } + [[nodiscard]] bool has_default_value() const noexcept { + return std::ranges::all_of(storage_, [](std::byte byte) { return byte == kDefaultValue; }); + } + + private: + static constexpr auto kDefaultValue = std::byte{0x5a}; + + alignas(T) std::byte storage_[sizeof(T)]; +}; + +class RangeQuerierUninitializedMemoryFixture : public testing::Test { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + BareBones::ShrinkedToFitOStringStream unloaded_chunks_; + UninitializedMemory serialized_data_memory_; + + RangeQuery query_for(LabelSetID label_set_id, int64_t min, int64_t max) { + Slice label_set_ids; + label_set_ids.push_back(label_set_id); + return RangeQuery{.time_interval{.min = min, .max = max}, .label_set_ids = std::move(label_set_ids)}; + } + + [[nodiscard]] entrypoint::types::SerializedDataPtr* serialized_data_ptr() noexcept { return serialized_data_memory_.ptr(); } + + void unload_open_chunks() { + Unloader unloader{storage_}; + unloader.create_snapshot(unloaded_chunks_); + unloader.unload(); + } + + void load_unloaded_chunks(LabelSetID label_set_id) { + std::vector label_set_ids{label_set_id}; + Loader loader{storage_, label_set_ids, static_cast(label_set_ids.size())}; + loader.load_next(unloaded_chunks_.span()); + loader.load_finalize(); + } +}; + +TEST_F(RangeQuerierUninitializedMemoryFixture, QueryWritesSerializedDataToPreparedMemory) { + // Arrange + encoder_.encode(0, 1, 1.0); + auto query = query_for(0, 1, 1); + const auto was_default_before_prepare = serialized_data_memory_.has_default_value(); + entrypoint::types::RangeQuerierWithArgumentsWrapperV2 wrapper{storage_, query, serialized_data_ptr()}; + + // Act + wrapper.query(); + + // Assert + EXPECT_TRUE(was_default_before_prepare); + EXPECT_FALSE(serialized_data_memory_.has_default_value()); + ASSERT_NE(nullptr, serialized_data_memory_.value().get()); +} + +TEST_F(RangeQuerierUninitializedMemoryFixture, QueryFinalizeWritesSerializedDataToPreparedMemory) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + + unload_open_chunks(); + + auto query = query_for(0, 1, 3); + const auto was_default_before_prepare = serialized_data_memory_.has_default_value(); + entrypoint::types::RangeQuerierWithArgumentsWrapperV2 wrapper{storage_, query, serialized_data_ptr()}; + + // Act + wrapper.query(); + const auto need_loading = wrapper.need_loading(); + load_unloaded_chunks(0); + wrapper.query_finalize(); + + // Assert + ASSERT_TRUE(need_loading); + EXPECT_TRUE(was_default_before_prepare); + EXPECT_FALSE(serialized_data_memory_.has_default_value()); + ASSERT_NE(nullptr, serialized_data_memory_.value().get()); +} + +class InstantQuerierWrapperFixture : public testing::Test { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + BareBones::ShrinkedToFitOStringStream unloaded_chunks_; + std::vector label_set_ids_{0}; + std::vector samples_{Sample{.timestamp = -1, .value = STALE_NAN}}; + + void encode_open_chunk() { + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + encoder_.encode(0, 4, 4.0); + encoder_.encode(0, 5, 5.0); + } + + void unload_open_chunks() { + Unloader unloader{storage_}; + unloader.create_snapshot(unloaded_chunks_); + unloader.unload(); + } + + void load_unloaded_chunks() { + Loader loader{storage_, label_set_ids_, static_cast(label_set_ids_.size())}; + loader.load_next(unloaded_chunks_.span()); + loader.load_finalize(); + } +}; + +TEST_F(InstantQuerierWrapperFixture, QueryReturnsSampleAtTimestamp) { + // Arrange + encode_open_chunk(); + std::span samples_view{samples_}; + InstantQuerierWrapper wrapper{storage_, label_set_ids_, 3, samples_view}; + + // Act + wrapper.query(); + + // Assert + EXPECT_EQ((Sample{.timestamp = 3, .value = 3.0}), samples_[0]); + EXPECT_FALSE(wrapper.need_loading()); +} + +TEST_F(InstantQuerierWrapperFixture, QueryKeepsDefaultSampleWhenSeriesHasNoPointBeforeTimestamp) { + // Arrange + encoder_.encode(0, 10, 10.0); + std::span samples_view{samples_}; + InstantQuerierWrapper wrapper{storage_, label_set_ids_, 5, samples_view}; + + // Act + wrapper.query(); + + // Assert + EXPECT_EQ((Sample{.timestamp = -1, .value = STALE_NAN}), samples_[0]); + EXPECT_FALSE(wrapper.need_loading()); +} + +TEST_F(InstantQuerierWrapperFixture, QueryRequestsLoadingForUnloadedSeriesThenFinalizeReturnsSample) { + // Arrange + encode_open_chunk(); + unload_open_chunks(); + + std::span samples_view{samples_}; + InstantQuerierWrapper wrapper{storage_, label_set_ids_, 3, samples_view}; + + // Act + wrapper.query(); + const auto need_loading = wrapper.need_loading(); + const auto series_to_load_0 = wrapper.series_to_load().is_set(0); + load_unloaded_chunks(); + wrapper.query_finalize(); + + // Assert + ASSERT_TRUE(need_loading); + EXPECT_TRUE(series_to_load_0); + EXPECT_EQ((Sample{.timestamp = 3, .value = 3.0}), samples_[0]); +} + +class RangeQuerierWrapperFixture : public testing::Test { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + BareBones::ShrinkedToFitOStringStream unloaded_chunks_; + UninitializedMemory serialized_data_memory_; + entrypoint::types::SerializedDataPtr serialized_data_; + + RangeQuery query_for(LabelSetID label_set_id, int64_t min, int64_t max) { + Slice label_set_ids; + label_set_ids.push_back(label_set_id); + return RangeQuery{.time_interval{.min = min, .max = max}, .label_set_ids = std::move(label_set_ids)}; + } + + [[nodiscard]] SampleList decode_chunk(uint32_t chunk_id) const { + SampleList decoded; + std::ranges::copy(serialized_data_->iterator(chunk_id), DecodeIteratorSentinel{}, std::back_inserter(decoded)); + return decoded; + } + + [[nodiscard]] entrypoint::types::SerializedDataPtr* serialized_data_ptr() noexcept { return serialized_data_memory_.ptr(); } + + void take_serialized_data() { serialized_data_ = std::move(serialized_data_memory_.value()); } + + void unload_open_chunks() { + Unloader unloader{storage_}; + unloader.create_snapshot(unloaded_chunks_); + unloader.unload(); + } + + void load_unloaded_chunks(LabelSetID label_set_id) { + std::vector label_set_ids{label_set_id}; + Loader loader{storage_, label_set_ids, static_cast(label_set_ids.size())}; + loader.load_next(unloaded_chunks_.span()); + loader.load_finalize(); + } +}; + +TEST_F(RangeQuerierWrapperFixture, QuerySerializesMatchingOpenChunk) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + encoder_.encode(0, 4, 4.0); + encoder_.encode(0, 5, 5.0); + + auto query = query_for(0, 2, 4); + entrypoint::types::RangeQuerierWithArgumentsWrapperV2 wrapper{storage_, query, serialized_data_ptr()}; + + // Act + wrapper.query(); + take_serialized_data(); + const auto decoded = decode_chunk(0); + + // Assert + ASSERT_FALSE(wrapper.need_loading()); + ASSERT_NE(nullptr, serialized_data_); + ASSERT_EQ(1U, serialized_data_->get_chunks_view().size()); + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}}), decoded); +} + +TEST_F(RangeQuerierWrapperFixture, QuerySerializesEmptyResultWhenSeriesDoesNotMatchInterval) { + // Arrange + encoder_.encode(0, 10, 10.0); + auto query = query_for(0, 1, 5); + entrypoint::types::RangeQuerierWithArgumentsWrapperV2 wrapper{storage_, query, serialized_data_ptr()}; + + // Act + wrapper.query(); + take_serialized_data(); + + // Assert + ASSERT_FALSE(wrapper.need_loading()); + ASSERT_NE(nullptr, serialized_data_); + EXPECT_EQ(0U, serialized_data_->get_chunks_view().size()); +} + +TEST_F(RangeQuerierWrapperFixture, QueryDefersSerializationUntilUnloadedSeriesIsLoaded) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + + unload_open_chunks(); + + auto query = query_for(0, 1, 3); + entrypoint::types::RangeQuerierWithArgumentsWrapperV2 wrapper{storage_, query, serialized_data_ptr()}; + + // Act + wrapper.query(); + + const auto need_loading = wrapper.need_loading(); + const auto series_to_load_0 = wrapper.series_to_load().is_set(0); + const auto was_default_before_finalize = serialized_data_memory_.has_default_value(); + + load_unloaded_chunks(0); + wrapper.query_finalize(); + take_serialized_data(); + + // Assert + ASSERT_TRUE(need_loading); + EXPECT_TRUE(series_to_load_0); + EXPECT_TRUE(was_default_before_finalize); + ASSERT_NE(nullptr, serialized_data_); + ASSERT_EQ(1U, serialized_data_->get_chunks_view().size()); + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}}), decode_chunk(0)); +} + +} // namespace diff --git a/pp/entrypoint/types/serialized_data_tests.cpp b/pp/entrypoint/types/serialized_data_tests.cpp new file mode 100644 index 000000000..5ace44325 --- /dev/null +++ b/pp/entrypoint/types/serialized_data_tests.cpp @@ -0,0 +1,120 @@ +#include + +#include + +#include "entrypoint/types/serialized_data.h" +#include "series_data/chunk_finalizer.h" +#include "series_data/decoder/traits.h" +#include "series_data/encoder.h" +#include "series_data/encoder/sample.h" +#include "series_data/querier/querier.h" + +namespace { + +using series_data::ChunkFinalizer; +using series_data::DataStorage; +using series_data::Encoder; +using series_data::decoder::DecodeIteratorSentinel; +using series_data::encoder::Sample; +using series_data::encoder::SampleList; +using series_data::querier::Querier; +using series_data::serialization::SerializedDataView; +using Query = series_data::querier::Query>; + +class SerializedDataGoFixture : public testing::Test { + protected: + DataStorage storage_; + Encoder<> encoder_{storage_}; + Querier querier_{storage_}; + + [[nodiscard]] static SampleList decode_chunk(const entrypoint::types::SerializedDataGo& data, uint32_t chunk_id) { + SampleList decoded; + std::ranges::copy(data.iterator(chunk_id), DecodeIteratorSentinel{}, std::back_inserter(decoded)); + return decoded; + } +}; + +TEST_F(SerializedDataGoFixture, EmptyQueriedChunkListProducesNoChunks) { + // Arrange + + // Act + entrypoint::types::SerializedDataGo data{storage_, {}}; + + // Assert + EXPECT_EQ(0U, data.get_chunks_view().size()); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, data.next().first); +} + +TEST_F(SerializedDataGoFixture, RoundTripsQueriedOpenChunk) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + encoder_.encode(0, 4, 4.0); + encoder_.encode(0, 5, 5.0); + + const auto& queried_chunks = querier_.query(Query{.time_interval{.min = 1, .max = 5}, .label_set_ids = {0}}); + + // Act + entrypoint::types::SerializedDataGo data{storage_, queried_chunks}; + const auto next_series = data.next(); + const auto decoded = decode_chunk(data, 0); + + // Assert + ASSERT_EQ(1U, data.get_chunks_view().size()); + EXPECT_EQ(0U, next_series.first); + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}}), decoded); +} + +TEST_F(SerializedDataGoFixture, RoundTripsQueriedFinalizedChunk) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + encoder_.encode(0, 4, 4.0); + encoder_.encode(0, 5, 5.0); + ChunkFinalizer::finalize(storage_, 0, storage_.open_chunks[0]); + + encoder_.encode(0, 10, 10.0); + + const auto& queried_chunks = querier_.query(Query{.time_interval{.min = 1, .max = 5}, .label_set_ids = {0}}); + + // Act + entrypoint::types::SerializedDataGo data{storage_, queried_chunks}; + const auto next_series = data.next(); + const auto decoded = decode_chunk(data, 0); + + // Assert + ASSERT_EQ(1U, data.get_chunks_view().size()); + EXPECT_EQ(0U, next_series.first); + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}}), decoded); +} + +TEST_F(SerializedDataGoFixture, NextReturnsChunkIdsForAllQueriedSeries) { + // Arrange + encoder_.encode(0, 1, 1.0); + encoder_.encode(0, 2, 2.0); + encoder_.encode(0, 3, 3.0); + + encoder_.encode(1, 1, 11.0); + encoder_.encode(1, 2, 12.0); + encoder_.encode(1, 3, 13.0); + + const auto& queried_chunks = querier_.query(Query{.time_interval{.min = 1, .max = 3}, .label_set_ids = {0, 1}}); + + // Act + entrypoint::types::SerializedDataGo data{storage_, queried_chunks}; + const auto series0 = data.next(); + const auto series1 = data.next(); + const auto end = data.next(); + + // Assert + ASSERT_EQ(2U, data.get_chunks_view().size()); + EXPECT_EQ(0U, series0.first); + EXPECT_EQ(1U, series1.first); + EXPECT_EQ(SerializedDataView::kNoMoreSeries, end.first); + EXPECT_EQ((SampleList{{1, 1.0}, {2, 2.0}, {3, 3.0}}), decode_chunk(data, series0.second)); + EXPECT_EQ((SampleList{{1, 11.0}, {2, 12.0}, {3, 13.0}}), decode_chunk(data, series1.second)); +} + +} // namespace