Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions api/v1alpha1/dataprocess_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ type DataProcessSpec struct {
// TTLSecondsAfterFinished is the time second to clean up data operations after finished or failed
// +optional
TTLSecondsAfterFinished *int32 `json:"ttlSecondsAfterFinished,omitempty"`

//+kubebuilder:default:=Once
//+kubebuilder:validation:Enum=Once;Cron;OnEvent
// Policy defines the operation policy, including Once, Cron, OnEvent

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When Policy is set to Cron, the Schedule field must be non-empty and contain a valid cron expression. Currently there is no validation for this in dataProcessOperation.Validate() in implement.go. Please add validation to check:

  1. If policy == Cron, schedule must not be empty
  2. Optionally validate the cron expression format

// +optional
Policy Policy `json:"policy,omitempty"`

// Schedule defines the Cron schedule, only used when Policy is Cron.
// See https://en.wikipedia.org/wiki/Cron.
// +optional
Schedule string `json:"schedule,omitempty"`
}

// +kubebuilder:printcolumn:name="Dataset",type="string",JSONPath=`.spec.dataset.name`
Expand Down
14 changes: 14 additions & 0 deletions api/v1alpha1/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

91 changes: 91 additions & 0 deletions charts/fluid-dataprocess/common/templates/cronjob.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{{- if eq (lower .Values.dataProcess.policy) "cron" }}
apiVersion: {{ ternary "batch/v1" "batch/v1beta1" (.Capabilities.APIVersions.Has "batch/v1/CronJob") }}
kind: CronJob
metadata:
name: {{ printf "%s-job" .Release.Name }}
labels:
release: {{ .Release.Name }}
role: dataprocess-cronjob
app: fluid-dataprocess
targetDataset: {{ required "targetDataset should be set" .Values.dataProcess.targetDataset }}
fluid.io/jobPolicy: cron
{{- include "library.fluid.labels" . | nindent 4 }}
ownerReferences:
{{- if .Values.owner.enabled }}
- apiVersion: {{ .Values.owner.apiVersion }}
blockOwnerDeletion: {{ .Values.owner.blockOwnerDeletion }}
controller: {{ .Values.owner.controller }}
kind: {{ .Values.owner.kind }}
name: {{ .Values.owner.name }}
uid: {{ .Values.owner.uid }}
{{- end }}
spec:
schedule: "{{ .Values.dataProcess.schedule }}"
jobTemplate:
spec:
backoffLimit: 3
completions: 1
parallelism: 1
template:
metadata:
name: {{ printf "%s-process" .Release.Name }}
annotations:
sidecar.istio.io/inject: "false"
{{- if .Values.dataProcess.annotations }}
{{ toYaml .Values.dataProcess.annotations | nindent 12 }}
{{- end }}
labels:
release: {{ .Release.Name }}
role: dataprocess-pod
app: fluid-dataprocess
cronjob: {{ printf "%s-job" .Release.Name }}
targetDataset: {{ required "targetDataset should be set" .Values.dataProcess.targetDataset }}
{{- include "library.fluid.labels" . | nindent 12 }}
{{- if .Values.dataProcess.labels }}
{{ toYaml .Values.dataProcess.labels | nindent 12 }}
{{- end }}
spec:
{{- if .Values.dataProcess.serviceAccountName }}
serviceAccountName: {{ .Values.dataProcess.serviceAccountName | quote }}
{{- end }}
{{- if .Values.dataProcess.jobProcessor.podSpec }}
{{- toYaml .Values.dataProcess.jobProcessor.podSpec | nindent 10 }}
{{- else if .Values.dataProcess.scriptProcessor }}
restartPolicy: {{ .Values.dataProcess.scriptProcessor.restartPolicy | default "Never" | quote }}
containers:
- name: script-processor
image: {{ required "DataProcess image should be set" .Values.dataProcess.scriptProcessor.image }}
imagePullPolicy: {{ .Values.dataProcess.scriptProcessor.imagePullPolicy }}
{{- if .Values.dataProcess.scriptProcessor.command }}
command:
{{ toYaml .Values.dataProcess.scriptProcessor.command | nindent 14 }}
{{- end }}
args: ["/fluid-scripts/preprocess.sh"]
{{- if .Values.dataProcess.scriptProcessor.resources}}

Check warning on line 64 in charts/fluid-dataprocess/common/templates/cronjob.yaml

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a whitespace before "}}" in the template directive.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ7KJZF_EhMlwP9QmVIk&open=AZ7KJZF_EhMlwP9QmVIk&pullRequest=5969
resources:
{{- toYaml .Values.dataProcess.scriptProcessor.resources | nindent 16}}

Check warning on line 66 in charts/fluid-dataprocess/common/templates/cronjob.yaml

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a whitespace before "}}" in the template directive.

See more on https://sonarcloud.io/project/issues?id=fluid-cloudnative_fluid&issues=AZ7KJZF_EhMlwP9QmVIl&open=AZ7KJZF_EhMlwP9QmVIl&pullRequest=5969
{{- end }}
{{- if .Values.dataProcess.scriptProcessor.envs }}
env:
{{ toYaml .Values.dataProcess.scriptProcessor.envs | nindent 14 }}
{{- end }}
volumeMounts:
- name: script-cm-vol
mountPath: /fluid-scripts/preprocess.sh
subPath: preprocess.sh
{{- if .Values.dataProcess.scriptProcessor.volumeMounts }}
{{- toYaml .Values.dataProcess.scriptProcessor.volumeMounts | nindent 16 }}
{{- end }}
{{- if .Values.dataProcess.scriptProcessor.affinity }}
affinity:
{{ toYaml .Values.dataProcess.scriptProcessor.affinity | indent 12 }}
{{- end }}
volumes:
- name: script-cm-vol
configMap:
name: {{ .Release.Name }}-scripts
{{- if .Values.dataProcess.scriptProcessor.volumes }}
{{- toYaml .Values.dataProcess.scriptProcessor.volumes | nindent 12 }}
{{- end }}
{{- end }}
{{- end }}
2 changes: 2 additions & 0 deletions charts/fluid-dataprocess/common/templates/job.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{{- if or (eq (lower .Values.dataProcess.policy) "") (eq (lower .Values.dataProcess.policy) "once") }}
apiVersion: batch/v1
kind: Job
metadata:
Expand Down Expand Up @@ -82,3 +83,4 @@ spec:
{{- toYaml .Values.dataProcess.scriptProcessor.volumes | nindent 8 }}
{{- end }}
{{- end }}
{{- end }}
4 changes: 4 additions & 0 deletions charts/fluid-dataprocess/common/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ owner:
controller: false

dataProcess:
# policy for DataProcess: Once, Cron, OnEvent
policy: ""
# schedule for cron policy, see https://en.wikipedia.org/wiki/Cron
schedule: ""
targetDataset: ""
labels: {}
annotations: {}
Expand Down
9 changes: 9 additions & 0 deletions charts/fluid/fluid/crds/data.fluid.io_dataprocesses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ spec:
- mountPath
- name
type: object
policy:
default: Once
enum:
- Once
- Cron
- OnEvent
type: string
processor:
properties:
job:
Expand Down Expand Up @@ -4362,6 +4369,8 @@ spec:
- kind
- name
type: object
schedule:
type: string
ttlSecondsAfterFinished:
format: int32
type: integer
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/data.fluid.io_dataprocesses.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ spec:
- mountPath
- name
type: object
policy:
default: Once
enum:
- Once
- Cron
- OnEvent
type: string
processor:
properties:
job:
Expand Down Expand Up @@ -4362,6 +4369,8 @@ spec:
- kind
- name
type: object
schedule:
type: string
ttlSecondsAfterFinished:
format: int32
type: integer
Expand Down
3 changes: 2 additions & 1 deletion pkg/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,8 @@ const (

DataProcessMultipleProcessorSpecified = "MultipleProcessorSpecified"

DataProcessConflictMountPath = "ConflictMountPath"
DataProcessConflictMountPath = "ConflictMountPath"
DataProcessScheduleNotSpecified = "ScheduleNotSpecified"
)

type CacheStoreType string
Expand Down
50 changes: 47 additions & 3 deletions pkg/controllers/v1alpha1/dataprocess/implement.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,27 @@ func (r *dataProcessOperation) Validate(ctx runtime.ReconcileRequestContext) ([]
}, err
}

// DataProcess with Cron policy must specify a non-empty schedule
if dataProcess.Spec.Policy == datav1alpha1.Cron && dataProcess.Spec.Schedule == "" {
r.Recorder.Eventf(dataProcess,
corev1.EventTypeWarning,
common.DataProcessScheduleNotSpecified,
"DataProcess(%s)'s policy is Cron but spec.schedule is not specified",
dataProcess.Name,
)
err := fmt.Errorf("DataProcess(%s/%s)'s policy is Cron but spec.schedule is not specified", dataProcess.Namespace, dataProcess.Name)
now := time.Now()
return []datav1alpha1.Condition{
{
Type: common.Failed,
Status: corev1.ConditionTrue,
Reason: common.DataProcessScheduleNotSpecified,
Message: "DataProcess's policy is Cron but spec.schedule is not specified",
LastProbeTime: metav1.NewTime(now),
LastTransitionTime: metav1.NewTime(now),
},
}, err
}
return nil, nil
}

Expand All @@ -219,15 +240,38 @@ func (r *dataProcessOperation) RemoveTargetDatasetStatusInProgress(dataset *data
// DataProcess does not need to recover Dataset status after execution.
}

// GetStatusHandler implements dataoperation.OperationInterface.
// Unlike DataLoad, which returns nil for an unrecognized policy, this defaults
// to OnceStatusHandler since policy is optional with a kubebuilder default of
// Once, so an empty value should be treated the same as Once.
func (r *dataProcessOperation) GetStatusHandler() dataoperation.StatusHandler {
// TODO: Support dataProcess.Spec.Policy
return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
policy := r.dataProcess.Spec.Policy
switch policy {
case datav1alpha1.Cron:
return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.OnEvent:
return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.Once:
fallthrough
default:
return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
}
Comment on lines +248 to +258

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Returning nil from GetStatusHandler when the policy is empty or unrecognized can cause a nil pointer dereference panic in the caller. It is safer to default to OnceStatusHandler to ensure robust behavior.

Suggested change
policy := r.dataProcess.Spec.Policy
switch policy {
case datav1alpha1.Once:
return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.Cron:
return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.OnEvent:
return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
default:
return nil
}
policy := r.dataProcess.Spec.Policy
switch policy {
case datav1alpha1.Cron:
return &CronStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.OnEvent:
return &OnEventStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
case datav1alpha1.Once:
fallthrough
default:
return &OnceStatusHandler{Client: r.Client, dataProcess: r.dataProcess}
}

}

// GetTTL implements dataoperation.OperationInterface.
// GetTTL implements dataoperation.OperationInterface.
// Cron and OnEvent policies are recurring/event-driven operations and should
// not be cleaned up via TTL; only Once (and the default/empty policy, which
// behaves like Once, see GetStatusHandler) uses TTLSecondsAfterFinished.
func (r *dataProcessOperation) GetTTL() (ttl *int32, err error) {
dataProcess := r.dataProcess

ttl = r.dataProcess.Spec.TTLSecondsAfterFinished
switch dataProcess.Spec.Policy {
case datav1alpha1.Cron, datav1alpha1.OnEvent:
ttl = nil
default:
ttl = dataProcess.Spec.TTLSecondsAfterFinished
}
return
}

Expand Down
Loading
Loading