Skip to content
Draft
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
90d0a34
changed logic of IntervalDecodeIterator
cherep58 Dec 3, 2025
fcd61e5
add longterm
u-veles-a Dec 3, 2025
31fc7a6
added downsamplingMs parameter into Go-bindings
cherep58 Dec 3, 2025
4ee8815
remove lookbackDelta
u-veles-a Dec 3, 2025
664394d
added GO-test for downsampling
cherep58 Dec 4, 2025
421bab3
Merge branch 'downsampling' of https://github.com/deckhouse/prompp in…
cherep58 Dec 4, 2025
70c874e
optimized IntervalDecodeIterator
cherep58 Dec 4, 2025
1c4dbcd
renamed IntervalDecodeIterator to DownsamplingDecodeIterator
cherep58 Dec 5, 2025
bba84b7
optimized DownsamplingDecodeIterator
cherep58 Dec 5, 2025
88eae42
review fixes
cherep58 Dec 5, 2025
f24c038
review fixes
cherep58 Dec 8, 2025
5d5d628
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 8, 2025
e2f38d7
removed std::assignable_from from AssignableFromUniversaleDecodeItera…
cherep58 Dec 8, 2025
f8aa910
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 25, 2025
5087d19
fix after merge
u-veles-a Dec 26, 2025
b6509c2
Merge branch 'pp' into downsampling
u-veles-a Dec 26, 2025
97e937e
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Dec 29, 2025
03cc0fb
Merge branch 'pp' into downsampling
cherep58 Dec 30, 2025
28aab09
longterm blockwriter
u-veles-a Jan 12, 2026
b52a21b
fix
u-veles-a Jan 13, 2026
ab463e2
Merge branch 'pp' into downsampling
u-veles-a Jan 13, 2026
5006155
added downsampling feature in ChunkRecoder
cherep58 Jan 13, 2026
77c5ab1
Merge branch 'pp' of https://github.com/deckhouse/prompp into downsam…
cherep58 Jan 13, 2026
f1c7b28
fix compactor
u-veles-a Jan 13, 2026
858d5cf
fixed compilation error
cherep58 Jan 13, 2026
2227a87
fixed compilation error
cherep58 Jan 13, 2026
3c13156
Merge branch 'pp' into downsampling
u-veles-a Jan 26, 2026
fcf69b9
fix after merge
u-veles-a Jan 27, 2026
b9a0b12
Merge branch 'pp' into downsampling
cherep58 Feb 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
550 changes: 532 additions & 18 deletions cmd/prometheus/main.go

Large diffs are not rendered by default.

86 changes: 79 additions & 7 deletions pp-pkg/storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Adapter struct {
hashdexLimits cppbridge.WALHashdexLimits
transparentState *cppbridge.StateV2
mergeOutOfOrderChunks func()
longtermIntervalMs int64

// stat
activeQuerierMetrics *querier.Metrics
Expand All @@ -41,12 +42,51 @@ type Adapter struct {
samplesAppended prometheus.Counter
}

// NewAdapter init new [Adapter].
// NewAdapter init new main [Adapter].
func NewAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
mergeOutOfOrderChunks,
0,
querier.QueryableAppenderSource,
querier.QueryableStorageSource,
registerer,
)
}

// NewLongtermAdapter init new longterm [Adapter].
func NewLongtermAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
registerer prometheus.Registerer,
) *Adapter {
return newAdapter(
clock,
proxy,
mergeOutOfOrderChunks,
longtermIntervalMs,
querier.QueryableLongtermAppenderSource,
querier.QueryableLongtermStorageSource,
registerer,
)
}

// newAdapter init new [Adapter].
func newAdapter(
clock clockwork.Clock,
proxy *pp_storage.Proxy,
mergeOutOfOrderChunks func(),
longtermIntervalMs int64,
activeSource, storageSource string,
registerer prometheus.Registerer,
) *Adapter {
factory := util.NewUnconflictRegisterer(registerer)
return &Adapter{
Expand All @@ -56,8 +96,9 @@ func NewAdapter(
hashdexLimits: cppbridge.DefaultWALHashdexLimits(),
transparentState: cppbridge.NewTransitionStateV2(),
mergeOutOfOrderChunks: mergeOutOfOrderChunks,
activeQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableAppenderSource),
storageQuerierMetrics: querier.NewMetrics(registerer, querier.QueryableStorageSource),
longtermIntervalMs: longtermIntervalMs,
activeQuerierMetrics: querier.NewMetrics(registerer, activeSource),
storageQuerierMetrics: querier.NewMetrics(registerer, storageSource),
appendDuration: factory.NewHistogram(
prometheus.HistogramOpts{
Name: "prompp_adapter_append_duration",
Expand Down Expand Up @@ -219,7 +260,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewChunkQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)

for _, head := range ar.proxy.Heads() {
Expand All @@ -229,7 +277,14 @@ func (ar *Adapter) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error)

queriers = append(
queriers,
querier.NewChunkQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil),
querier.NewChunkQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
),
)
}

Expand All @@ -254,6 +309,7 @@ func (ar *Adapter) HeadQuerier(mint, maxt int64) (storage.Querier, error) {
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
), nil
Expand Down Expand Up @@ -281,7 +337,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {
ahead := ar.proxy.Get()
queriers = append(
queriers,
querier.NewQuerier(ahead, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.activeQuerierMetrics),
querier.NewQuerier(
ahead,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.activeQuerierMetrics,
),
)

for _, head := range ar.proxy.Heads() {
Expand All @@ -291,7 +355,15 @@ func (ar *Adapter) Querier(mint, maxt int64) (storage.Querier, error) {

queriers = append(
queriers,
querier.NewQuerier(head, querier.NewNoOpShardedDeduplicator, mint, maxt, nil, ar.storageQuerierMetrics),
querier.NewQuerier(
head,
querier.NewNoOpShardedDeduplicator,
mint,
maxt,
ar.longtermIntervalMs,
nil,
ar.storageQuerierMetrics,
),
)
}

Expand Down
1 change: 1 addition & 0 deletions pp-pkg/tsdb/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tsdb

import (
"context"

"github.com/go-kit/log"
"github.com/oklog/ulid"
"github.com/prometheus/client_golang/prometheus"
Expand Down
2 changes: 1 addition & 1 deletion pp/entrypoint/go_constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
#define Sizeof_RoaringBitset 40
#define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset)

#define Sizeof_SerializedDataIterator 192
#define Sizeof_SerializedDataIterator 200
16 changes: 12 additions & 4 deletions pp/entrypoint/head/serialization.h
Original file line number Diff line number Diff line change
@@ -1,27 +1,35 @@
#pragma once

#include "series_data/decoder/decorator/downsampling_decode_iterator.h"
#include "series_data/serialization/serialized_data.h"

namespace entrypoint::head {

using DecodeIterator = series_data::decoder::decorator::DownsamplingDecodeIterator<series_data::decoder::UniversalDecodeIterator>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator<DecodeIterator>;

class SerializedDataGo {
public:
explicit SerializedDataGo(const series_data::DataStorage& storage, const series_data::querier::QueriedChunkList& queried_chunks)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)} {}
explicit SerializedDataGo(const series_data::DataStorage& storage,
const series_data::querier::QueriedChunkList& queried_chunks,
PromPP::Primitives::Timestamp downsampling_ms)
: data_{series_data::serialization::DataSerializer{storage}.serialize(queried_chunks)}, downsampling_ms_(downsampling_ms) {}

[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_buffer_view() const noexcept { return data_view_.get_buffer_view(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto get_chunks_view() const noexcept { return data_view_.get_chunks_view(); }

[[nodiscard]] PROMPP_ALWAYS_INLINE auto next() noexcept { return data_view_.next_series(); }
[[nodiscard]] PROMPP_ALWAYS_INLINE auto iterator(uint32_t chunk_id) const noexcept { return data_view_.create_series_iterator(chunk_id); }
[[nodiscard]] PROMPP_ALWAYS_INLINE SerializedDataIterator iterator(uint32_t chunk_id) const noexcept {
return data_view_.create_series_iterator<DecodeIterator>(chunk_id, DecodeIterator(downsampling_ms_));
}

private:
series_data::serialization::SerializedData data_;
series_data::serialization::SerializedDataView data_view_{data_};
PromPP::Primitives::Timestamp downsampling_ms_{};
};

using SerializedDataPtr = std::unique_ptr<SerializedDataGo>;
using SerializedDataIterator = series_data::serialization::SerializedDataView::SeriesIterator;

static_assert(sizeof(SerializedDataPtr) == sizeof(void*));

Expand Down
10 changes: 7 additions & 3 deletions pp/entrypoint/series_data/querier.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,11 @@ class RangeQuerierWithArgumentsWrapperV2 {
using BytesStream = PromPP::Primitives::Go::BytesStream;

public:
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage, const Query& query, head::SerializedDataPtr* serialized_data)
: querier_(storage), query_(&query), serialized_data_(serialized_data) {}
RangeQuerierWithArgumentsWrapperV2(DataStorage& storage,
const Query& query,
head::SerializedDataPtr* serialized_data,
PromPP::Primitives::Timestamp downsampling_ms)
: querier_(storage), query_(&query), serialized_data_(serialized_data), downsampling_ms_(downsampling_ms) {}

void query() noexcept {
querier_.query(*query_);
Expand All @@ -118,9 +121,10 @@ class RangeQuerierWithArgumentsWrapperV2 {
::series_data::querier::Querier querier_;
const Query* query_;
head::SerializedDataPtr* serialized_data_;
PromPP::Primitives::Timestamp downsampling_ms_;

PROMPP_ALWAYS_INLINE void serialize_chunks() const noexcept {
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks()));
std::construct_at(serialized_data_, std::make_unique<head::SerializedDataGo>(querier_.get_storage(), querier_.chunks(), downsampling_ms_));
}
};

Expand Down
3 changes: 2 additions & 1 deletion pp/entrypoint/series_data_data_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
struct Arguments {
DataStoragePtr data_storage;
Query query;
PromPP::Primitives::Timestamp downsampling_ms;
};

struct Result {
Expand All @@ -162,7 +163,7 @@ extern "C" void prompp_series_data_data_storage_query_v2(void* args, void* res)
const auto in = static_cast<Arguments*>(args);
const auto out = static_cast<Result*>(res);

RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data);
RangeQuerierWithArgumentsWrapperV2 querier(*in->data_storage, in->query, out->serialized_data, in->downsampling_ms);
querier.query();

if (querier.need_loading()) {
Expand Down
5 changes: 3 additions & 2 deletions pp/entrypoint/series_data_data_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,13 @@ void prompp_series_data_data_storage_query(void* args, void* res);
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed.
* querier uintptr // pointer to constructed Querier if data loading is needed.
* // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final.
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
Expand Down
9 changes: 5 additions & 4 deletions pp/go/cppbridge/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1856,11 +1856,12 @@ type DataStorageQueryResult struct {
SerializedData *DataStorageSerializedData
}

func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData) (querier uintptr, status uint8) {
func seriesDataDataStorageQueryV2(dataStorage uintptr, query HeadDataStorageQuery, serializedData *DataStorageSerializedData, downsamplingMs int64) (querier uintptr, status uint8) {
args := struct {
dataStorage uintptr
query HeadDataStorageQuery
}{dataStorage, query}
dataStorage uintptr
query HeadDataStorageQuery
downsamplingMs int64
}{dataStorage, query, downsamplingMs}

var res = struct {
Querier uintptr
Expand Down
7 changes: 4 additions & 3 deletions pp/go/cppbridge/entrypoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ void prompp_dump_memory_profile(void* args, void* res);
#define Sizeof_RoaringBitset 40
#define Sizeof_InnerSeries (Sizeof_SizeT + Sizeof_BareBonesVector + Sizeof_RoaringBitset)

#define Sizeof_SerializedDataIterator 192
#define Sizeof_SerializedDataIterator 200
#ifdef __cplusplus
extern "C" {
#endif
Expand Down Expand Up @@ -1349,12 +1349,13 @@ void prompp_series_data_data_storage_query(void* args, void* res);
* @param args {
* dataStorage uintptr // pointer to constructed data storage
* query DataStorageQuery // query
* downsamplingMs int64 // downsampling interval in milliseconds (0 - downsampling is disabled)
* }
*
* @param res {
* Querier uintptr // pointer to constructed Querier if data loading is needed.
* querier uintptr // pointer to constructed Querier if data loading is needed.
* // If constructed (!= 0) it must be destroyed by calling prompp_series_data_data_storage_query_final.
* Status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* status uint8 // status of a query (0 - Success, 1 - Data loading is needed)
* serializedData uintptr // pointer to serialized data
* }
*/
Expand Down
23 changes: 16 additions & 7 deletions pp/go/cppbridge/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ const (
NormalNaN uint64 = 0x7ff8000000000001

StaleNaN uint64 = 0x7ff0000000000002

NoDownsampling = 0
)

func IsStaleNaN(v float64) bool {
Expand Down Expand Up @@ -314,9 +316,9 @@ func (i HeadDataStorageSerializedChunkIndex) Chunks(r *HeadDataStorageSerialized
return res
}

func (ds *HeadDataStorage) Query(query HeadDataStorageQuery) DataStorageQueryResult {
func (ds *HeadDataStorage) Query(query HeadDataStorageQuery, downsamplingMs int64) DataStorageQueryResult {
sd := NewDataStorageSerializedData()
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd)
querier, status := seriesDataDataStorageQueryV2(ds.dataStorage, query, sd, downsamplingMs)
return DataStorageQueryResult{
Querier: querier,
Status: status,
Expand Down Expand Up @@ -356,10 +358,9 @@ func (sd *DataStorageSerializedData) Next() (uint32, uint32) {
}

type DataStorageSerializedDataIteratorControlBlock struct {
decoderVariant uint64
Timestamp int64
Value float64
remainingSamples uint8
decodedTimestamp int64
timestamp int64
value float64
}

type DataStorageSerializedDataIterator struct {
Expand All @@ -386,7 +387,15 @@ func (it *DataStorageSerializedDataIterator) Reset(serializedData *DataStorageSe
}

func (it *DataStorageSerializedDataIterator) HasData() bool {
return it.remainingSamples != 0
return it.decodedTimestamp != math.MinInt64
}

func (it *DataStorageSerializedDataIterator) Timestamp() int64 {
return it.timestamp
}

func (it *DataStorageSerializedDataIterator) Value() float64 {
return it.value
}

// UnloadedDataLoader is Go wrapper around series_data::Loader.
Expand Down
6 changes: 3 additions & 3 deletions pp/go/cppbridge/head_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,8 @@ func (s *HeadSuite) TestSerializedChunkRecoder() {
result := s.dataStorage.Query(cppbridge.HeadDataStorageQuery{
StartTimestampMs: timeInterval.MinT,
EndTimestampMs: timeInterval.MaxT,
LabelSetIDs: []uint32{0, 1}},
)
LabelSetIDs: []uint32{0, 1},
}, cppbridge.NoDownsampling)
recoder := cppbridge.NewSerializedChunkRecoder(result.SerializedData, timeInterval)

// Act
Expand Down Expand Up @@ -179,7 +179,7 @@ func (s *HeadSuite) TestInstantQuery() {
// Arrange
dataStorage := cppbridge.NewHeadDataStorage()
encoder := cppbridge.NewHeadEncoderWithDataStorage(dataStorage)
var series = []struct {
series := []struct {
SeriesID uint32
cppbridge.Sample
}{
Expand Down
2 changes: 1 addition & 1 deletion pp/go/storage/appender/appender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (s *AppenderSuite) getHeadData(labelSetIDs []uint32) headStorageData {
StartTimestampMs: 0,
EndTimestampMs: math.MaxInt64,
LabelSetIDs: labelSetIDs,
})
}, cppbridge.NoDownsampling)
data.dsResult = append(data.dsResult, dsResult)

data.shards = append(data.shards, storageData{
Expand Down
Loading
Loading