Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 240 additions & 63 deletions agent/runner/jobs/pbm_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,18 @@ const (
resyncTimeout = 5 * time.Minute
statusCheckInterval = 5 * time.Second
maxRestoreChecks = 100

maxDescribeCommandRetries = 5
// PBM waits up to ~33s for backup metadata; allow extra margin for describe-backup.
pbmDescribeStartupGrace = 60 * time.Second

pbmCmdBackup = "backup"
pbmCmdRestore = "restore"

pbmStatusDone = "done"
pbmStatusCanceled = "canceled"
pbmStatusError = "error"
pbmStatusPartlyDone = "partlyDone"
)

type pbmSeverity int
Expand All @@ -48,6 +60,8 @@ type describeInfo struct {
// replSet is the per-replica-set entry decoded from the "describe" JSON output.
type replSet struct {
// Name is decoded from the "name" key.
Name string `json:"name"`
// Status is decoded from the "status" key; groupDescribeErrors compares it against "partlyDone".
Status string `json:"status"`
// Error is the replica-set level error message; the key is omitted when empty.
Error string `json:"error,omitempty"`
// Node is decoded from the optional "node" key.
// NOTE(review): presumably the node that handled the operation for this replica set — confirm against PBM output.
Node string `json:"node,omitempty"`
// Nodes lists the per-node entries of this replica set.
Nodes []node `json:"nodes"`
}

Expand Down Expand Up @@ -254,44 +268,214 @@ func getPBMStatus(ctx context.Context, dsn string) (*pbmStatus, error) {
return &status, nil
}

// pbmDescribePollConfig bundles everything waitForPBMDescribe and its helpers
// need to poll one PBM operation until it reaches a terminal state.
type pbmDescribePollConfig struct {
// l receives the debug/warn messages emitted while polling.
l logrus.FieldLogger
// dsn is handed to the status fetcher when the describe command fails.
dsn string
// operation is pbmCmdBackup or pbmCmdRestore; it selects the default
// "is running" check and is interpolated into log and error messages.
operation string
// targetName is the backup name used by the default running/snapshot lookups.
targetName string
// startedAt anchors the pbmDescribeStartupGrace window for transient errors.
startedAt time.Time
// describeRetries is the remaining retry budget. It is a pointer because the
// config is passed by value while the helpers decrement and reset the counter.
describeRetries *int
// fetchDescribe runs the describe command once and returns its decoded output.
fetchDescribe func(context.Context) (describeInfo, error)
// fetchStatus optionally overrides the status fetcher; nil means pbmStatusFetcher.
fetchStatus func(context.Context, string) (*pbmStatus, error)
// isRunning optionally overrides the per-operation "is running" check.
isRunning func(*pbmStatus) bool
// findSnapshot optionally overrides the snapshot lookup used as a fallback
// source of the terminal state.
findSnapshot func(*pbmStatus) *pbmSnapshot
}

// pbmDescribePollInterval is the ticker period used by waitForPBMDescribe.
// It defaults to statusCheckInterval and is a variable so tests may override it.
var pbmDescribePollInterval = statusCheckInterval

// pbmStatusFetcher is the status fetcher pollPBMDescribeOnce falls back to when
// the poll config does not provide its own; it defaults to getPBMStatus and is
// a variable so tests may override it.
var pbmStatusFetcher = getPBMStatus

// waitForPBMBackup blocks until the PBM backup called name reaches a terminal
// state or ctx is done.
//
// It builds a poll configuration for the "backup" operation whose describe step
// runs `describe-backup <name>` against dsn, and delegates the polling loop to
// waitForPBMDescribe. The returned error is whatever waitForPBMDescribe reports.
func waitForPBMBackup(ctx context.Context, l logrus.FieldLogger, dsn string, name string) error {
	l.Infof("waiting for pbm backup: %s", name)

	// The retry budget lives here and is shared through a pointer because the
	// poll config is passed to the helpers by value.
	describeRetries := maxDescribeCommandRetries

	return waitForPBMDescribe(ctx, pbmDescribePollConfig{
		l:               l,
		dsn:             dsn,
		operation:       pbmCmdBackup,
		targetName:      name,
		startedAt:       time.Now(),
		describeRetries: &describeRetries,
		fetchDescribe: func(ctx context.Context) (describeInfo, error) {
			var info describeInfo
			err := execPBMCommand(ctx, dsn, &info, "describe-backup", name)
			return info, err
		},
	})
}

func waitForPBMDescribe(ctx context.Context, cfg pbmDescribePollConfig) error {
ticker := time.NewTicker(pbmDescribePollInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
var info describeInfo
err := execPBMCommand(ctx, dsn, &info, "describe-backup", name)
done, err := pollPBMDescribeOnce(ctx, cfg)
if err != nil {
// for the first couple of seconds after backup process starts describe-backup command may return this error
if (strings.HasSuffix(err.Error(), "no such file") ||
strings.HasSuffix(err.Error(), "file is empty")) && retryCount > 0 {
retryCount--
continue
}

return errors.Wrap(err, "failed to get backup status")
return err
}

switch info.Status {
case "done":
if done {
return nil
case "canceled":
return errors.New("backup was canceled")
case "error":
return errors.New(info.Error)
}

case <-ctx.Done():
return ctx.Err()
}
}
}

// pollPBMDescribeOnce performs a single poll of the operation described by cfg.
//
// It returns (true, nil) when the operation finished successfully, (true, err)
// when it reached a failed terminal state, (false, nil) when the caller should
// poll again, and (false, err) when polling must be aborted.
//
// The describe command is the primary source of truth. When it fails, PBM
// status is consulted to decide whether the failure is worth retrying and
// whether a snapshot entry already carries the terminal state.
func pollPBMDescribeOnce(ctx context.Context, cfg pbmDescribePollConfig) (bool, error) {
	info, describeErr := cfg.fetchDescribe(ctx)
	if describeErr == nil {
		// A working describe command restores the full retry budget.
		*cfg.describeRetries = maxDescribeCommandRetries
		return describeTerminalError(info, cfg.operation)
	}

	fetchStatus := cfg.fetchStatus
	if fetchStatus == nil {
		fetchStatus = pbmStatusFetcher
	}

	status, statusErr := fetchStatus(ctx, cfg.dsn)
	if statusErr != nil {
		// Nothing can succeed once the context is done; report right away.
		if ctx.Err() != nil {
			return false, errors.Wrap(statusErr, "failed to get pbm status")
		}

		// The status command is only a fallback. When it is unavailable too,
		// keep honoring the startup grace and the retry budget instead of
		// failing the whole wait on the first describe hiccup.
		// NOTE(review): presumably this is the normal situation during a
		// physical restore, when the database is not reachable — confirm.
		if shouldRetryDescribeFailure(describeErr, cfg.startedAt) || retryDescribeCommand(cfg, describeErr) {
			cfg.l.Debugf("pbm status unavailable while describe-%s failed: %s", cfg.operation, statusErr)
			return false, nil
		}

		return false, errors.Wrapf(describeErr, "failed to get %s status (pbm status failed too: %s)", cfg.operation, statusErr)
	}

	// While PBM still reports the operation as running, a describe failure is
	// retried before looking at snapshots.
	if cfg.operationIsRunning(status) {
		if shouldRetryDescribeFailure(describeErr, cfg.startedAt) {
			cfg.l.Debugf("describe-%s transient error while %s is running: %s", cfg.operation, cfg.operation, describeErr)
			return false, nil
		}
		if retryDescribeCommand(cfg, describeErr) {
			return false, nil
		}
	}

	// The snapshot list may already carry the terminal state even though the
	// describe command cannot be used.
	if snapshot := cfg.snapshotForTarget(status); snapshot != nil {
		done, err := snapshotTerminalError(snapshot, cfg.operation)
		if done {
			return true, err
		}
	}

	if shouldRetryDescribeFailure(describeErr, cfg.startedAt) {
		return false, nil
	}
	if retryDescribeCommand(cfg, describeErr) {
		return false, nil
	}

	return false, errors.Wrapf(describeErr, "failed to get %s status", cfg.operation)
}

// shouldRetryDescribeFailure reports whether err is a transient describe error
// that occurred while the startup grace period (pbmDescribeStartupGrace,
// measured from startedAt) is still running.
func shouldRetryDescribeFailure(err error, startedAt time.Time) bool {
	if !isTransientPBMDescribeError(err) {
		return false
	}

	elapsed := time.Since(startedAt)
	return elapsed < pbmDescribeStartupGrace
}

// retryDescribeCommand consumes one unit of the describe retry budget.
// It returns true (after logging a warning with err) when a retry was still
// available, and false when the budget is exhausted.
func retryDescribeCommand(cfg pbmDescribePollConfig, err error) bool {
	remaining := cfg.describeRetries
	if *remaining > 0 {
		*remaining--
		cfg.l.Warnf("describe-%s failed and will retry: %s", cfg.operation, err)
		return true
	}
	return false
}

// isTransientPBMDescribeError reports whether err's message contains, ignoring
// letter case, one of the fragments treated as a transient describe failure.
// A nil error is never transient.
func isTransientPBMDescribeError(err error) bool {
	if err == nil {
		return false
	}

	lowered := strings.ToLower(err.Error())
	fragments := [...]string{
		"no such file",
		"file is empty",
		"not found",
		"get backup meta",
		"get snapshot size",
		"missed file",
	}
	for _, fragment := range fragments {
		if strings.Contains(lowered, fragment) {
			return true
		}
	}
	return false
}

// operationIsRunning reports whether status shows the configured operation as
// still in progress. A configured isRunning override takes precedence;
// otherwise the check is chosen by operation, and unknown operations are
// reported as not running.
func (cfg pbmDescribePollConfig) operationIsRunning(status *pbmStatus) bool {
	if override := cfg.isRunning; override != nil {
		return override(status)
	}

	if cfg.operation == pbmCmdBackup {
		return isPBMBackupRunning(status, cfg.targetName)
	}
	if cfg.operation == pbmCmdRestore {
		return isPBMRestoreRunning(status)
	}
	return false
}

// snapshotForTarget returns the snapshot entry in status that belongs to the
// polled operation, or nil when there is none. A configured findSnapshot
// override takes precedence; without it only backups are looked up, by
// targetName.
func (cfg pbmDescribePollConfig) snapshotForTarget(status *pbmStatus) *pbmSnapshot {
	if lookup := cfg.findSnapshot; lookup != nil {
		return lookup(status)
	}

	if cfg.operation != pbmCmdBackup {
		return nil
	}
	return findPBMSnapshot(status, cfg.targetName)
}

// isPBMBackupRunning reports whether status lists a running backup whose name
// equals name.
func isPBMBackupRunning(status *pbmStatus, name string) bool {
	if status.Running.Type != pbmCmdBackup {
		return false
	}
	return status.Running.Name == name
}

// isPBMRestoreRunning reports whether the operation currently listed as running
// in status is a restore.
func isPBMRestoreRunning(status *pbmStatus) bool {
	runningType := status.Running.Type
	return runningType == pbmCmdRestore
}

// findPBMSnapshot returns a pointer to the first snapshot in status whose name
// equals name, or nil when no snapshot matches. The pointer refers to the
// element stored in status, not to a copy.
func findPBMSnapshot(status *pbmStatus, name string) *pbmSnapshot {
	for idx := 0; idx < len(status.Backups.Snapshot); idx++ {
		candidate := &status.Backups.Snapshot[idx]
		if candidate.Name == name {
			return candidate
		}
	}
	return nil
}

// describeTerminalError maps the status in a describe result to a poll outcome.
// The boolean is true when the status is terminal (done, canceled, error or
// partlyDone); the error is nil only for "done". Any other status yields
// (false, nil), meaning the operation is still in progress.
func describeTerminalError(info describeInfo, operation string) (bool, error) {
	status := info.Status

	if status == pbmStatusDone {
		return true, nil
	}
	if status == pbmStatusCanceled {
		return true, errors.Errorf("%s was canceled", operation)
	}
	if status == pbmStatusError {
		return true, describeFailureError(info, operation)
	}
	if status == pbmStatusPartlyDone {
		return true, groupDescribeErrors(info)
	}
	return false, nil
}

// snapshotTerminalError maps a snapshot's status to a poll outcome. The boolean
// is true for the terminal statuses done, canceled and error; the error is nil
// only for "done". For "error" the snapshot's own message is used when present,
// otherwise a generic "<operation> failed" error is returned.
func snapshotTerminalError(snapshot *pbmSnapshot, operation string) (bool, error) {
	status := snapshot.Status

	if status == pbmStatusDone {
		return true, nil
	}
	if status == pbmStatusCanceled {
		return true, errors.Errorf("%s was canceled", operation)
	}
	if status == pbmStatusError {
		if msg := snapshot.Error; msg != "" {
			return true, errors.New(msg)
		}
		return true, errors.Errorf("%s failed", operation)
	}
	return false, nil
}

// describeFailureError builds the error for a describe result in "error"
// status. It prefers the detailed message assembled by groupDescribeErrors and
// falls back to "<operation> failed" when only the generic "operation failed"
// placeholder is available.
func describeFailureError(info describeInfo, operation string) error {
	grouped := groupDescribeErrors(info)
	if grouped == nil || grouped.Error() == "operation failed" {
		return errors.Errorf("%s failed", operation)
	}
	return grouped
}

func findPITRRestore(list []pbmListRestore, restoreInfoPITRTime int64, startedAt time.Time) *pbmListRestore {
for _, v := range slices.Backward(list) {
// TODO when PITR restore invoked with wrong timestamp pbm marks this restore operation as "snapshot" type.
Expand Down Expand Up @@ -365,47 +549,25 @@ func waitForPBMRestore(ctx context.Context, l logrus.FieldLogger, dsn string, re

l.Infof("waiting for pbm restore: %s", name)

ticker := time.NewTicker(statusCheckInterval)
defer ticker.Stop()

maxRetryCount := 5
for {
select {
case <-ticker.C:
describeRetries := maxDescribeCommandRetries
return waitForPBMDescribe(ctx, pbmDescribePollConfig{
l: l,
dsn: dsn,
operation: pbmCmdRestore,
targetName: name,
startedAt: time.Now(),
describeRetries: &describeRetries,
fetchDescribe: func(ctx context.Context) (describeInfo, error) {
var info describeInfo
var describeErr error
if backupType == "physical" {
err = execPBMCommand(ctx, dsn, &info, "describe-restore", name, "--config="+confFile)
describeErr = execPBMCommand(ctx, dsn, &info, "describe-restore", name, "--config="+confFile)
} else {
err = execPBMCommand(ctx, dsn, &info, "describe-restore", name)
describeErr = execPBMCommand(ctx, dsn, &info, "describe-restore", name)
}
if err != nil {
if maxRetryCount > 0 {
maxRetryCount--
l.Warnf("PMM failed to get backup restore status and will retry: %s", err)
continue
} else { //nolint:revive
return errors.Wrap(err, "failed to get restore status")
}
}
// reset maxRetryCount if we were able to successfully get the current restore status
maxRetryCount = 5

switch info.Status {
case "done":
return nil
case "canceled":
return errors.New("restore was canceled")
case "error":
return errors.New(info.Error)
// We consider partlyDone as an error because we cannot automatically recover cluster from this status to fully working.
case "partlyDone":
return groupPartlyDoneErrors(info)
}

case <-ctx.Done():
return ctx.Err()
}
}
return info, describeErr
},
})
}

func pbmConfigure(ctx context.Context, l logrus.FieldLogger, params pbmConfigParams) error {
Expand Down Expand Up @@ -538,21 +700,36 @@ func createPBMConfig(locationConfig *BackupLocationConfig, prefix string, pitr b
return conf, nil
}

func groupPartlyDoneErrors(info describeInfo) error {
func groupDescribeErrors(info describeInfo) error {
var errMsgs []string

if info.Error != "" {
errMsgs = append(errMsgs, info.Error)
}

for _, rs := range info.ReplSets {
if rs.Status == "partlyDone" {
for _, node := range rs.Nodes {
if node.Status == "error" {
errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, node: %s, error: %s", rs.Name, node.Name, node.Error))
if rs.Error != "" {
errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, error: %s", rs.Name, rs.Error))
}
if rs.Status == pbmStatusPartlyDone {
for _, n := range rs.Nodes {
if n.Status == pbmStatusError {
errMsgs = append(errMsgs, fmt.Sprintf("replset: %s, node: %s, error: %s", rs.Name, n.Name, n.Error))
}
}
}
}

if len(errMsgs) == 0 {
return errors.New("operation failed")
}
return errors.New(strings.Join(errMsgs, "; "))
}

// groupPartlyDoneErrors returns the grouped error for info by delegating to
// groupDescribeErrors.
// NOTE(review): presumably kept under the old name for existing callers — confirm.
func groupPartlyDoneErrors(info describeInfo) error {
return groupDescribeErrors(info)
}

// pbmGetSnapshotTimestamp returns time the backup restores target db to.
func pbmGetSnapshotTimestamp(ctx context.Context, l logrus.FieldLogger, dsn string, backupName string) (*time.Time, error) {
snapshots, err := getSnapshots(ctx, l, dsn)
Expand Down
Loading
Loading