Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import (
"context"
"encoding/json"
"strings"

Check failure on line 20 in pkg/ddl/backfilling_dist_executor.go

View workflow job for this annotation

GitHub Actions / Bazel Crossbuild (ubuntu-latest)

"strings" imported and not used

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/logutil"
Expand All @@ -33,6 +34,12 @@
"go.uber.org/zap"
)

var errBackfillTaskMetaOutdated = errors.New("backfill task meta is outdated")

func isBackfillTaskMetaOutdatedErr(err error) bool {
return errors.Cause(err) == errBackfillTaskMetaOutdated
}

// Version constants for BackfillTaskMeta.
const (
BackfillTaskMetaVersion0 = iota
Expand Down Expand Up @@ -164,7 +171,9 @@
logutil.DDLIngestLogger().Warn("index info not found",
zap.Int64("table ID", tbl.Meta().ID),
zap.Int64("index ID", eid))
return nil, errors.Errorf("index info not found: %d", eid)
return nil, errors.Annotatef(errBackfillTaskMetaOutdated,
"index info not found: %d, table ID: %d, job ID: %d",
eid, tbl.Meta().ID, jobMeta.ID)
}
indexInfos = append(indexInfos, indexInfo)
}
Expand Down Expand Up @@ -246,6 +255,9 @@
}

func (*backfillDistExecutor) IsRetryableError(err error) bool {
if isBackfillTaskMetaOutdatedErr(err) {
return false
}
return common.IsRetryableError(err) || isRetryableError(err, true)
}

Expand Down
44 changes: 44 additions & 0 deletions pkg/ddl/backfilling_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@ package ddl
import (
"bytes"
"context"
"encoding/json"
"math"
"testing"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
"github.com/pingcap/tidb/pkg/errctx"
"github.com/pingcap/tidb/pkg/expression/exprstatic"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/mysql"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
Expand Down Expand Up @@ -64,6 +68,46 @@ func TestDoneTaskKeeper(t *testing.T) {
require.True(t, bytes.Equal(n.nextKey, kv.Key("h")))
}

func TestOutdatedBackfillTaskMetaIsNonRetryable(t *testing.T) {
err := errors.Annotatef(errBackfillTaskMetaOutdated, "index info not found: %d", 1)
require.False(t, isRetryableError(err, true))
require.False(t, (&backfillDistExecutor{}).IsRetryableError(err))

restoredErr := errors.New(err.Error())
require.False(t, isRetryableError(restoredErr, true))
require.False(t, (&backfillDistExecutor{}).IsRetryableError(restoredErr))
}

func TestValidateBackfillTaskMeta(t *testing.T) {
job := &model.Job{ID: 1, SchemaID: 2, TableID: 3}
reorgInfo := &reorgInfo{
Job: job,
elements: []*meta.Element{{ID: 10, TypeKey: meta.IndexElementKey}},
currElement: &meta.Element{ID: 10, TypeKey: meta.IndexElementKey},
}
taskMeta := &BackfillTaskMeta{
Job: *job,
EleIDs: []int64{10},
EleTypeKey: meta.IndexElementKey,
}
taskMetaBytes, err := json.Marshal(taskMeta)
require.NoError(t, err)
task := &proto.Task{
TaskBase: proto.TaskBase{
ID: 5,
Key: "ddl/backfill/1",
},
Meta: taskMetaBytes,
}
require.NoError(t, validateBackfillTaskMeta(task, reorgInfo))

taskMeta.EleIDs = []int64{11}
task.Meta, err = json.Marshal(taskMeta)
require.NoError(t, err)
err = validateBackfillTaskMeta(task, reorgInfo)
require.ErrorIs(t, errors.Cause(err), errBackfillTaskMetaOutdated)
}

func TestPickBackfillType(t *testing.T) {
ingest.LitDiskRoot = ingest.NewDiskRootImpl(t.TempDir())
ingest.LitMemRoot = ingest.NewMemRootImpl(math.MaxInt64)
Expand Down
29 changes: 29 additions & 0 deletions pkg/ddl/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -1935,6 +1935,9 @@ func isRetryableJobError(err error, jobErrCnt int64) bool {
}

func isRetryableError(err error, retryUnknown bool) bool {
if isBackfillTaskMetaOutdatedErr(err) {
return false
}
errMsg := err.Error()
for _, m := range dbterror.ReorgRetryableErrMsgs {
if strings.Contains(errMsg, m) {
Expand Down Expand Up @@ -3101,6 +3104,15 @@ func (w *worker) executeDistTask(jobCtx *jobContext, t table.Table, reorgInfo *r
if err := json.Unmarshal(task.Meta, taskMeta); err != nil {
return errors.Trace(err)
}
if err := validateBackfillTaskMeta(task, reorgInfo); err != nil {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need to validate and fail the task here, if DXF subtask failed as we have make the error not retryable, the whole task failed, then DDL part will notices that it's reverted and fail the DDL job

if !task.TaskBase.IsDone() {
if err1 := taskManager.FailTask(w.workCtx, task.ID, task.State, err); err1 != nil {
return err1
}
handle.NotifyTaskChange()
}
return err
}
taskID = task.ID
lastRequiredSlots = task.RequiredSlots
lastBatchSize = taskMeta.Job.ReorgMeta.GetBatchSize()
Expand Down Expand Up @@ -3209,6 +3221,23 @@ func (w *worker) executeDistTask(jobCtx *jobContext, t table.Table, reorgInfo *r
return err
}

func validateBackfillTaskMeta(task *proto.Task, reorgInfo *reorgInfo) error {
taskMeta := &BackfillTaskMeta{}
if err := json.Unmarshal(task.Meta, taskMeta); err != nil {
return errors.Trace(err)
}
job := reorgInfo.Job
if taskMeta.Job.ID != job.ID ||
taskMeta.Job.SchemaID != job.SchemaID ||
taskMeta.Job.TableID != job.TableID ||
!bytes.Equal(taskMeta.EleTypeKey, reorgInfo.currElement.TypeKey) ||
!slices.Equal(taskMeta.EleIDs, extractElemIDs(reorgInfo)) {
return errors.Annotatef(errBackfillTaskMetaOutdated,
"task ID: %d, task key: %s, job ID: %d", task.ID, task.Key, job.ID)
}
return nil
}

func (w *worker) checkRunnableOrHandlePauseOrCanceled(stepCtx context.Context, taskKey string) (err error) {
if err = w.isReorgRunnable(stepCtx, true); err != nil {
if dbterror.ErrPausedDDLJob.Equal(err) {
Expand Down
Loading