Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 58 additions & 6 deletions api/v1alpha1/workerdeployment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,22 @@ type WorkerDeploymentSpec struct {
// How to rollout new workflow executions to the target version.
RolloutStrategy RolloutStrategy `json:"rollout"`

// How to rollback to a previous version. If not specified, defaults to AllAtOnce strategy.
//
// A rollback is triggered automatically when the target version's pod spec is updated and
// the resulting build ID has previously been set as the default (current) version of the
// worker deployment. The controller detects this by checking whether Temporal recorded a
// non-nil LastCurrentTime for that build ID.
//
// The rollback strategy controls routing of NEW workflow executions only. Workflows already
// running are pinned to the version they started on and continue executing there; they are
// not affected by the rollback. Only new workflow executions will be routed to the rollback
// target version.
Comment on lines +95 to +96

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// not affected by the rollback. Only new workflow executions will be routed to the rollback
// target version.
// not affected by the rollback. Only new workflow executions that are not pinned
// to a specific build ID will be routed to the rollback target version.

//
// Rollback is suppressed when the rollout strategy is Manual.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm just curious, but why is rollback suppressed when the rollout stategy is Manual?

// +optional
RollbackStrategy *RollbackStrategy `json:"rollback,omitempty"`
Comment thread
eniko-dif marked this conversation as resolved.

// How to manage sunsetting drained versions.
SunsetStrategy SunsetStrategy `json:"sunset"`

Expand Down Expand Up @@ -338,17 +354,17 @@ type DeprecatedWorkerDeploymentVersion struct {
EligibleForDeletion bool `json:"eligibleForDeletion,omitempty"`
}

// DefaultVersionUpdateStrategy describes how to cut over new workflow executions
// VersionRolloutStrategy describes how to cut over new workflow executions
// to the target worker deployment version.
// +kubebuilder:validation:Enum=Manual;AllAtOnce;Progressive
type DefaultVersionUpdateStrategy string
type VersionRolloutStrategy string

const (
// UpdateManual scales worker resources up or down, but does not update the current or ramping worker deployment version.
UpdateManual DefaultVersionUpdateStrategy = "Manual"
UpdateManual VersionRolloutStrategy = "Manual"

// UpdateAllAtOnce starts 100% of new workflow executions on the new worker deployment version as soon as it's healthy.
UpdateAllAtOnce DefaultVersionUpdateStrategy = "AllAtOnce"
UpdateAllAtOnce VersionRolloutStrategy = "AllAtOnce"

// UpdateProgressive ramps up the percentage of new workflow executions targeting the new worker deployment version over time.
//
Expand All @@ -358,7 +374,19 @@ const (
// Sending a percentage of traffic to a "nil" version means that traffic will be sent to unversioned workers. If
// there are no unversioned workers, those tasks will get stuck. This behavior ensures that all traffic on the task
// queues in this worker deployment can be handled by an active poller.
UpdateProgressive DefaultVersionUpdateStrategy = "Progressive"
UpdateProgressive VersionRolloutStrategy = "Progressive"
)

// VersionRollbackStrategy describes how to cut over during rollback to a previous version.
// +kubebuilder:validation:Enum=AllAtOnce;Progressive
type VersionRollbackStrategy string

const (
// RollbackAllAtOnce immediately switches 100% of traffic back to the previous version.
RollbackAllAtOnce VersionRollbackStrategy = "AllAtOnce"

// RollbackProgressive gradually ramps traffic back to the previous version.
RollbackProgressive VersionRollbackStrategy = "Progressive"
)

type GateWorkflowConfig struct {
Expand Down Expand Up @@ -393,7 +421,7 @@ type RolloutStrategy struct {
// - "Manual"
// - "AllAtOnce"
// - "Progressive"
Strategy DefaultVersionUpdateStrategy `json:"strategy"`
Strategy VersionRolloutStrategy `json:"strategy"`

// Gate specifies a workflow type that must run once to completion on the new worker deployment version before
// any traffic is directed to the new version.
Expand All @@ -405,6 +433,30 @@ type RolloutStrategy struct {
Steps []RolloutStep `json:"steps,omitempty" protobuf:"bytes,3,rep,name=steps"`
}

// RollbackStrategy defines strategy to apply when rolling back to a previous version.
// This is separate from RolloutStrategy because rollbacks have different requirements:
// - No gate workflow (already trusted version)
// - No manual mode (rollbacks should be automatic)
// - Default to AllAtOnce for fast recovery
// The rollback strategy is ignored if the rollout strategy is set to Manual and
// rollback is disabled if the MaxVersionAge is set to 0.
type RollbackStrategy struct {
Comment thread
eniko-dif marked this conversation as resolved.
// Strategy for rollback. Valid values are "AllAtOnce" or "Progressive".
// Defaults to "AllAtOnce" for fast recovery.
Strategy VersionRollbackStrategy `json:"strategy"`

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there really a valid reason or use case for having a rollback strategy not match the rollout strategy? Why not just have the rollback strategy always be the same as rollout?


// Steps to execute progressive rollbacks. Only required when strategy is "Progressive".
// +optional
Steps []RolloutStep `json:"steps,omitempty"`
Comment thread
eniko-dif marked this conversation as resolved.

// MaxVersionAge limits which versions are eligible as rollback targets.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a suspicion this is overly complicated configuration for most users. Would a simpler "just roll back to the previous working (i.e. passes gates) version" behaviour be easier for users to grok? Where the controller simply would attempt rolling back to N-1, then N-2, etc... until landing on a working version?

// A version is only considered a rollback target if it was last current within this duration.
// Defaults to 1 hour if unset.
// Set to 0 to disable rollback entirely, or to a large value to allow rollback regardless of age.
// +optional
MaxVersionAge *metav1.Duration `json:"maxVersionAge,omitempty"`
}

// SunsetStrategy defines strategy to apply when sunsetting k8s deployments of drained versions.
type SunsetStrategy struct {
// ScaledownDelay specifies how long to wait after a version is drained before scaling its Deployment to zero.
Expand Down
101 changes: 91 additions & 10 deletions api/v1alpha1/workerdeployment_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package v1alpha1
import (
"context"
"fmt"
"time"

"github.com/temporalio/temporal-worker-controller/internal/defaults"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -53,9 +54,29 @@ func (s *WorkerDeploymentSpec) Default(ctx context.Context) error {
s.SunsetStrategy.DeleteDelay = &v1.Duration{Duration: defaults.DeleteDelay}
}

rb := RollbackStrategyWithDefaults(s.RollbackStrategy)
s.RollbackStrategy = &rb
return nil
}

// RollbackStrategyWithDefaults returns s with any unset fields filled in with their defaults.
func RollbackStrategyWithDefaults(s *RollbackStrategy) RollbackStrategy {
if s == nil {
return RollbackStrategy{
Strategy: RollbackAllAtOnce,
MaxVersionAge: &v1.Duration{Duration: defaults.RollbackMaxVersionAge},
}
}
result := *s
if result.Strategy == "" {
result.Strategy = RollbackAllAtOnce
}
if result.MaxVersionAge == nil {
result.MaxVersionAge = &v1.Duration{Duration: defaults.RollbackMaxVersionAge}
}
return result
}

// ValidateCreate implements webhook.CustomValidator so a webhook will be registered for the type
func (r *WorkerDeployment) ValidateCreate(ctx context.Context, obj runtime.Object) (admission.Warnings, error) {
return r.validateForUpdateOrCreate(ctx, obj)
Expand All @@ -81,10 +102,17 @@ func (r *WorkerDeployment) validateForUpdateOrCreate(ctx context.Context, obj ru
}

func validateForUpdateOrCreate(old, new *WorkerDeployment) (admission.Warnings, error) {
allErrs := validateRolloutStrategy(new.Spec.RolloutStrategy)
var allErrs field.ErrorList
allErrs = append(allErrs, validateRolloutStrategy(new.Spec.RolloutStrategy)...)
if new.Spec.RollbackStrategy != nil {
allErrs = append(allErrs, validateRollbackStrategy(*new.Spec.RollbackStrategy)...)
}
if len(allErrs) > 0 {
return nil, newInvalidErr(new, allErrs)
}
if new.Spec.RollbackStrategy != nil {
return warnRollbackSlowerThanRollout(new.Spec.RolloutStrategy, *new.Spec.RollbackStrategy), nil
}
return nil, nil
}

Expand All @@ -96,15 +124,7 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
var allErrs []*field.Error

if s.Strategy == UpdateProgressive {
var lastRamp int
for i, step := range s.Steps {
if step.RampPercentage <= lastRamp {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("spec.rollout.steps[%d].rampPercentage", i)), step.RampPercentage, "rampPercentage must increase between each step"),
)
}
lastRamp = step.RampPercentage
}
allErrs = append(allErrs, validateProgressiveStrategySteps("spec.rollout.steps", s.Steps)...)
}

if s.Gate != nil && s.Gate.Input != nil && s.Gate.InputFrom != nil {
Expand All @@ -117,6 +137,67 @@ func validateRolloutStrategy(s RolloutStrategy) []*field.Error {
return allErrs
}

func validateRollbackStrategy(s RollbackStrategy) []*field.Error {
var allErrs []*field.Error
if s.Strategy == RollbackProgressive {
allErrs = append(allErrs, validateProgressiveStrategySteps("spec.rollback.steps", s.Steps)...)
}
return allErrs
}

func warnRollbackSlowerThanRollout(rollout RolloutStrategy, rollback RollbackStrategy) admission.Warnings {
switch rollout.Strategy {
case UpdateAllAtOnce:
if rollback.Strategy != RollbackAllAtOnce {
return admission.Warnings{"rollback strategy is slower than rollout: rollout is AllAtOnce, but rollback is Progressive — is that intended?"}
}
case UpdateProgressive:
if rollback.Strategy == RollbackProgressive {
var rolloutTotal, rollbackTotal time.Duration
for _, s := range rollout.Steps {
rolloutTotal += s.PauseDuration.Duration
}
for _, s := range rollback.Steps {
rollbackTotal += s.PauseDuration.Duration
}
if rollbackTotal > rolloutTotal {
return admission.Warnings{fmt.Sprintf(
"rollback strategy is slower than rollout: progressive rollback total duration (%s) exceeds progressive rollout total duration (%s) — is that intended?",
rollbackTotal, rolloutTotal,
)}
}
}
}
return nil
}
Comment on lines +148 to +172

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my comment above... I suspect there's not really a valid use case for having a rollback strategy not match the rollout strategy... if we can just say the rollback == rollout, we can simplify things quite a bit and remove the need for these warnings.


func validateProgressiveStrategySteps(specName string, steps []RolloutStep) []*field.Error {
var allErrs []*field.Error

if len(steps) == 0 {
allErrs = append(allErrs,
field.Invalid(field.NewPath(specName), steps, "steps are required for Progressive strategy"),
)
}

var lastRamp int
for i, step := range steps {
if step.PauseDuration.Duration < 30*time.Second {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("%s[%d].pauseDuration", specName, i)), step.PauseDuration.Duration.String(), "pause duration must be at least 30s"),
)
}
if step.RampPercentage <= lastRamp {
allErrs = append(allErrs,
field.Invalid(field.NewPath(fmt.Sprintf("%s[%d].rampPercentage", specName, i)), step.RampPercentage, "rampPercentage must increase between each step"),
)
}
lastRamp = step.RampPercentage
}

return allErrs
}

func newInvalidErr(dep *WorkerDeployment, errs field.ErrorList) *apierrors.StatusError {
return apierrors.NewInvalid(dep.GroupVersionKind().GroupKind(), dep.GetName(), errs)
}
Loading