diff --git a/api/v1alpha1/dataprocess_types.go b/api/v1alpha1/dataprocess_types.go index 304a1eb36c0..b4a146bd9f6 100644 --- a/api/v1alpha1/dataprocess_types.go +++ b/api/v1alpha1/dataprocess_types.go @@ -111,6 +111,17 @@ type DataProcessSpec struct { // TTLSecondsAfterFinished is the time second to clean up data operations after finished or failed // +optional TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"` + + //+kubebuilder:default:=Once + //+kubebuilder:validation:Enum=Once;Cron;OnEvent + // Policy defines the operation policy, including Once, Cron, OnEvent + // +optional + Policy Policy `json:"policy,omitempty"` + + // Schedule defines the Cron schedule, only used when Policy is Cron. + // See https://en.wikipedia.org/wiki/Cron. + // +optional + Schedule string `json:"schedule,omitempty"` } // +kubebuilder:printcolumn:name="Dataset",type="string",JSONPath=`.spec.dataset.name` diff --git a/api/v1alpha1/openapi_generated.go b/api/v1alpha1/openapi_generated.go index e836541ec22..dd55f22032c 100644 --- a/api/v1alpha1/openapi_generated.go +++ b/api/v1alpha1/openapi_generated.go @@ -2902,6 +2902,20 @@ func schema_fluid_cloudnative_fluid_api_v1alpha1_DataProcessSpec(ref common.Refe Format: "int32", }, }, + "policy": { + SchemaProps: spec.SchemaProps{ + Description: "Policy defines the operation policy, including Once, Cron, OnEvent", + Type: []string{"string"}, + Format: "", + }, + }, + "schedule": { + SchemaProps: spec.SchemaProps{ + Description: "Schedule defines the Cron schedule, only used when Policy is Cron. See https://en.wikipedia.org/wiki/Cron.", + Type: []string{"string"}, + Format: "", + }, + }, }, Required: []string{"dataset", "processor"}, }, diff --git a/charts/fluid-dataprocess/common/templates/cronjob.yaml b/charts/fluid-dataprocess/common/templates/cronjob.yaml new file mode 100644 index 00000000000..25de2f71386 --- /dev/null +++ b/charts/fluid-dataprocess/common/templates/cronjob.yaml @@ -0,0 +1,91 @@ +{{- if eq (lower .Values.dataProcess.policy) "cron" }} +apiVersion: {{ ternary "batch/v1" "batch/v1beta1" (.Capabilities.APIVersions.Has "batch/v1/CronJob") }} +kind: CronJob +metadata: + name: {{ printf "%s-job" .Release.Name }} + labels: + release: {{ .Release.Name }} + role: dataprocess-cronjob + app: fluid-dataprocess + targetDataset: {{ required "targetDataset should be set" .Values.dataProcess.targetDataset }} + fluid.io/jobPolicy: cron + {{- include "library.fluid.labels" . | nindent 4 }} + ownerReferences: + {{- if .Values.owner.enabled }} + - apiVersion: {{ .Values.owner.apiVersion }} + blockOwnerDeletion: {{ .Values.owner.blockOwnerDeletion }} + controller: {{ .Values.owner.controller }} + kind: {{ .Values.owner.kind }} + name: {{ .Values.owner.name }} + uid: {{ .Values.owner.uid }} + {{- end }} +spec: + schedule: "{{ .Values.dataProcess.schedule }}" + jobTemplate: + spec: + backoffLimit: 3 + completions: 1 + parallelism: 1 + template: + metadata: + name: {{ printf "%s-process" .Release.Name }} + annotations: + sidecar.istio.io/inject: "false" + {{- if .Values.dataProcess.annotations }} + {{ toYaml .Values.dataProcess.annotations | nindent 12 }} + {{- end }} + labels: + release: {{ .Release.Name }} + role: dataprocess-pod + app: fluid-dataprocess + cronjob: {{ printf "%s-job" .Release.Name }} + targetDataset: {{ required "targetDataset should be set" .Values.dataProcess.targetDataset }} + {{- include "library.fluid.labels" . | nindent 12 }} + {{- if .Values.dataProcess.labels }} + {{ toYaml .Values.dataProcess.labels | nindent 12 }} + {{- end }} + spec: + {{- if .Values.dataProcess.serviceAccountName }} + serviceAccountName: {{ .Values.dataProcess.serviceAccountName | quote }} + {{- end }} + {{- if .Values.dataProcess.jobProcessor.podSpec }} +{{- toYaml .Values.dataProcess.jobProcessor.podSpec | nindent 10 }} + {{- else if .Values.dataProcess.scriptProcessor }} + restartPolicy: {{ .Values.dataProcess.scriptProcessor.restartPolicy | default "Never" | quote }} + containers: + - name: script-processor + image: {{ required "DataProcess image should be set" .Values.dataProcess.scriptProcessor.image }} + imagePullPolicy: {{ .Values.dataProcess.scriptProcessor.imagePullPolicy }} + {{- if .Values.dataProcess.scriptProcessor.command }} + command: + {{ toYaml .Values.dataProcess.scriptProcessor.command | nindent 14 }} + {{- end }} + args: ["/fluid-scripts/preprocess.sh"] + {{- if .Values.dataProcess.scriptProcessor.resources}} + resources: + {{- toYaml .Values.dataProcess.scriptProcessor.resources | nindent 16}} + {{- end }} + {{- if .Values.dataProcess.scriptProcessor.envs }} + env: + {{ toYaml .Values.dataProcess.scriptProcessor.envs | nindent 14 }} + {{- end }} + volumeMounts: + - name: script-cm-vol + mountPath: /fluid-scripts/preprocess.sh + subPath: preprocess.sh + {{- if .Values.dataProcess.scriptProcessor.volumeMounts }} + {{- toYaml .Values.dataProcess.scriptProcessor.volumeMounts | nindent 16 }} + {{- end }} + {{- if .Values.dataProcess.scriptProcessor.affinity }} + affinity: +{{ toYaml .Values.dataProcess.scriptProcessor.affinity | indent 12 }} + {{- end }} + volumes: + - name: script-cm-vol + configMap: + name: {{ .Release.Name }}-scripts + {{- if .Values.dataProcess.scriptProcessor.volumes }} + {{- toYaml .Values.dataProcess.scriptProcessor.volumes | nindent 12 }} + {{- end }} + {{- end }} +{{- end }} diff --git a/charts/fluid-dataprocess/common/templates/job.yaml b/charts/fluid-dataprocess/common/templates/job.yaml index f2d8d78d0a7..cc8c4b030ea 100644 --- a/charts/fluid-dataprocess/common/templates/job.yaml +++ b/charts/fluid-dataprocess/common/templates/job.yaml @@ -1,3 +1,4 @@ +{{- if or (eq (lower .Values.dataProcess.policy) "") (eq (lower .Values.dataProcess.policy) "once") }} apiVersion: batch/v1 kind: Job metadata: @@ -82,3 +83,4 @@ spec: {{- toYaml .Values.dataProcess.scriptProcessor.volumes | nindent 8 }} {{- end }} {{- end }} +{{- end }} diff --git a/charts/fluid-dataprocess/common/values.yaml b/charts/fluid-dataprocess/common/values.yaml index 057f1d547f7..5e5a6580e31 100644 --- a/charts/fluid-dataprocess/common/values.yaml +++ b/charts/fluid-dataprocess/common/values.yaml @@ -10,6 +10,10 @@ owner: controller: false dataProcess: + # policy for DataProcess: Once, Cron, OnEvent + policy: "" + # schedule for cron policy, see https://en.wikipedia.org/wiki/Cron + schedule: "" targetDataset: "" labels: {} annotations: {} diff --git a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml index 6bdfa2b69d6..e0aa56bef17 100644 --- a/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml +++ b/charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml @@ -53,6 +53,13 @@ spec: - mountPath - name type: object + policy: + default: Once + enum: + - Once + - Cron + - OnEvent + type: string processor: properties: job: @@ -4362,6 +4369,8 @@ spec: - kind - name type: object + schedule: + type: string ttlSecondsAfterFinished: format: int32 type: integer diff --git a/config/crd/bases/data.fluid.io_dataprocesses.yaml b/config/crd/bases/data.fluid.io_dataprocesses.yaml index 6bdfa2b69d6..e0aa56bef17 100644 --- a/config/crd/bases/data.fluid.io_dataprocesses.yaml +++ b/config/crd/bases/data.fluid.io_dataprocesses.yaml @@ -53,6 +53,13 @@ spec: - mountPath - name type: object + policy: + default: Once + enum: + - Once + - Cron + - OnEvent + type: string processor: properties: job: @@ -4362,6 +4369,8 @@ spec: - kind - name type: object + schedule: + type: string ttlSecondsAfterFinished: format: int32 type: integer diff --git a/pkg/common/constants.go b/pkg/common/constants.go index 14dc4e0cf79..81f635cb25e 100644 --- a/pkg/common/constants.go +++ b/pkg/common/constants.go @@ -107,7 +107,8 @@ const ( DataProcessMultipleProcessorSpecified = "MultipleProcessorSpecified" - DataProcessConflictMountPath = "ConflictMountPath" + DataProcessConflictMountPath = "ConflictMountPath" + DataProcessScheduleNotSpecified = "ScheduleNotSpecified" ) type CacheStoreType string diff --git a/pkg/controllers/v1alpha1/dataprocess/implement.go b/pkg/controllers/v1alpha1/dataprocess/implement.go index 24508b3f1f8..da66e32653b 100644 --- a/pkg/controllers/v1alpha1/dataprocess/implement.go +++ b/pkg/controllers/v1alpha1/dataprocess/implement.go @@ -201,6 +201,27 @@ func (r *dataProcessOperation) Validate(ctx runtime.ReconcileRequestContext) ([] }, err } + // DataProcess with Cron policy must specify a non-empty schedule + if dataProcess.Spec.Policy == datav1alpha1.Cron && dataProcess.Spec.Schedule == "" { + r.Recorder.Eventf(dataProcess, + corev1.EventTypeWarning, + common.DataProcessScheduleNotSpecified, + "DataProcess(%s)'s policy is Cron but spec.schedule is not specified", + dataProcess.Name, + ) + err := fmt.Errorf("DataProcess(%s/%s)'s policy is Cron but spec.schedule is not specified", dataProcess.Namespace, dataProcess.Name) + now := time.Now() + return []datav1alpha1.Condition{ + { + Type: common.Failed, + Status: corev1.ConditionTrue, + Reason: common.DataProcessScheduleNotSpecified, + Message: "DataProcess's policy is Cron but spec.schedule is not specified", + LastProbeTime: metav1.NewTime(now), + LastTransitionTime: metav1.NewTime(now), + }, + }, err + } return nil, nil } @@ -219,15 +240,38 @@ func (r *dataProcessOperation) RemoveTargetDatasetStatusInProgress(dataset *data // DataProcess does not need to recover Dataset status after execution. } +// GetStatusHandler implements dataoperation.OperationInterface. +// Unlike DataLoad, which returns nil for an unrecognized policy, this defaults +// to OnceStatusHandler since policy is optional with a kubebuilder default of +// Once, so an empty value should be treated the same as Once. func (r *dataProcessOperation) GetStatusHandler() dataoperation.StatusHandler { - // TODO: Support dataProcess.Spec.Policy - return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess} + policy := r.dataProcess.Spec.Policy + switch policy { + case datav1alpha1.Cron: + return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess} + case datav1alpha1.OnEvent: + return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess} + case datav1alpha1.Once: + fallthrough + default: + return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess} + } } // GetTTL implements dataoperation.OperationInterface. +// GetTTL implements dataoperation.OperationInterface. +// Cron and OnEvent policies are recurring/event-driven operations and should +// not be cleaned up via TTL; only Once (and the default/empty policy, which +// behaves like Once, see GetStatusHandler) uses TTLSecondsAfterFinished. func (r *dataProcessOperation) GetTTL() (ttl *int32, err error) { + dataProcess := r.dataProcess - ttl = r.dataProcess.Spec.TTLSecondsAfterFinished + switch dataProcess.Spec.Policy { + case datav1alpha1.Cron, datav1alpha1.OnEvent: + ttl = nil + default: + ttl = dataProcess.Spec.TTLSecondsAfterFinished + } return } diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler.go b/pkg/controllers/v1alpha1/dataprocess/status_handler.go index d1e2b768ea0..79cc4129523 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler.go @@ -27,6 +27,7 @@ import ( "github.com/fluid-cloudnative/fluid/pkg/utils/kubeclient" "github.com/pkg/errors" batchv1 "k8s.io/api/batch/v1" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -100,3 +101,141 @@ func (handler *OnceStatusHandler) GetOperationStatus(ctx runtime.ReconcileReques return } + +type CronStatusHandler struct { + client.Client + dataProcess *datav1alpha1.DataProcess +} + +var _ dataoperation.StatusHandler = &CronStatusHandler{} + +func (handler *CronStatusHandler) GetOperationStatus(ctx runtime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { + result = opStatus.DeepCopy() + object := handler.dataProcess + + releaseName := utils.GetDataProcessReleaseName(object.GetName()) + cronjobName := utils.GetDataProcessJobName(releaseName) + + cronjobStatus, err := kubeclient.GetCronJobStatus(handler.Client, types.NamespacedName{Namespace: object.GetNamespace(), Name: cronjobName}) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related CronJob missing, will delete helm chart and retry", "namespace", ctx.Namespace, "cronjobName", cronjobName) + if err = helm.DeleteReleaseIfExists(releaseName, ctx.Namespace); err != nil { + ctx.Log.Error(err, "failed to delete dataprocess helm release", "namespace", ctx.Namespace, "releaseName", releaseName) + return + } + return + } + ctx.Log.Error(err, "can't get dataprocess cronjob", "namespace", ctx.Namespace, "cronjobName", cronjobName) + return + } + + if cronjobStatus.LastScheduleTime == nil { + ctx.Log.Info("CronJob has not been scheduled yet", "namespace", ctx.Namespace, "cronjobName", cronjobName) + return + } + + result.LastScheduleTime = cronjobStatus.LastScheduleTime + result.LastSuccessfulTime = cronjobStatus.LastSuccessfulTime + + jobs, err := utils.ListDataOperationJobByCronjob(handler.Client, types.NamespacedName{Namespace: object.GetNamespace(), Name: cronjobName}) + if err != nil { + ctx.Log.Error(err, "can't list dataprocess jobs by cronjob", "namespace", ctx.Namespace, "cronjobName", cronjobName) + return + } + + var currentJob *batchv1.Job + for _, job := range jobs { + if job.CreationTimestamp == *cronjobStatus.LastScheduleTime || job.CreationTimestamp.After(cronjobStatus.LastScheduleTime.Time) { + currentJob = &job + break + } + } + if currentJob == nil { + ctx.Log.Info("can't get newest job by cronjob, skip", "namespace", ctx.Namespace, "cronjobName", cronjobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(currentJob) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataProcess job still running", "namespace", ctx.Namespace, "cronjobName", cronjobName) + if opStatus.Phase == common.PhaseComplete || opStatus.Phase == common.PhaseFailed { + result.Phase = common.PhasePending + result.Duration = "-" + } + return + } + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if finishedJobCondition.Type == batchv1.JobFailed { + result.Phase = common.PhaseFailed + } else { + result.Phase = common.PhaseComplete + } + result.Duration = utils.CalculateDuration(currentJob.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return +} + +type OnEventStatusHandler struct { + client.Client + dataProcess *datav1alpha1.DataProcess +} + +var _ dataoperation.StatusHandler = &OnEventStatusHandler{} + +func (handler *OnEventStatusHandler) GetOperationStatus(ctx runtime.ReconcileRequestContext, opStatus *datav1alpha1.OperationStatus) (result *datav1alpha1.OperationStatus, err error) { + result = opStatus.DeepCopy() + object := handler.dataProcess + + releaseName := utils.GetDataProcessReleaseName(object.GetName()) + jobName := utils.GetDataProcessJobName(releaseName) + + ctx.Log.V(1).Info("DataProcess chart already existed, check its running status") + job, err := kubeclient.GetJob(handler.Client, jobName, object.GetNamespace()) + if err != nil { + if utils.IgnoreNotFound(err) == nil { + ctx.Log.Info("Related job missing, will delete helm chart and retry", "namespace", object.GetNamespace(), "jobName", jobName) + if err = helm.DeleteReleaseIfExists(releaseName, object.GetNamespace()); err != nil { + ctx.Log.Error(err, "failed to delete dataprocess helm release", "namespace", object.GetNamespace(), "releaseName", releaseName) + return + } + return + } + ctx.Log.Error(err, "can't get dataprocess job", "namespace", object.GetNamespace(), "jobName", jobName) + return + } + + finishedJobCondition := kubeclient.GetFinishedJobCondition(job) + if finishedJobCondition == nil { + ctx.Log.V(1).Info("DataProcess job still running", "namespace", object.GetNamespace(), "jobName", jobName) + return + } + isJobSucceed := finishedJobCondition.Type == batchv1.JobComplete + + result.Conditions = []datav1alpha1.Condition{ + { + Type: common.ConditionType(finishedJobCondition.Type), + Status: finishedJobCondition.Status, + Reason: finishedJobCondition.Reason, + Message: finishedJobCondition.Message, + LastProbeTime: finishedJobCondition.LastProbeTime, + LastTransitionTime: finishedJobCondition.LastTransitionTime, + }, + } + if isJobSucceed { + result.Phase = common.PhaseComplete + } else { + result.Phase = common.PhaseFailed + } + result.Duration = utils.CalculateDuration(job.CreationTimestamp.Time, finishedJobCondition.LastTransitionTime.Time) + return +} diff --git a/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go b/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go index 1a752331860..74781f1cc3e 100644 --- a/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go +++ b/pkg/controllers/v1alpha1/dataprocess/status_handler_test.go @@ -20,9 +20,12 @@ import ( "testing" "time" + "github.com/agiledragon/gomonkey/v2" "github.com/fluid-cloudnative/fluid/api/v1alpha1" "github.com/fluid-cloudnative/fluid/pkg/common" cruntime "github.com/fluid-cloudnative/fluid/pkg/runtime" + "github.com/fluid-cloudnative/fluid/pkg/utils" + "github.com/fluid-cloudnative/fluid/pkg/utils/compatibility" "github.com/fluid-cloudnative/fluid/pkg/utils/fake" batchv1 "k8s.io/api/batch/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -111,3 +114,314 @@ func TestOnceGetOperationStatus(t *testing.T) { } } } + +func TestOnEventGetOperationStatus(t *testing.T) { + testScheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(testScheme) + _ = batchv1.AddToScheme(testScheme) + + mockDataProcess := v1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1alpha1.DataProcessSpec{ + Policy: v1alpha1.OnEvent, + }, + } + + mockJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, + }, + } + + mockFailedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + LastProbeTime: v1.NewTime(time.Now()), + LastTransitionTime: v1.NewTime(time.Now()), + }, + }, + }, + } + + testcases := []struct { + name string + job batchv1.Job + expectedPhase common.Phase + }{ + { + name: "job success", + job: mockJob, + expectedPhase: common.PhaseComplete, + }, + { + name: "job failed", + job: mockFailedJob, + expectedPhase: common.PhaseFailed, + }, + } + + for _, testcase := range testcases { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &testcase.job) + handler := &OnEventStatusHandler{Client: client, dataProcess: &mockDataProcess} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + opStatus, err := handler.GetOperationStatus(ctx, &mockDataProcess.Status) + if err != nil { + t.Errorf("fail to GetOperationStatus with error %v", err) + } + if opStatus.Phase != testcase.expectedPhase { + t.Error("Failed to GetOperationStatus", "expected phase", testcase.expectedPhase, "get", opStatus.Phase) + } + } +} + +func TestOnEventGetOperationStatusJobStillRunning(t *testing.T) { + testScheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(testScheme) + _ = batchv1.AddToScheme(testScheme) + + mockDataProcess := v1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1alpha1.DataProcessSpec{ + Policy: v1alpha1.OnEvent, + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhasePending, + }, + } + + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: "test-processor-job", + Namespace: "default", + }, + Status: batchv1.JobStatus{}, + } + + client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &runningJob) + handler := &OnEventStatusHandler{Client: client, dataProcess: &mockDataProcess} + ctx := cruntime.ReconcileRequestContext{ + NamespacedName: types.NamespacedName{ + Namespace: "default", + Name: "", + }, + Log: fake.NullLogger(), + } + opStatus, err := handler.GetOperationStatus(ctx, &mockDataProcess.Status) + if err != nil { + t.Errorf("fail to GetOperationStatus with error %v", err) + } + if opStatus.Phase != common.PhasePending { + t.Error("Failed to GetOperationStatus", "expected phase", common.PhasePending, "get", opStatus.Phase) + } +} + +func TestCronGetOperationStatus(t *testing.T) { + testScheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(testScheme) + _ = batchv1.AddToScheme(testScheme) + + patch := gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool { + return true + }) + defer patch.Reset() + + startTime := time.Date(2023, 8, 1, 12, 0, 0, 0, time.Local) + lastScheduleTime := v1.NewTime(startTime) + lastSuccessfulTime := v1.NewTime(startTime.Add(time.Second * 10)) + + mockDataProcess := v1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1alpha1.DataProcessSpec{ + Policy: v1alpha1.Cron, + Schedule: "* * * * *", + }, + Status: v1alpha1.OperationStatus{ + Phase: common.PhaseComplete, + }, + } + + releaseName := utils.GetDataProcessReleaseName(mockDataProcess.GetName()) + cronjobName := utils.GetDataProcessJobName(releaseName) + + mockCronJob := batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: cronjobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * *", + }, + Status: batchv1.CronJobStatus{ + LastScheduleTime: &lastScheduleTime, + LastSuccessfulTime: &lastSuccessfulTime, + }, + } + + mockJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: cronjobName + "-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": cronjobName, + }, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobComplete, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, + } + + mockFailedJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: cronjobName + "-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": cronjobName, + }, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{ + Conditions: []batchv1.JobCondition{ + { + Type: batchv1.JobFailed, + LastProbeTime: lastSuccessfulTime, + LastTransitionTime: lastSuccessfulTime, + }, + }, + }, + } + + runningJob := batchv1.Job{ + ObjectMeta: v1.ObjectMeta{ + Name: cronjobName + "-1", + Namespace: "default", + Labels: map[string]string{ + "cronjob": cronjobName, + }, + CreationTimestamp: lastScheduleTime, + }, + Status: batchv1.JobStatus{}, + } + + testcases := []struct { + name string + job *batchv1.Job + expectedPhase common.Phase + }{ + { + name: "job success yields PhaseComplete", + job: &mockJob, + expectedPhase: common.PhaseComplete, + }, + { + name: "job failed yields PhaseFailed", + job: &mockFailedJob, + expectedPhase: common.PhaseFailed, + }, + { + name: "job still running yields PhasePending", + job: &runningJob, + expectedPhase: common.PhasePending, + }, + } + + for _, testcase := range testcases { + client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &mockCronJob, testcase.job) + handler := &CronStatusHandler{Client: client, dataProcess: &mockDataProcess} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataProcess.Status) + if err != nil { + t.Errorf("%s: fail to GetOperationStatus with error %v", testcase.name, err) + } + if opStatus.Phase != testcase.expectedPhase { + t.Errorf("%s: expected phase %s, got %s", testcase.name, testcase.expectedPhase, opStatus.Phase) + } + } +} + +func TestCronGetOperationStatusNotScheduledYet(t *testing.T) { + testScheme := runtime.NewScheme() + _ = v1alpha1.AddToScheme(testScheme) + _ = batchv1.AddToScheme(testScheme) + + patch := gomonkey.ApplyFunc(compatibility.IsBatchV1CronJobSupported, func() bool { + return true + }) + defer patch.Reset() + + mockDataProcess := v1alpha1.DataProcess{ + ObjectMeta: v1.ObjectMeta{ + Name: "test", + Namespace: "default", + }, + Spec: v1alpha1.DataProcessSpec{ + Policy: v1alpha1.Cron, + Schedule: "* * * * *", + }, + } + + releaseName := utils.GetDataProcessReleaseName(mockDataProcess.GetName()) + cronjobName := utils.GetDataProcessJobName(releaseName) + + // CronJob exists but has not been scheduled yet (LastScheduleTime is nil) + mockCronJob := batchv1.CronJob{ + ObjectMeta: v1.ObjectMeta{ + Name: cronjobName, + Namespace: "default", + }, + Spec: batchv1.CronJobSpec{ + Schedule: "* * * * *", + }, + Status: batchv1.CronJobStatus{}, + } + + client := fake.NewFakeClientWithScheme(testScheme, &mockDataProcess, &mockCronJob) + handler := &CronStatusHandler{Client: client, dataProcess: &mockDataProcess} + ctx := cruntime.ReconcileRequestContext{Log: fake.NullLogger()} + + opStatus, err := handler.GetOperationStatus(ctx, &mockDataProcess.Status) + if err != nil { + t.Errorf("fail to GetOperationStatus with error %v", err) + } + if opStatus == nil { + t.Error("expected non-nil opStatus") + } +} diff --git a/pkg/dataprocess/generate_values.go b/pkg/dataprocess/generate_values.go index a608449ac7b..d2fc2957ccc 100644 --- a/pkg/dataprocess/generate_values.go +++ b/pkg/dataprocess/generate_values.go @@ -110,6 +110,8 @@ func GenDataProcessValue(dataset *datav1alpha1.Dataset, dataProcess *datav1alpha func transformCommonPart(value *DataProcessValue, dataProcess *datav1alpha1.DataProcess) { value.Name = dataProcess.Name + value.DataProcessInfo.Policy = string(dataProcess.Spec.Policy) + value.DataProcessInfo.Schedule = dataProcess.Spec.Schedule value.DataProcessInfo.Labels = dataProcess.Spec.Processor.PodMetadata.Labels value.DataProcessInfo.Annotations = dataflow.InjectAffinityAnnotation(dataProcess.Annotations, dataProcess.Spec.Processor.PodMetadata.Annotations) value.Owner = transformer.GenerateOwnerReferenceFromObject(dataProcess) diff --git a/pkg/dataprocess/value.go b/pkg/dataprocess/value.go index a3e8fd63445..30098345513 100644 --- a/pkg/dataprocess/value.go +++ b/pkg/dataprocess/value.go @@ -30,6 +30,12 @@ type DataProcessValue struct { } type DataProcessInfo struct { + // Policy for DataProcess, including Once, Cron, OnEvent + Policy string `json:"policy"` + + // Schedule The schedule in Cron format, only set when policy is cron, see https://en.wikipedia.org/wiki/Cron. + Schedule string `json:"schedule,omitempty"` + TargetDataset string `json:"targetDataset,omitempty"` ServiceAccountName string `json:"serviceAccountName,omitempty"`