Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 3 additions & 9 deletions pkg/ddl/backfilling_dist_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,16 @@ func (s *backfillDistExecutor) newBackfillStepExecutor(
jobMeta := &s.taskMeta.Job
ddlObj := s.d

store := ddlObj.store
store := s.TaskStore
Comment thread
D3Hunter marked this conversation as resolved.
sessPool := ddlObj.sessPool
taskKS := s.task.Keyspace
if ddlObj.store.GetKeyspace() != taskKS {
Comment thread
D3Hunter marked this conversation as resolved.
var err error
err = s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error {
if err := s.GetTaskTable().WithNewSession(func(se sessionctx.Context) error {
svr := se.GetSQLServer()
store, err = svr.GetKSStore(taskKS)
if err != nil {
return err
}
sp, err := svr.GetKSSessPool(taskKS)
sessPool = sess.NewSessionPool(sp)
return err
})
if err != nil {
}); err != nil {
return nil, err
}
}
Expand Down
38 changes: 11 additions & 27 deletions pkg/ddl/backfilling_dist_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ import (
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/objstore/storeapi"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
"github.com/pingcap/tidb/pkg/store/helper"
"github.com/pingcap/tidb/pkg/table"
Expand Down Expand Up @@ -137,7 +136,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
}
job := &backfillMeta.Job
logger.Info("on next subtasks batch")
store, tbl, err := getUserStoreAndTable(ctx, sch.d, sch.d.store, task.Keyspace, job)
tbl, err := getUserTableFromTaskStore(ctx, sch.d, sch.TaskStore, job)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -147,7 +146,7 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
// TODO(tangenta): use available disk during adding index.
availableDisk := sch.nodeRes.GetTaskDiskResource(&task.TaskBase, vardef.DDLDiskQuota.Load())
logger.Info("available local disk space resource", zap.String("size", units.BytesSize(float64(availableDisk))))
return generateReadIndexPlan(ctx, sch.d, store, tbl, job, sch.GlobalSort, nodeCnt, logger)
return generateReadIndexPlan(ctx, sch.d, sch.TaskStore, tbl, job, sch.GlobalSort, nodeCnt, logger)
case proto.BackfillStepMergeSort:
metaBytes, err2 := generateMergeSortPlan(ctx, taskHandle, task, nodeCnt, backfillMeta.CloudStorageURI, logger)
if err2 != nil {
Expand All @@ -170,50 +169,35 @@ func (sch *LitBackfillScheduler) OnNextSubtasksBatch(
})
return generateGlobalSortIngestPlan(
ctx,
store.(kv.StorageWithPD),
sch.TaskStore.(kv.StorageWithPD),
taskHandle,
task,
backfillMeta.CloudStorageURI,
logger)
}
return nil, nil
case proto.BackfillStepMergeTempIndex:
return generateMergeTempIndexPlan(ctx, store, tbl, nodeCnt, backfillMeta.EleIDs, logger)
return generateMergeTempIndexPlan(ctx, sch.TaskStore, tbl, nodeCnt, backfillMeta.EleIDs, logger)
default:
return nil, nil
}
}

func getUserStoreAndTable(
func getUserTableFromTaskStore(
ctx context.Context,
d *ddl,
schStore kv.Storage,
taskKeyspace string,
taskStore kv.Storage,
job *model.Job,
) (kv.Storage, table.Table, error) {
store := schStore
if taskKeyspace != d.store.GetKeyspace() {
taskMgr, err := diststorage.GetTaskManager()
if err != nil {
return nil, nil, errors.Trace(err)
}
err = taskMgr.WithNewSession(func(se sessionctx.Context) error {
store, err = se.GetSQLServer().GetKSStore(taskKeyspace)
return err
})
if err != nil {
return nil, nil, err
}
}
tblInfo, err := getTblInfo(ctx, store, job)
) (table.Table, error) {
Comment thread
D3Hunter marked this conversation as resolved.
tblInfo, err := getTblInfo(ctx, taskStore, job)
if err != nil {
return nil, nil, err
return nil, err
}
tbl, err := getTable(d.ddlCtx.getAutoIDRequirement(), job.SchemaID, tblInfo)
if err != nil {
return nil, nil, err
return nil, err
}
return store, tbl, nil
return tbl, nil
}

// GetNextStep implements scheduler.Extension interface.
Expand Down
6 changes: 6 additions & 0 deletions pkg/ddl/backfilling_dist_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func TestBackfillingSchedulerLocalMode(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
sch, err := ddl.NewBackfillingSchedulerForTest(dom.DDL())
require.NoError(t, err)
sch.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{
Comment thread
D3Hunter marked this conversation as resolved.
Param: scheduler.Param{TaskStore: store},
}
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

Expand Down Expand Up @@ -172,6 +175,9 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) {
ext, err := ddl.NewBackfillingSchedulerForTest(dom.DDL())
require.NoError(t, err)
ext.(*ddl.LitBackfillScheduler).GlobalSort = true
ext.(*ddl.LitBackfillScheduler).BaseScheduler = &scheduler.BaseScheduler{
Param: scheduler.Param{TaskStore: store},
}
sch.Extension = ext

taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, "", 1, "", 0, proto.ExtraParams{}, task.Meta)
Expand Down
4 changes: 3 additions & 1 deletion pkg/dxf/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ type Param struct {
serverID string
allocatedSlots bool
nodeRes *proto.NodeResource
// store of the task, this store corresponds to the task keyspace in nextgen.
// TaskStore is the store for task.Keyspace. It equals the instance store in
// classic kernel mode or for SYSTEM-keyspace tasks; otherwise Manager resolves
// it from the task keyspace.
TaskStore kv.Storage
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/dxf/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,8 @@ func (s *BaseScheduler) Init() error {
if s.TaskStore.GetKeyspace() != s.GetTask().Keyspace {
// shouldn't happen normally, but since keyspace mismatch might cause
// correctness error, we check it at runtime too.
return errors.New("store keyspace mismatch with task")
return errors.Trace(fmt.Errorf("store keyspace mismatch with task: %s vs %s",
s.TaskStore.GetKeyspace(), s.GetTask().Keyspace))
}
return nil
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/dxf/framework/taskexecutor/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ go_test(
],
embed = [":taskexecutor"],
flaky = True,
shard_count = 14,
shard_count = 17,
deps = [
"//pkg/dxf/framework/mock",
"//pkg/dxf/framework/mock/execute",
Expand All @@ -65,10 +65,14 @@ go_test(
"//pkg/dxf/framework/taskexecutor/execute",
"//pkg/dxf/framework/testutil",
"//pkg/kv",
"//pkg/owner",
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/util",
"//pkg/util/logutil",
"//pkg/util/mock",
"@com_github_ngaut_pools//:pools",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//require",
Expand Down
16 changes: 15 additions & 1 deletion pkg/dxf/framework/taskexecutor/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/metrics"
litstorage "github.com/pingcap/tidb/pkg/objstore"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
tidbutil "github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/backoff"
Expand Down Expand Up @@ -314,6 +315,19 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b
zap.String("task-key", taskBase.Key), zap.Error(err))
return false
}

taskStore := m.store
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
D3Hunter marked this conversation as resolved.
if m.store.GetKeyspace() != task.Keyspace {
Comment thread
D3Hunter marked this conversation as resolved.
Comment thread
D3Hunter marked this conversation as resolved.
if err2 := m.taskTable.WithNewSession(func(se sessionctx.Context) error {
var err2 error
taskStore, err2 = se.GetSQLServer().GetKSStore(task.Keyspace)
return err2
}); err2 != nil {
m.logger.Warn("get task store failed", zap.Int64("task-id", task.ID),
zap.String("task-key", task.Key), zap.Error(err2))
return false
Comment thread
D3Hunter marked this conversation as resolved.
}
}
Comment thread
D3Hunter marked this conversation as resolved.
if !m.slotManager.alloc(&task.TaskBase) {
m.logger.Info("alloc slots failed, maybe other task executor alloc more slots at runtime",
zap.Int64("task-id", taskBase.ID), zap.String("task-key", taskBase.Key),
Expand All @@ -339,7 +353,7 @@ func (m *Manager) startTaskExecutor(taskBase *proto.TaskBase) (executorStarted b
slotMgr: m.slotManager,
nodeRc: m.getNodeResource(),
execID: m.id,
Store: m.store,
TaskStore: taskStore,
})
err = executor.Init(m.ctx)
if err != nil {
Expand Down
Loading
Loading