Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions br/tests/br_partial_index/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#!/bin/sh
#
# Copyright 2025 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eu
DB="$TEST_NAME"

run_sql "CREATE DATABASE $DB;"

run_sql "
USE $DB;

CREATE TABLE t0 (
id int primary key,
col1 int,
col2 int,
key idx_col1 (col1) where col2 > 10
);
INSERT INTO t0 VALUES (1, 1, 1);
INSERT INTO t0 VALUES (2, 2, 15);
INSERT INTO t0 VALUES (3, 3, 1);
INSERT INTO t0 VALUES (4, 4, 20);
INSERT INTO t0 VALUES (5, 5, 1);
"

# backup table
echo "backup start..."
run_br --pd $PD_ADDR backup db -s "local://$TEST_DIR/$DB" --db $DB

run_sql "DROP DATABASE $DB;"
run_sql "CREATE DATABASE $DB;"

# restore table
echo "restore start..."
run_br restore db --db $DB -s "local://$TEST_DIR/$DB" --pd $PD_ADDR

if run_sql "admin check table ${DB}.t0;" | grep -q 'inconsistency'; then
echo "TEST: [$TEST_NAME] failed after restoring $DB.t0"
exit 1
fi

run_sql "show create table $DB.t0;"
check_contains "WHERE \`col2\` > 10"

run_sql "DROP DATABASE $DB;"
2 changes: 1 addition & 1 deletion br/tests/run_group_br_tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ groups=(
["G05"]='br_skip_checksum br_split_region_fail br_systables br_table_filter br_txn br_stats br_clustered_index br_crypter br_partition_add_index br_pitr_log_restore_backup_compatibility'
["G06"]='br_tikv_outage br_tikv_outage3 br_restore_checkpoint br_encryption br_pitr_online_table_filter'
["G07"]='br_pitr br_restore_physical'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict br_pitr_table_filter'
["G08"]='br_tikv_outage2 br_ttl br_views_and_sequences br_z_gc_safepoint br_autorandom br_file_corruption br_tiflash_conflict br_pitr_table_filter br_partial_index'
)

# Get other cases not in groups, to avoid missing any case
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -1621,6 +1621,11 @@ error = '''
Invalid storage class: %s
'''

["ddl:8272"]
error = '''
Cannot drop, change or modify column '%s': it is referenced in partial index '%s'
'''

["ddl:9014"]
error = '''
TiFlash backfill index failed: %s
Expand Down
120 changes: 101 additions & 19 deletions pkg/ddl/backfilling_operators.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/pingcap/tidb/pkg/ddl/copr"
"github.com/pingcap/tidb/pkg/ddl/ingest"
"github.com/pingcap/tidb/pkg/ddl/session"
distsqlctx "github.com/pingcap/tidb/pkg/distsql/context"
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
"github.com/pingcap/tidb/pkg/disttask/framework/taskexecutor/execute"
"github.com/pingcap/tidb/pkg/disttask/operator"
Expand All @@ -48,6 +49,7 @@ import (
"github.com/pingcap/tidb/pkg/table/tables"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/chunk"
contextutil "github.com/pingcap/tidb/pkg/util/context"
"github.com/pingcap/tidb/pkg/util/dbterror"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
Expand Down Expand Up @@ -144,11 +146,14 @@ func NewAddIndexIngestPipeline(
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
index := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo)
index, err := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo)
if err != nil {
return nil, err
}
indexes = append(indexes, index)
}
reqSrc := getDDLRequestSource(model.ActionAddIndex)
copCtx, err := NewReorgCopContext(store, reorgMeta, tbl.Meta(), idxInfos, reqSrc)
copCtx, err := NewReorgCopContext(reorgMeta, tbl.Meta(), idxInfos, reqSrc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -199,11 +204,14 @@ func NewWriteIndexToExternalStoragePipeline(
) (*operator.AsyncPipeline, error) {
indexes := make([]table.Index, 0, len(idxInfos))
for _, idxInfo := range idxInfos {
index := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo)
index, err := tables.NewIndex(tbl.GetPhysicalID(), tbl.Meta(), idxInfo)
if err != nil {
return nil, err
}
indexes = append(indexes, index)
}
reqSrc := getDDLRequestSource(model.ActionAddIndex)
copCtx, err := NewReorgCopContext(store, reorgMeta, tbl.Meta(), idxInfos, reqSrc)
copCtx, err := NewReorgCopContext(reorgMeta, tbl.Meta(), idxInfos, reqSrc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -291,6 +299,12 @@ type IndexRecordChunk struct {
Err error
Done bool
ctx *OperatorCtx
// tableScanRowCount is the number of rows scanned by the corresponding TableScanTask.
// If the index is a partial index, the number of rows in the Chunk may be less than tableScanRowCount.
tableScanRowCount int64
// conditionPushed records whether the index condition has been pushed down. If it's true, the ingest worker
// can skip running the checker in TiDB side.
conditionPushed bool
}

// RecoverArgs implements workerpool.TaskMayPanic interface.
Expand Down Expand Up @@ -473,6 +487,8 @@ func NewTableScanOperator(
cpOp ingest.CheckpointOperator,
collector execute.Collector,
) *TableScanOperator {
intest.AssertNotNil(reorgMeta)

totalCount := new(atomic.Int64)
pool := workerpool.NewWorkerPool(
"TableScanOperator",
Expand Down Expand Up @@ -543,6 +559,15 @@ func (w *tableScanWorker) Close() {
}
}

func (w *tableScanWorker) newDistSQLCtx() (*distsqlctx.DistSQLContext, error) {
warnHandler := contextutil.NewStaticWarnHandler(0)
return newReorgDistSQLCtxWithReorgMeta(
w.se.GetClient(),
w.reorgMeta,
warnHandler,
)
}

func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecordChunk)) {
logutil.Logger(w.ctx).Info("start a table scan task",
zap.Int("id", task.ID), zap.Stringer("task", task))
Expand All @@ -560,14 +585,26 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
failpoint.Return(errors.New("mock scan record error"))
})
failpoint.InjectCall("scanRecordExec", w.reorgMeta)
rs, err := buildTableScan(scanCtx, w.copCtx.GetBase(), startTS, task.Start, task.End)
selExpr, err := w.copCtx.GetCondition()
if err != nil {
return err
}

// create a new distsqlCtx for each task because the `distsqlCtx` contains `RuntimeStatsColl`, which
// will be modified during the execution.
distsqlCtx, err := w.newDistSQLCtx()
if err != nil {
return err
}
rs, conditionPushed, err := buildTableScan(scanCtx, w.copCtx.GetBase(), distsqlCtx, startTS, task.Start, task.End, selExpr)
if err != nil {
return err
}
if w.cpOp != nil {
w.cpOp.AddChunk(task.ID, task.End)
}
var done bool
var lastTableScanRowCount int64
for !done {
failpoint.InjectCall("beforeGetChunk")
srcChk := w.getChunk()
Expand All @@ -579,7 +616,10 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
}
w.collector.Accepted(execDetails.UnpackedBytesReceivedKVTotal)
execDetails = kvutil.ExecDetails{}
idxResults = append(idxResults, IndexRecordChunk{ID: task.ID, Chunk: srcChk, Done: done, ctx: w.ctx})

_, tableScanRowCount := distsqlCtx.RuntimeStatsColl.GetCopCountAndRows(tableScanCopID)
idxResults = append(idxResults, IndexRecordChunk{ID: task.ID, Chunk: srcChk, Done: done, ctx: w.ctx, tableScanRowCount: tableScanRowCount - lastTableScanRowCount, conditionPushed: conditionPushed})
lastTableScanRowCount = tableScanRowCount
}
return rs.Close()
})
Expand All @@ -588,12 +628,11 @@ func (w *tableScanWorker) scanRecords(task TableScanTask, sender func(IndexRecor
}
for i, idxResult := range idxResults {
sender(idxResult)
rowCnt := idxResult.Chunk.NumRows()
if w.cpOp != nil {
done := i == len(idxResults)-1
w.cpOp.UpdateChunk(task.ID, rowCnt, done)
w.cpOp.UpdateChunk(task.ID, int(idxResult.tableScanRowCount), done)
}
w.totalCount.Add(int64(rowCnt))
w.totalCount.Add(idxResult.tableScanRowCount)
}
}

Expand Down Expand Up @@ -666,7 +705,7 @@ func NewWriteExternalStoreOperator(
writers = append(writers, writer)
}

return &indexIngestWorker{
w := &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
Expand All @@ -678,6 +717,13 @@ func NewWriteExternalStoreOperator(
reorgMeta: reorgMeta,
totalCount: totalCount,
}
err := w.initIndexConditionCheckers()
if err != nil {
w.ctx.onError(err)
return nil
}

return w
})
return &WriteExternalStoreOperator{
AsyncOperator: operator.NewAsyncOperator(ctx, pool),
Expand Down Expand Up @@ -738,7 +784,7 @@ func NewIndexIngestOperator(
writers = append(writers, writer)
}

return &indexIngestWorker{
w := &indexIngestWorker{
ctx: ctx,
tbl: tbl,
indexes: indexes,
Expand All @@ -750,6 +796,13 @@ func NewIndexIngestOperator(
srcChunkPool: srcChunkPool,
reorgMeta: reorgMeta,
}
err := w.initIndexConditionCheckers()
if err != nil {
w.ctx.onError(err)
return nil
}

return w
})
return &IndexIngestOperator{
AsyncOperator: operator.NewAsyncOperator(ctx, pool),
Expand All @@ -759,9 +812,10 @@ func NewIndexIngestOperator(
type indexIngestWorker struct {
ctx *OperatorCtx

tbl table.PhysicalTable
indexes []table.Index
reorgMeta *model.DDLReorgMeta
tbl table.PhysicalTable
indexes []table.Index
reorgMeta *model.DDLReorgMeta
indexConditionCheckers []func(row chunk.Row) (bool, error)

copCtx copr.CopContext
sessPool opSessPool
Expand All @@ -786,19 +840,21 @@ func (w *indexIngestWorker) HandleTask(ck IndexRecordChunk, send func(IndexWrite
ID: ck.ID,
}
w.initSessCtx()
count, bytes, err := w.WriteChunk(&ck)
// TODO: find a place to display the added count
_, bytes, err := w.WriteChunk(&ck)
if err != nil {
w.ctx.onError(err)
return
}
if count == 0 {
scannedCount := ck.tableScanRowCount
if scannedCount == 0 {
logutil.Logger(w.ctx).Info("finish a index ingest task", zap.Int("id", ck.ID))
return
}
if w.totalCount != nil {
w.totalCount.Add(int64(count))
w.totalCount.Add(scannedCount)
}
result.RowCnt = count
result.RowCnt = int(ck.tableScanRowCount)
result.Bytes = bytes
if ResultCounterForTest != nil {
ResultCounterForTest.Add(1)
Expand All @@ -822,6 +878,25 @@ func (w *indexIngestWorker) initSessCtx() {
}
}

func (w *indexIngestWorker) initIndexConditionCheckers() error {
if w.indexConditionCheckers != nil {
return nil
}

w.indexConditionCheckers = make([]func(row chunk.Row) (bool, error), len(w.indexes))
var err error
for i, index := range w.indexes {
if index.Meta().HasCondition() {
w.indexConditionCheckers[i], err = buildIndexConditionChecker(w.copCtx, w.tbl.Meta(), index.Meta())
if err != nil {
return err
}
}
}

return nil
}

func (w *indexIngestWorker) Close() {
// TODO(lance6716): unify the real write action for engineInfo and external
// writer.
Expand Down Expand Up @@ -852,7 +927,14 @@ func (w *indexIngestWorker) WriteChunk(rs *IndexRecordChunk) (count int, bytes i
oprStartTime := time.Now()
vars := w.se.GetSessionVars() //nolint:forbidigo
sc := vars.StmtCtx
cnt, kvBytes, err := writeChunk(w.ctx, w.writers, w.indexes, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk, w.tbl.Meta())

indexConditionCheckers := w.indexConditionCheckers
if rs.conditionPushed && len(w.indexes) == 1 {
// If the index condition has been pushed down to tikv side, and there's only one index, we can
// skip running the checker in TiDB side.
indexConditionCheckers = nil
}
cnt, kvBytes, err := writeChunk(w.ctx, w.writers, w.indexes, indexConditionCheckers, w.copCtx, sc.TimeZone(), sc.ErrCtx(), vars.GetWriteStmtBufs(), rs.Chunk, w.tbl.Meta())
if err != nil || cnt == 0 {
return 0, 0, err
}
Expand Down
Loading