Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,16 @@ func (s *backfillDistExecutor) newBackfillStepExecutor(
jobMeta := &s.taskMeta.Job
ddlObj := s.d

store := ddlObj.store
store := s.TaskStore
Comment thread
D3Hunter marked this conversation as resolved.
sessPool := ddlObj.sessPool
taskKS := s.task.Keyspace
if ddlObj.store.GetKeyspace() != taskKS {
Comment thread
D3Hunter marked this conversation as resolved.
var err error
err = s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error {
if err := s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error {
svr := se.GetSQLServer()
store, err = svr.GetKSStore(taskKS)
if err != nil {
return err
}
sp, err := svr.GetKSSessPool(taskKS)
sessPool = sess.NewSessionPool(sp)
return err
})
if err != nil {
}); err != nil {
return nil, err
}
}
Expand Down
38 changes: 11 additions & 27 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -137,7 +136,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
}
job := &backfillMeta.Job
logger.Info("on next subtasks batch")
store, tbl, err := getUserStoreAndTable(ctx, sch.d, sch.d.store, task.Keyspace, job)
tbl, err := getUserTableFromTaskStore(ctx, sch.d, sch.TaskStore, job)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -147,7 +146,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
// TODO(tangenta): use available disk during adding index.
availableDisk := sch.nodeRes.GetTaskDiskResource(&task.TaskBase, vardef.DDLDiskQuota.Load())
logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk))))
return generateReadIndexPlan(ctx, sch.d, store, tbl, job, sch.GlobalSort, nodeCnt, logger)
return generateReadIndexPlan(ctx, sch.d, sch.TaskStore, tbl, job, sch.GlobalSort, nodeCnt, logger)
case proto.BackfillStepMergeSort:
metaBytes, err2 := generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
if err2 != nil {
Expand All @@ -170,50 +169,35 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
})
return generateGlobalSortIngestPlan(
ctx,
store.(kv.StorageWithPD),
sch.TaskStore.(kv.StorageWithPD),
taskHandle,
task,
backfillMeta.CloudStorageURI,
logger)
}
return nil, nil
case proto.BackfillStepMergeTempIndex:
return generateMergeTempIndexPlan(ctx, store, tbl, nodeCnt, backfillMeta.EleIDs, logger)
return generateMergeTempIndexPlan(ctx, sch.TaskStore, tbl, nodeCnt, backfillMeta.EleIDs, logger)
default:
return nil, nil
}
}

func getUserStoreAndTable(
func getUserTableFromTaskStore(
ctx context.Context,
d *ddl,
schStore kv.Storage,
taskKeyspace string,
taskStore kv.Storage,
job *model.Job,
) (kv.Storage, table.Table, error) {
store := schStore
if taskKeyspace != d.store.GetKeyspace() {
taskMgr, err := diststorage.GetTaskManager()
if err != nil {
return nil, nil, errors.Trace(err)
}
err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
store, err = se.GetSQLServer().GetKSStore(taskKeyspace)
return err
})
if err != nil {
return nil, nil, err
}
}
tblInfo, err := getTblInfo(ctx, store, job)
) (table.Table, error) {
Comment thread
D3Hunter marked this conversation as resolved.
tblInfo, err := getTblInfo(ctx, taskStore, job)
if err != nil {
return nil, nil, err
return nil, err
}
tbl, err := getTable(d.ddlCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
if err != nil {
return nil, nil, err
return nil, err
}
return store, tbl, nil
return tbl, nil
}

// GetNextStep implements scheduler.Extension interface.
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sch, err := ddl.NewBackfillingSchedulerForTest(dom.DDL())
require.NoError(t, err)
sch.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{
Comment thread
D3Hunter marked this conversation as resolved.
Param: scheduler.Param{TaskStore: store},
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

Expand Down Expand Up @@ -172,6 +175,9 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
ext, err := ddl.NewBackfillingSchedulerForTest(dom.DDL())
require.NoError(t, err)
ext.(*ddl.LitBackfillScheduler).GlobalSort = true
ext.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{
Param: scheduler.Param{TaskStore: store},
}
sch.Extension = ext

taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, "", 1, "", 0, proto.ExtraParams{}, task.Meta)
Expand Down
6 changes: 5 additions & 1 deletion pkg/dxf/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 14,
shard_count = 16,
deps = [
"//pkg/dxf/framework/mock",
"//pkg/dxf/framework/mock/execute",
Expand All @@ -65,10 +65,14 @@ go_test(
"//pkg/dxf/framework/taskexecutor/execute",
"//pkg/dxf/framework/testutil",
"//pkg/kv",
"//pkg/owner",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util",
"//pkg/util/logutil",
"//pkg/util/mock",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
15 changes: 14 additions & 1 deletion pkg/dxf/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
litstorage "github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
Expand Down Expand Up @@ -314,6 +315,18 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b
zap.String("task-key", taskBase.Key), zap.Error(err))
return false
}

taskStore := m.store
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
D3Hunter marked this conversation as resolved.
if m.store.GetKeyspace() != task.Keyspace {
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
D3Hunter marked this conversation as resolved.
if err2 := m.taskTable.WithNewSession(func(se sessionctx.Context) error {
var err2 error
taskStore, err2 = se.GetSQLServer().GetKSStore(task.Keyspace)
return err2
}); err2 != nil {
m.failSubtask(err2, task.ID, nil)
Comment thread
D3Hunter marked this conversation as resolved.
Outdated
return false
Comment thread
D3Hunter marked this conversation as resolved.
}
}
Comment thread
D3Hunter marked this conversation as resolved.
if !m.slotManager.alloc(&task.TaskBase) {
m.logger.Info("alloc slots failed, maybe other task executor alloc more slots at runtime",
zap.Int64("task-id", taskBase.ID), zap.String("task-key", taskBase.Key),
Expand All @@ -339,7 +352,7 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b
slotMgr: m.slotManager,
nodeRc: m.getNodeResource(),
execID: m.id,
Store: m.store,
TaskStore: taskStore,
})
err = executor.Init(m.ctx)
if err != nil {
Expand Down
147 changes: 142 additions & 5 deletions pkg/dxf/framework/taskexecutor/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,65 @@ package taskexecutor
import (
"context"
"errors"
"fmt"
"testing"
"time"

"github.com/pingcap/tidb/pkg/dxf/framework/mock"
"github.com/pingcap/tidb/pkg/dxf/framework/proto"
"github.com/pingcap/tidb/pkg/dxf/framework/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/logutil"
mockctx "github.com/pingcap/tidb/pkg/util/mock"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
)

type storeWithKS struct {
kv.Storage
ks string
}

func (s *storeWithKS) GetKeyspace() string {
return s.ks
}

type testSQLServer struct {
stores map[string]kv.Storage
}

func (s *testSQLServer) GetKSSessPool(string) (tidbutil.DestroyableSessionPool, error) {
return nil, errors.New("not implemented")
}

func (s *testSQLServer) GetKSStore(targetKS string) (kv.Storage, error) {
store, ok := s.stores[targetKS]
if !ok {
return nil, fmt.Errorf("ks store not found: %s", targetKS)
}
return store, nil
}

func (*testSQLServer) GetDDLOwnerMgr() owner.Manager {
return nil
}

func buildMockSessionWithSQLServer(server *testSQLServer) sessionctx.Context {
se := mockctx.NewContextDeprecated()
se.BindDomainAndSchValidator(server, nil)
return se
}

func TestManageTaskExecutor(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

mockTaskTable := mock.NewMockTaskTable(ctrl)
m, err := NewManager(context.Background(), nil, "test", mockTaskTable, proto.NodeResourceForTest)
m, err := NewManager(context.Background(), &storeWithKS{}, "test", mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)

// add executor 1
Expand Down Expand Up @@ -95,7 +136,7 @@ func TestHandleExecutableTasks(t *testing.T) {
task := &proto.TaskBase{ID: taskID, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type", RequiredSlots: 6}
mockInternalExecutor.EXPECT().GetTaskBase().Return(task).AnyTimes()

m, err := NewManager(ctx, nil, id, mockTaskTable, proto.NodeResourceForTest)
m, err := NewManager(ctx, &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)
m.slotManager.available.Store(16)

Expand Down Expand Up @@ -164,7 +205,7 @@ func TestManager(t *testing.T) {
})
id := "test"

m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest)
m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)

task1 := &proto.TaskBase{ID: 1, State: proto.TaskStateRunning, Step: proto.StepOne, Type: "type"}
Expand Down Expand Up @@ -205,7 +246,7 @@ func TestManagerHandleTasks(t *testing.T) {
})
id := "test"

m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest)
m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)
m.slotManager.available.Store(16)

Expand Down Expand Up @@ -286,7 +327,7 @@ func TestSlotManagerInManager(t *testing.T) {
})
id := "test"

m, err := NewManager(context.Background(), nil, id, mockTaskTable, proto.NodeResourceForTest)
m, err := NewManager(context.Background(), &storeWithKS{}, id, mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)
m.slotManager.available.Store(10)

Expand Down Expand Up @@ -464,6 +505,102 @@ func TestSlotManagerInManager(t *testing.T) {
require.True(t, ctrl.Satisfied())
}

func TestStartTaskExecutorResolveTaskStoreFromTaskKeyspace(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

const (
instanceKS = "instance_ks"
taskKS = "task_ks"
)
task := &proto.Task{
TaskBase: proto.TaskBase{
ID: 1,
Keyspace: taskKS,
Type: "resolve-store",
Step: proto.StepOne,
State: proto.TaskStateRunning,
RequiredSlots: 1,
},
}
mockTaskTable := mock.NewMockTaskTable(ctrl)
m, err := NewManager(context.Background(), &storeWithKS{ks: instanceKS}, "test", mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)

taskStore := &storeWithKS{ks: taskKS}
mockSe := buildMockSessionWithSQLServer(&testSQLServer{
stores: map[string]kv.Storage{
taskKS: taskStore,
},
})

Comment thread
D3Hunter marked this conversation as resolved.
mockExecutor := mock.NewMockTaskExecutor(ctrl)
var gotStore kv.Storage
RegisterTaskType(task.Type, func(_ context.Context, _ *proto.Task, param Param) TaskExecutor {
Comment thread
D3Hunter marked this conversation as resolved.
gotStore = param.TaskStore
return mockExecutor
})
mockTaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil)
mockTaskTable.EXPECT().WithNewSession(gomock.Any()).DoAndReturn(func(fn func(sessionctx.Context) error) error {
return fn(mockSe)
})
mockExecutor.EXPECT().Init(gomock.Any()).Return(nil)
runCh := make(chan struct{})
Comment thread
D3Hunter marked this conversation as resolved.
mockExecutor.EXPECT().GetTaskBase().Return(&task.TaskBase).AnyTimes()
mockExecutor.EXPECT().Run().DoAndReturn(func() {
<-runCh
})
mockExecutor.EXPECT().Close()
defer func() {
close(runCh)
m.executorWG.Wait()
}()

require.True(t, m.startTaskExecutor(&task.TaskBase))
require.Same(t, taskStore, gotStore)
}

func TestStartTaskExecutorResolveTaskStoreError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

const (
instanceKS = "instance_ks"
taskKS = "task_ks"
)
task := &proto.Task{
TaskBase: proto.TaskBase{
ID: 2,
Keyspace: taskKS,
Type: "resolve-store-error",
Step: proto.StepOne,
State: proto.TaskStateRunning,
RequiredSlots: 1,
},
}
mockTaskTable := mock.NewMockTaskTable(ctrl)
m, err := NewManager(context.Background(), &storeWithKS{ks: instanceKS}, "test", mockTaskTable, proto.NodeResourceForTest)
require.NoError(t, err)

mockSe := buildMockSessionWithSQLServer(&testSQLServer{
stores: map[string]kv.Storage{},
})

mockTaskTable.EXPECT().GetTaskByID(gomock.Any(), task.ID).Return(task, nil)
mockTaskTable.EXPECT().WithNewSession(gomock.Any()).DoAndReturn(func(fn func(sessionctx.Context) error) error {
return fn(mockSe)
})
mockTaskTable.EXPECT().FailSubtask(gomock.Any(), "test", task.ID, gomock.Any()).DoAndReturn(
func(_ context.Context, _ string, _ int64, err error) error {
require.ErrorContains(t, err, "ks store not found")
return nil
},
)

require.False(t, m.startTaskExecutor(&task.TaskBase))
require.Equal(t, proto.NodeResourceForTest.TotalCPU, m.slotManager.availableSlots())
}

func TestManagerInitMeta(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
Loading
Loading