diff --git a/pkg/ddl/backfilling_dist_executor.go b/pkg/ddl/backfilling_dist_executor.go index 08406f8e93f0b..1db601f5ffda2 100644 --- a/pkg/ddl/backfilling_dist_executor.go +++ b/pkg/ddl/backfilling_dist_executor.go @@ -130,22 +130,16 @@ func (s *backfillDistExecutor) newBackfillStepExecutor( jobMeta := &s.taskMeta.Job ddlObj := s.d - store := ddlObj.store + store := s.TaskStore sessPool := ddlObj.sessPool taskKS := s.task.Keyspace if ddlObj.store.GetKeyspace() != taskKS { - var err error - err = s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error { + if err := s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error { svr := se.GetSQLServer() - store, err = svr.GetKSStore(taskKS) - if err != nil { - return err - } sp, err := svr.GetKSSessPool(taskKS) sessPool = sess.NewSessionPool(sp) return err - }) - if err != nil { + }); err != nil { return nil, err } } diff --git a/pkg/ddl/backfilling_dist_scheduler.go b/pkg/ddl/backfilling_dist_scheduler.go index ff01fbf481d0b..5d6fad855c784 100644 --- a/pkg/ddl/backfilling_dist_scheduler.go +++ b/pkg/ddl/backfilling_dist_scheduler.go @@ -43,7 +43,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/objstore" "github.com/pingcap/tidb/pkg/objstore/storeapi" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" @@ -137,7 +136,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch( } job := &backfillMeta.Job logger.Info("on next subtasks batch") - store, tbl, err := getUserStoreAndTable(ctx, sch.d, sch.d.store, task.Keyspace, job) + tbl, err := getUserTableFromTaskStore(ctx, sch.d, sch.TaskStore, job) if err != nil { return nil, errors.Trace(err) } @@ -147,7 +146,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch( // TODO(tangenta): use available disk during adding index. availableDisk := sch.nodeRes.GetTaskDiskResource(&task.TaskBase, vardef.DDLDiskQuota.Load()) logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk)))) - return generateReadIndexPlan(ctx, sch.d, store, tbl, job, sch.GlobalSort, nodeCnt, logger) + return generateReadIndexPlan(ctx, sch.d, sch.TaskStore, tbl, job, sch.GlobalSort, nodeCnt, logger) case proto.BackfillStepMergeSort: metaBytes, err2 := generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger) if err2 != nil { @@ -170,7 +169,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch( }) return generateGlobalSortIngestPlan( ctx, - store.(kv.StorageWithPD), + sch.TaskStore.(kv.StorageWithPD), taskHandle, task, backfillMeta.CloudStorageURI, @@ -178,42 +177,27 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch( } return nil, nil case proto.BackfillStepMergeTempIndex: - return generateMergeTempIndexPlan(ctx, store, tbl, nodeCnt, backfillMeta.EleIDs, logger) + return generateMergeTempIndexPlan(ctx, sch.TaskStore, tbl, nodeCnt, backfillMeta.EleIDs, logger) default: return nil, nil } } -func getUserStoreAndTable( +func getUserTableFromTaskStore( ctx context.Context, d *ddl, - schStore kv.Storage, - taskKeyspace string, + taskStore kv.Storage, job *model.Job, -) (kv.Storage, table.Table, error) { - store := schStore - if taskKeyspace != d.store.GetKeyspace() { - taskMgr, err := diststorage.GetTaskManager() - if err != nil { - return nil, nil, errors.Trace(err) - } - err = taskMgr.WithNewSession(func(se sessionctx.Context) error { - store, err = se.GetSQLServer().GetKSStore(taskKeyspace) - return err - }) - if err != nil { - return nil, nil, err - } - } - tblInfo, err := getTblInfo(ctx, store, job) +) (table.Table, error) { + tblInfo, err := getTblInfo(ctx, taskStore, job) if err != nil { - return nil, nil, err + return nil, err } tbl, err := getTable(d.ddlCtx.getAutoIDRequirement(), job.SchemaID, tblInfo) if err != nil { - return nil, nil, err + return nil, err } - return store, tbl, nil + return tbl, nil } // GetNextStep implements scheduler.Extension interface. diff --git a/pkg/ddl/backfilling_dist_scheduler_test.go b/pkg/ddl/backfilling_dist_scheduler_test.go index a5f722ba54123..6ce8ad9ca0741 100644 --- a/pkg/ddl/backfilling_dist_scheduler_test.go +++ b/pkg/ddl/backfilling_dist_scheduler_test.go @@ -46,6 +46,9 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) sch, err := ddl.NewBackfillingSchedulerForTest(dom.DDL()) require.NoError(t, err) + sch.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{ + Param: scheduler.Param{TaskStore: store}, + } tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -172,6 +175,9 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) { ext, err := ddl.NewBackfillingSchedulerForTest(dom.DDL()) require.NoError(t, err) ext.(*ddl.LitBackfillScheduler).GlobalSort = true + ext.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{ + Param: scheduler.Param{TaskStore: store}, + } sch.Extension = ext taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, "", 1, "", 0, proto.ExtraParams{}, task.Meta) diff --git a/pkg/dxf/framework/scheduler/interface.go b/pkg/dxf/framework/scheduler/interface.go index e4c77604aade0..54a297a402ff3 100644 --- a/pkg/dxf/framework/scheduler/interface.go +++ b/pkg/dxf/framework/scheduler/interface.go @@ -182,7 +182,9 @@ type Param struct { serverID string allocatedSlots bool nodeRes *proto.NodeResource - // store of the task, this store corresponds to the task keyspace in nextgen. + // TaskStore is the store for task.Keyspace. It equals the instance store in + // classic kernel mode or for SYSTEM-keyspace tasks; otherwise Manager resolves + // it from the task keyspace. TaskStore kv.Storage } diff --git a/pkg/dxf/framework/scheduler/scheduler.go b/pkg/dxf/framework/scheduler/scheduler.go index b4d1668c5330b..60f7bd666eafe 100644 --- a/pkg/dxf/framework/scheduler/scheduler.go +++ b/pkg/dxf/framework/scheduler/scheduler.go @@ -124,7 +124,8 @@ func (s *BaseScheduler) Init() error { if s.TaskStore.GetKeyspace() != s.GetTask().Keyspace { // shouldn't happen normally, but since keyspace mismatch might cause // correctness error, we check it at runtime too. - return errors.New("store keyspace mismatch with task") + return errors.Trace(fmt.Errorf("store keyspace mismatch with task: %s vs %s", + s.TaskStore.GetKeyspace(), s.GetTask().Keyspace)) } return nil } diff --git a/pkg/dxf/framework/taskexecutor/BUILD.bazel b/pkg/dxf/framework/taskexecutor/BUILD.bazel index 4218de82b61ef..a2736b1930230 100644 --- a/pkg/dxf/framework/taskexecutor/BUILD.bazel +++ b/pkg/dxf/framework/taskexecutor/BUILD.bazel @@ -55,7 +55,7 @@ go_test( ], embed = [":taskexecutor"], flaky = True, - shard_count = 14, + shard_count = 17, deps = [ "//pkg/dxf/framework/mock", "//pkg/dxf/framework/mock/execute", @@ -65,10 +65,14 @@ go_test( "//pkg/dxf/framework/taskexecutor/execute", "//pkg/dxf/framework/testutil", "//pkg/kv", + "//pkg/owner", + "//pkg/sessionctx", "//pkg/testkit", "//pkg/testkit/testfailpoint", "//pkg/testkit/testsetup", + "//pkg/util", "//pkg/util/logutil", + "//pkg/util/mock", "@com_github_ngaut_pools//:pools", "@com_github_pingcap_errors//:errors", "@com_github_stretchr_testify//require", diff --git a/pkg/dxf/framework/taskexecutor/manager.go b/pkg/dxf/framework/taskexecutor/manager.go index c053bd9097710..2763e583f7861 100644 --- a/pkg/dxf/framework/taskexecutor/manager.go +++ b/pkg/dxf/framework/taskexecutor/manager.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/pkg/kv" "github.com/pingcap/tidb/pkg/metrics" litstorage "github.com/pingcap/tidb/pkg/objstore" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/vardef" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/backoff" @@ -314,6 +315,19 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b zap.String("task-key", taskBase.Key), zap.Error(err)) return false } + + taskStore := m.store + if m.store.GetKeyspace() != task.Keyspace { + if err2 := m.taskTable.WithNewSession(func(se sessionctx.Context) error { + var err2 error + taskStore, err2 = se.GetSQLServer().GetKSStore(task.Keyspace) + return err2 + }); err2 != nil { + m.logger.Warn("get task store failed", zap.Int64("task-id", task.ID), + zap.String("task-key", task.Key), zap.Error(err2)) + return false + } + } if !m.slotManager.alloc(&task.TaskBase) { m.logger.Info("alloc slots failed, maybe other task executor alloc more slots at runtime", zap.Int64("task-id", taskBase.ID), zap.String("task-key", taskBase.Key), @@ -339,7 +353,7 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b slotMgr: m.slotManager, nodeRc: m.getNodeResource(), execID: m.id, - Store: m.store, + TaskStore: taskStore, }) err = executor.Init(m.ctx) if err != nil { diff --git a/pkg/dxf/framework/taskexecutor/manager_test.go b/pkg/dxf/framework/taskexecutor/manager_test.go index b2c834ae3a726..4010d450b1d65 100644 --- a/pkg/dxf/framework/taskexecutor/manager_test.go +++ b/pkg/dxf/framework/taskexecutor/manager_test.go @@ -17,24 +17,65 @@ package taskexecutor import ( "context" "errors" + "fmt" "testing" "time" "github.com/pingcap/tidb/pkg/dxf/framework/mock" "github.com/pingcap/tidb/pkg/dxf/framework/proto" "github.com/pingcap/tidb/pkg/dxf/framework/storage" + "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/owner" + "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" + tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/logutil" + mockctx "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" ) +type storeWithKS struct { + kv.Storage + ks string +} + +func (s *storeWithKS) GetKeyspace() string { + return s.ks +} + +type testSQLServer struct { + stores map[string]kv.Storage +} + +func (s *testSQLServer) GetKSSessPool(string) (tidbutil.DestroyableSessionPool, error) { + return nil, errors.New("not implemented") +} + +func (s *testSQLServer) GetKSStore(targetKS string) (kv.Storage, error) { + store, ok := s.stores[targetKS] + if !ok { + return nil, fmt.Errorf("ks store not found: %s", targetKS) + } + return store, nil +} + +func (*testSQLServer) GetDDLOwnerMgr() owner.Manager { + return nil +} + +func buildMockSessionWithSQLServer(server *testSQLServer) sessionctx.Context { + se := mockctx.NewContextDeprecated() + se.BindDomainAndSchValidator(server, nil) + return se +} + func TestManageTaskExecutor(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() mockTaskTable := mock.NewMockTaskTable(ctrl) - m, err := NewManager(context.Background(), nil, "test", mockTaskTable, proto.NodeResourceForTest) + m, err := NewManager(context.Background(), &storeWithKS{}, "test", mockTaskTable, proto.NodeResourceForTest) require.NoError(t, err) // add executor 1 @@ -95,7 +136,7 @@ func TestHandleExecutableTasks(t *testing.T) { task := &proto.TaskBase{ID: taskID, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type", RequiredSlots: 6} mockInternalExecutor.EXPECT().GetTaskBase().Return(task).AnyTimes() - m, err := NewManager(ctx, nil, id, mockTaskTable, proto.NodeResourceForTest) + m, err := NewManager(ctx, &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest) require.NoError(t, err) m.slotManager.available.Store(16) @@ -164,7 +205,7 @@ func TestManager(t *testing.T) { }) id := "test" - m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest) + m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest) require.NoError(t, err) task1 := &proto.TaskBase{ID: 1, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type"} @@ -205,7 +246,7 @@ func TestManagerHandleTasks(t *testing.T) { }) id := "test" - m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest) + m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest) require.NoError(t, err) m.slotManager.available.Store(16) @@ -286,7 +327,7 @@ func TestSlotManagerInManager(t *testing.T) { }) id := "test" - m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest) + m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest) require.NoError(t, err) m.slotManager.available.Store(10) @@ -464,6 +505,106 @@ func TestSlotManagerInManager(t *testing.T) { require.True(t, ctrl.Satisfied()) } +func TestStartTaskExecutorResolveTaskStoreFromTaskKeyspace(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + t.Cleanup(func() { + ClearTaskExecutors() + }) + + const ( + instanceKS = "instance_ks" + taskKS = "task_ks" + ) + task := &proto.Task{ + TaskBase: proto.TaskBase{ + ID: 1, + Keyspace: taskKS, + Type: "resolve-store", + Step: proto.StepOne, + State: proto.TaskStateRunning, + RequiredSlots: 1, + }, + } + mockTaskTable := mock.NewMockTaskTable(ctrl) + m, err := NewManager(context.Background(), &storeWithKS{ks: instanceKS}, "test", mockTaskTable, proto.NodeResourceForTest) + require.NoError(t, err) + + taskStore := &storeWithKS{ks: taskKS} + mockSe := buildMockSessionWithSQLServer(&testSQLServer{ + stores: map[string]kv.Storage{ + taskKS: taskStore, + }, + }) + + mockExecutor := mock.NewMockTaskExecutor(ctrl) + var gotStore kv.Storage + RegisterTaskType(task.Type, func(_ context.Context, _ *proto.Task, param Param) TaskExecutor { + gotStore = param.TaskStore + return mockExecutor + }) + mockTaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) + mockTaskTable.EXPECT().WithNewSession(gomock.Any()).DoAndReturn(func(fn func(sessionctx.Context) error) error { + return fn(mockSe) + }) + mockExecutor.EXPECT().Init(gomock.Any()).Return(nil) + runCh := make(chan struct{}) + mockExecutor.EXPECT().GetTaskBase().Return(&task.TaskBase).AnyTimes() + mockExecutor.EXPECT().Run().DoAndReturn(func() { + <-runCh + }) + mockExecutor.EXPECT().Close() + defer func() { + close(runCh) + m.executorWG.Wait() + }() + + require.True(t, m.startTaskExecutor(&task.TaskBase)) + require.Same(t, taskStore, gotStore) +} + +func TestStartTaskExecutorResolveTaskStoreError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + const ( + instanceKS = "instance_ks" + taskKS = "task_ks" + ) + task := &proto.Task{ + TaskBase: proto.TaskBase{ + ID: 2, + Keyspace: taskKS, + Type: "resolve-store-error", + Step: proto.StepOne, + State: proto.TaskStateRunning, + RequiredSlots: 1, + }, + } + mockTaskTable := mock.NewMockTaskTable(ctrl) + m, err := NewManager(context.Background(), &storeWithKS{ks: instanceKS}, "test", mockTaskTable, proto.NodeResourceForTest) + require.NoError(t, err) + + mockSe := buildMockSessionWithSQLServer(&testSQLServer{ + stores: map[string]kv.Storage{}, + }) + factoryCalled := false + RegisterTaskType(task.Type, func(context.Context, *proto.Task, Param) TaskExecutor { + factoryCalled = true + return nil + }) + + mockTaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil) + mockTaskTable.EXPECT().WithNewSession(gomock.Any()).DoAndReturn(func(fn func(sessionctx.Context) error) error { + return fn(mockSe) + }) + + require.False(t, m.startTaskExecutor(&task.TaskBase)) + require.False(t, factoryCalled) + require.False(t, m.isExecutorStarted(task.ID)) + require.Equal(t, proto.NodeResourceForTest.TotalCPU, m.slotManager.availableSlots()) +} + func TestManagerInitMeta(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() diff --git a/pkg/dxf/framework/taskexecutor/task_executor.go b/pkg/dxf/framework/taskexecutor/task_executor.go index 4fa1ce10090ae..0de16ead4b2dd 100644 --- a/pkg/dxf/framework/taskexecutor/task_executor.go +++ b/pkg/dxf/framework/taskexecutor/task_executor.go @@ -74,7 +74,10 @@ type Param struct { nodeRc *proto.NodeResource // id, it's the same as server id now, i.e. host:port. execID string - Store kv.Storage + // TaskStore is the store for task.Keyspace. It equals the instance store in + // classic kernel mode or for SYSTEM-keyspace tasks; otherwise Manager resolves + // it from the task keyspace. + TaskStore kv.Storage } // NewParamForTest creates a new Param for test. @@ -84,7 +87,7 @@ func NewParamForTest(taskTable TaskTable, slotMgr *slotManager, nodeRc *proto.No slotMgr: slotMgr, nodeRc: nodeRc, execID: execID, - Store: store, + TaskStore: store, } } @@ -255,7 +258,13 @@ func (e *BaseTaskExecutor) updateSubtaskSummaryLoop( } // Init implements the TaskExecutor interface. -func (*BaseTaskExecutor) Init(_ context.Context) error { +func (e *BaseTaskExecutor) Init(_ context.Context) error { + if e.TaskStore.GetKeyspace() != e.GetTaskBase().Keyspace { + // shouldn't happen normally, but since keyspace mismatch might cause + // correctness error, we check it at runtime too. + return errors.Trace(fmt.Errorf("store keyspace mismatch with task: %s vs %s", + e.TaskStore.GetKeyspace(), e.GetTaskBase().Keyspace)) + } return nil } diff --git a/pkg/dxf/framework/taskexecutor/task_executor_test.go b/pkg/dxf/framework/taskexecutor/task_executor_test.go index 67f531c1e665a..b1dd3a8e1317b 100644 --- a/pkg/dxf/framework/taskexecutor/task_executor_test.go +++ b/pkg/dxf/framework/taskexecutor/task_executor_test.go @@ -140,6 +140,24 @@ func (e *taskExecutorRunEnv) mockForCheckBalanceSubtask() { }).AnyTimes() } +func TestBaseTaskExecutorInitChecksTaskStoreKeyspace(t *testing.T) { + task := &proto.Task{TaskBase: proto.TaskBase{ + ID: 1, + Type: proto.TaskTypeExample, + Keyspace: "task_ks", + }} + + taskExecutor := NewBaseTaskExecutor(context.Background(), task, Param{ + TaskStore: &storeWithKS{ks: task.Keyspace}, + }) + require.NoError(t, taskExecutor.Init(context.Background())) + + taskExecutor = NewBaseTaskExecutor(context.Background(), task, Param{ + TaskStore: &storeWithKS{ks: "other_ks"}, + }) + require.ErrorContains(t, taskExecutor.Init(context.Background()), "store keyspace mismatch with task") +} + func TestTaskExecutorRun(t *testing.T) { t.Run("context done when run", func(t *testing.T) { e := newTaskExecutorRunEnv(t) diff --git a/pkg/dxf/importinto/BUILD.bazel b/pkg/dxf/importinto/BUILD.bazel index 142df55be94f2..085fe47cf6d06 100644 --- a/pkg/dxf/importinto/BUILD.bazel +++ b/pkg/dxf/importinto/BUILD.bazel @@ -110,7 +110,7 @@ go_test( ], embed = [":importinto"], flaky = True, - shard_count = 27, + shard_count = 28, deps = [ "//pkg/config", "//pkg/config/kerneltype", diff --git a/pkg/dxf/importinto/task_executor.go b/pkg/dxf/importinto/task_executor.go index bc02994b5822c..9d6aa1a50939f 100644 --- a/pkg/dxf/importinto/task_executor.go +++ b/pkg/dxf/importinto/task_executor.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/pkg/objstore/recording" "github.com/pingcap/tidb/pkg/objstore/storeapi" "github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool" - "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/table/tables" "github.com/pingcap/tidb/pkg/util/dbterror/exeerrors" "github.com/pingcap/tidb/pkg/util/logutil" @@ -865,7 +864,6 @@ func (p *postProcessStepExecutor) RunSubtask(ctx context.Context, subtask *proto type importExecutor struct { *taskexecutor.BaseTaskExecutor - store tidbkv.Storage indicesGenKV map[int64]importer.GenKVIndex } @@ -880,7 +878,6 @@ func NewImportExecutor( s := &importExecutor{ BaseTaskExecutor: taskexecutor.NewBaseTaskExecutor(subCtx, task, param), - store: param.Store, } s.BaseTaskExecutor.Extension = s return s @@ -912,18 +909,7 @@ func (e *importExecutor) GetStepExecutor(task *proto.Task) (execute.StepExecutor indicesGenKV := importer.GetIndicesGenKV(taskMeta.Plan.TableInfo) logger.Info("got indices that generate kv", zap.Any("indices", indicesGenKV)) - store := e.store - if e.store.GetKeyspace() != task.Keyspace { - var err error - err = e.GetTaskTable().WithNewSession(func(se sessionctx.Context) error { - store, err = se.GetSQLServer().GetKSStore(task.Keyspace) - return err - }) - if err != nil { - return nil, err - } - } - + store := e.TaskStore switch task.Step { case proto.ImportStepImport, proto.ImportStepEncodeAndSort: return &importStepExecutor{ diff --git a/pkg/dxf/importinto/task_executor_test.go b/pkg/dxf/importinto/task_executor_test.go index e654d208adf56..45c449fd2ff98 100644 --- a/pkg/dxf/importinto/task_executor_test.go +++ b/pkg/dxf/importinto/task_executor_test.go @@ -70,6 +70,30 @@ func TestImportTaskExecutor(t *testing.T) { require.Error(t, err) } +func TestImportTaskExecutorUsesTaskStoreWithoutExtraLookup(t *testing.T) { + ctx := context.Background() + taskStore := &StoreWithKS{ks: "task_ks"} + executor := NewImportExecutor( + ctx, + &proto.Task{ + TaskBase: proto.TaskBase{ID: 2}, + }, + taskexecutor.NewParamForTest(nil, nil, nil, ":4000", taskStore), + ).(*importExecutor) + + taskMeta := []byte(`{"Plan":{"TableInfo":{}}}`) + stepExecutor, err := executor.GetStepExecutor(&proto.Task{ + TaskBase: proto.TaskBase{ + ID: 2, + Step: proto.ImportStepImport, + Keyspace: "another_ks", + }, + Meta: taskMeta, + }) + require.NoError(t, err) + require.Same(t, taskStore, stepExecutor.(*importStepExecutor).store) +} + func TestGetOnDupForKVGroup(t *testing.T) { t.Run("data-kv-group", func(t *testing.T) { onDup, err := getOnDupForKVGroup(nil, globalsort.DataKVGroup, importer.OnDupKeyModeCapture)