Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
230 changes: 137 additions & 93 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ type flagConfig struct {
WalCommitInterval model.Duration
WalMaxSamplesPerSegment uint32
HeadRetentionTimeout model.Duration
UseBlockManagerStorage bool

featureList []string
memlimitRatio float64
Expand Down Expand Up @@ -314,7 +315,8 @@ func main() {
Registerer: prometheus.DefaultRegisterer,
Gatherer: prometheus.DefaultGatherer,
},
promlogConfig: promlog.Config{},
promlogConfig: promlog.Config{},
UseBlockManagerStorage: false,
}

a := kingpin.New(filepath.Base(os.Args[0]), "The Prom++ monitoring server").UsageWriter(os.Stdout)
Expand Down Expand Up @@ -562,7 +564,7 @@ func main() {

logger := promlog.New(&cfg.promlogConfig)

readPromPPFeatures(logger)
readPromPPFeatures(logger, &cfg)

if err := cfg.setFeatureListOptions(logger); err != nil {
fmt.Fprintln(os.Stderr, fmt.Errorf("Error parsing feature list: %w", err))
Expand Down Expand Up @@ -822,56 +824,110 @@ func main() {
scraper := &readyScrapeManager{}

// PP_CHANGES.md: rebuild on cpp start
// In server mode the persisted blocks are managed by block.Manager (read +
// retention) and block.Compactor (compaction) instead of a tsdb.DB. The
// block.Manager is wired into the fanout via a querier-only storage.Storage
// adapter; localStorage stays an empty stub. In agent mode the secondary is
// still localStorage (agent.DB set later).
// Server mode supports two historical-block storage schemes selected by the
// PROMPP_FEATURES=enable_block_manager feature flag:
// 1) enabled: block.Manager + block.Compactor for persisted blocks.
// 2) disabled (default): pre-PR-377 mode with tsdb.DB serving persisted blocks.
// In both modes, PP head manager + adapter remain the write path.
var (
blockManager *block.Manager
blockCompactor *block.Compactor
tsdbHistorical *tsdbHistoricalStorage
compactCancel context.CancelFunc
persistedStorage storage.Storage = localStorage
startTimeFn func() (int64, error) = localStorage.StartTime
)
if !agentMode {
retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond)
blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete(
retentionMs,
int64(cfg.tsdb.MaxBytes),
pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog),
prometheus.DefaultRegisterer,
)
blockManager, err = block.NewManager(localStoragePath, &block.Options{
RetentionDuration: retentionMs,
CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to initialize block manager", "err", err)
// Storage is constructed eagerly here for both schemes. The historical
// path does no WAL replay (the PP head + adapter is the only write path),
// so opening it is as cheap as the block manager's initial reload; there is
// no need to defer the open into the run group or gate startup on it.
if cfg.tsdb.WALSegmentSize != 0 && (cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024) {
level.Error(logger).Log("msg", "flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
os.Exit(1)
}

var compactCtx context.Context
compactCtx, compactCancel = context.WithCancel(context.Background())
blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{
MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond),
MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to create block compactor", "err", err)
if cfg.tsdb.MaxBlockChunkSegmentSize != 0 && cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
level.Error(logger).Log("msg", "flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
os.Exit(1)
}
switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
default:
level.Info(logger).Log("fs_type", fsType)
}

bs := &blockStorage{m: blockManager, onClose: func() error {
blockCompactor.Close()
blockManager.Close()
compactCancel()
return nil
}}
persistedStorage = bs
startTimeFn = bs.StartTime
if cfg.UseBlockManagerStorage {
level.Info(logger).Log("msg", "Using block-manager storage scheme")
level.Debug(logger).Log("msg", "Block storage options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration,
"EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction,
)
retentionMs := int64(time.Duration(cfg.tsdb.RetentionDuration) / time.Millisecond)
blocksToDelete := pp_pkg_tsdb.NewBlocksToDelete(
retentionMs,
int64(cfg.tsdb.MaxBytes),
pp_pkg_tsdb.CatalogHeadsExtraSize(dataDir, headCatalog),
prometheus.DefaultRegisterer,
)
blockManager, err = block.NewManager(localStoragePath, &block.Options{
RetentionDuration: retentionMs,
CorruptedRetentionDuration: time.Duration(cfg.tsdb.CorruptedRetentionDuration),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blocksToDelete, log.With(logger, "component", "blockmanager"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to initialize block manager", "err", err)
os.Exit(1)
}

var compactCtx context.Context
compactCtx, compactCancel = context.WithCancel(context.Background())
blockCompactor, err = block.NewCompactor(compactCtx, localStoragePath, &block.CompactorOptions{
MinBlockDuration: int64(time.Duration(cfg.tsdb.MinBlockDuration) / time.Millisecond),
MaxBlockDuration: int64(time.Duration(cfg.tsdb.MaxBlockDuration) / time.Millisecond),
MaxBlockChunkSegmentSize: int64(cfg.tsdb.MaxBlockChunkSegmentSize),
EnableOverlappingCompaction: cfg.tsdb.EnableOverlappingCompaction,
}, blockManager, log.With(logger, "component", "blockcompactor"), prometheus.DefaultRegisterer)
if err != nil {
level.Error(logger).Log("msg", "failed to create block compactor", "err", err)
os.Exit(1)
}
// Drive compaction from the manager's single reload loop so compact
// and block deletion never run concurrently.
blockManager.SetCompactor(blockCompactor)

bs := &blockStorage{m: blockManager, onClose: func() error {
// Cancel any in-flight leveled compaction first so the manager
// loop can return promptly, then stop the loop and close blocks.
compactCancel()
blockManager.Close()
return nil
}}
persistedStorage = bs
startTimeFn = bs.StartTime
} else {
level.Info(logger).Log("msg", "Using pre-PR-377 historical TSDB storage scheme")
opts := cfg.tsdb.ToTSDBOptions()
db, err := tsdb.Open(localStoragePath, logger, prometheus.DefaultRegisterer, &opts, localStorage.stats)
if err != nil {
level.Error(logger).Log("msg", "opening storage failed", "err", err)
os.Exit(1)
}
tsdbHistorical = &tsdbHistoricalStorage{db: db}
persistedStorage = tsdbHistorical
startTimeFn = tsdbHistorical.StartTime
level.Info(logger).Log("msg", "TSDB storage started")
level.Debug(logger).Log("msg", "TSDB options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBlockDuration", cfg.tsdb.MaxBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"WALCompression", cfg.tsdb.WALCompression,
)
}
}

remoteRead := pp_pkg_remote.NewRemoteRead(
Expand Down Expand Up @@ -1197,9 +1253,8 @@ func main() {
prometheus.MustRegister(configSuccess)
prometheus.MustRegister(configSuccessTime)

// Start all components while we wait for TSDB to open but only load
// initial config and mark ourselves as ready after it completed.
dbOpen := make(chan struct{})
// Storage is opened eagerly during setup, so components can start and load
// the initial config without waiting on a storage-open signal.

// sync.Once is used to make sure we can close the channel at different execution stages (SIGTERM or when the config is loaded).
type closeOnce struct {
Expand Down Expand Up @@ -1397,14 +1452,6 @@ func main() {
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
reloadReady.Close()
return nil
}

if err := reloadConfig(cfg.configFile, cfg.enableExpandExternalLabels, cfg.tsdb.EnableExemplarStorage, logger, noStepSubqueryInterval, reloaders...); err != nil {
return fmt.Errorf("error loading config from %q: %w", cfg.configFile, err)
}
Expand All @@ -1422,48 +1469,18 @@ func main() {
)
}
if !agentMode {
// Persisted block storage (block.Manager + block.Compactor). The tsdb.DB
// is disabled in server mode; persisted blocks are read via block.Manager
// and compacted by block.Compactor, both started above. This actor only
// signals readiness and tears the storage down on shutdown.
// Storage is opened eagerly during setup (see above), so this actor only
// waits for shutdown and then closes the fanout, which closes the
// historical backend.
// PP_CHANGES.md: rebuild on cpp
cancel := make(chan struct{})
g.Add(
func() error {
level.Info(logger).Log("msg", "Starting persisted block storage ...")
if cfg.tsdb.WALSegmentSize != 0 {
if cfg.tsdb.WALSegmentSize < 10*1024*1024 || cfg.tsdb.WALSegmentSize > 256*1024*1024 {
return errors.New("flag 'storage.tsdb.wal-segment-size' must be set between 10MB and 256MB")
}
}
if cfg.tsdb.MaxBlockChunkSegmentSize != 0 {
if cfg.tsdb.MaxBlockChunkSegmentSize < 1024*1024 {
return errors.New("flag 'storage.tsdb.max-block-chunk-segment-size' must be set over 1MB")
}
}

switch fsType := prom_runtime.Statfs(localStoragePath); fsType {
case "NFS_SUPER_MAGIC":
level.Warn(logger).Log("fs_type", fsType, "msg", "This filesystem is not supported and may lead to data corruption and data loss. Please carefully read https://prometheus.io/docs/prometheus/latest/storage/ to learn more about supported filesystems.")
default:
level.Info(logger).Log("fs_type", fsType)
}

level.Info(logger).Log("msg", "Persisted block storage started")
level.Debug(logger).Log("msg", "Block storage options",
"MinBlockDuration", cfg.tsdb.MinBlockDuration,
"MaxBytes", cfg.tsdb.MaxBytes,
"RetentionDuration", cfg.tsdb.RetentionDuration,
"CorruptedRetentionDuration", cfg.tsdb.CorruptedRetentionDuration,
"EnableOverlappingCompaction", cfg.tsdb.EnableOverlappingCompaction,
)

close(dbOpen)
<-cancel
return nil
},
func(err error) {
// Closes adapter (head) + blockStorage (block.Manager + block.Compactor) + remoteRead.
// Closes adapter (head) + historical storage backend + remoteRead.
if err := fanoutStorage.Close(); err != nil {
level.Error(logger).Log("msg", "Error stopping storage", "err", err)
}
Expand Down Expand Up @@ -1514,7 +1531,6 @@ func main() {

localStorage.Set(db, 0)
// db.SetWriteNotified(remoteStorage) // PP_CHANGES.md: rebuild on cpp
close(dbOpen)
<-cancel
return nil
},
Expand Down Expand Up @@ -1545,13 +1561,6 @@ func main() {
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-dbOpen:
// In case a shutdown is initiated before the dbOpen is released
case <-cancel:
return nil
}

return hManager.Run()
},
func(err error) {
Expand Down Expand Up @@ -1793,6 +1802,35 @@ func (b *blockStorage) StartTime() (int64, error) {
return math.MaxInt64, nil
}

// tsdbHistoricalStorage adapts a tsdb.DB to serve persisted blocks as a
// fanout secondary, while dropping appends so writes stay on the PP head path.
//
// It is the historical backend selected in server mode when the
// block-manager storage scheme is not enabled: reads (Querier, ChunkQuerier,
// StartTime) and Close are forwarded to the wrapped database, whereas
// Appender hands out a noopAppender.
type tsdbHistoricalStorage struct {
	// db is the opened TSDB that reads and Close are delegated to.
	db *tsdb.DB
}

// Querier forwards the request for the given time bounds to the wrapped
// tsdb.DB and hands back whatever querier and error it produced.
func (s *tsdbHistoricalStorage) Querier(mint, maxt int64) (storage.Querier, error) {
	querier, err := s.db.Querier(mint, maxt)
	return querier, err
}

// ChunkQuerier forwards the request for the given time bounds to the wrapped
// tsdb.DB and hands back whatever chunk querier and error it produced.
func (s *tsdbHistoricalStorage) ChunkQuerier(mint, maxt int64) (storage.ChunkQuerier, error) {
	chunkQuerier, err := s.db.ChunkQuerier(mint, maxt)
	return chunkQuerier, err
}

// Appender never writes into the wrapped tsdb.DB: it always hands out a
// zero-value noopAppender and ignores the supplied context.
func (s *tsdbHistoricalStorage) Appender(context.Context) storage.Appender {
	var discard noopAppender
	return discard
}

// Close closes the wrapped tsdb.DB and reports its result unchanged.
func (s *tsdbHistoricalStorage) Close() error {
	closeErr := s.db.Close()
	return closeErr
}

// StartTime reports the MinTime of the first block returned by the wrapped
// tsdb.DB. When the database holds no blocks it returns math.MaxInt64.
// The returned error is always nil.
func (s *tsdbHistoricalStorage) StartTime() (int64, error) {
	persisted := s.db.Blocks()
	if len(persisted) == 0 {
		// No persisted blocks yet: report the largest possible timestamp.
		return math.MaxInt64, nil
	}
	// NOTE(review): presumably Blocks() is ordered by MinTime, making the
	// first entry the oldest — confirm against the tsdb package.
	return persisted[0].Meta().MinTime, nil
}

// noopAppender silently drops samples and reports success, so that the fanout
// appender (which appends to every secondary) does not fail on the read-only
// historical secondaries (blockStorage and tsdbHistoricalStorage).
Expand Down Expand Up @@ -2185,7 +2223,7 @@ func (p *rwProtoMsgFlagParser) Set(opt string) error {
return nil
}

func readPromPPFeatures(logger log.Logger) {
func readPromPPFeatures(logger log.Logger, cfg *flagConfig) {
features := os.Getenv("PROMPP_FEATURES")
if features == "" {
return
Expand Down Expand Up @@ -2313,6 +2351,12 @@ func readPromPPFeatures(logger log.Logger) {
case "shrink_shard_copier":
pp_storage.ShrinkShardCopier = true
_ = level.Info(logger).Log("msg", "[FEATURE] Shrink shard copier is enabled.")

case "enable_block_manager":
if cfg != nil {
cfg.UseBlockManagerStorage = true
}
_ = level.Info(logger).Log("msg", "[FEATURE] Block-manager historical storage is enabled.")
}
}
}
Loading
Loading