diff --git a/pkg/ddl/create_table.go b/pkg/ddl/create_table.go index 6fdf2cddd7594..77b148c29852a 100644 --- a/pkg/ddl/create_table.go +++ b/pkg/ddl/create_table.go @@ -298,12 +298,16 @@ func (w *worker) onCreateMaterializedViewLog(jobCtx *jobContext, job *model.Job) } if baseTblInfo.MaterializedViewBase != nil && baseTblInfo.MaterializedViewBase.MLogID != 0 { job.State = model.JobStateCancelled - return ver, infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: pmodel.NewCIStr(job.SchemaName), Name: mlogTblInfo.Name}) + return ver, ErrMLogAlreadyExists.GenWithStackByArgs(ast.Ident{Schema: pmodel.NewCIStr(job.SchemaName), Name: baseTblInfo.Name}) } mlogTblInfo.State = model.StateNone mlogTblInfo, err = createTable(jobCtx, job, &model.CreateTableArgs{TableInfo: mlogTblInfo, FKCheck: false}) if err != nil { + if infoschema.ErrTableExists.Equal(err) || meta.ErrTableExists.Equal(err) { + job.State = model.JobStateCancelled + return ver, ErrMLogTableNameConflict.GenWithStackByArgs(ast.Ident{Schema: pmodel.NewCIStr(job.SchemaName), Name: mlogTblInfo.Name}) + } return ver, errors.Trace(err) } diff --git a/pkg/ddl/ddl_test.go b/pkg/ddl/ddl_test.go index bd0e8002e3b44..805580c2427cd 100644 --- a/pkg/ddl/ddl_test.go +++ b/pkg/ddl/ddl_test.go @@ -18,9 +18,11 @@ import ( "context" "encoding/json" "fmt" + "math" "strings" "testing" "time" + "unicode/utf8" "github.com/pingcap/tidb/pkg/ddl/testargsv1" "github.com/pingcap/tidb/pkg/domain/infosync" @@ -174,6 +176,125 @@ func TestBuildCreateMaterializedViewLogPurgeInfoUpsertSQL(t *testing.T) { require.Contains(t, sqlWithNull, ", NULL)") } +func TestBuildMaterializedViewLogTableName(t *testing.T) { + oldMLogSeq := MLogTableNameSeq.Load() + oldShortSeq := MLogShortTableNameSeq.Load() + t.Cleanup(func() { + MLogTableNameSeq.Store(oldMLogSeq) + MLogShortTableNameSeq.Store(oldShortSeq) + }) + + const baseTableName = "base_table_name" + longPrefixedBaseTableName := materializedViewLogTablePrefix + strings.Repeat("t", mysql.MaxTableNameLength-len(materializedViewLogTablePrefix)) + longBaseTableName := strings.Repeat("t", mysql.MaxTableNameLength-len(materializedViewLogTablePrefix)+1) + + tests := []struct { + name string + baseTableName string + existingTables []string + expectedName string + expectedMLogSeq uint64 + expectedShortSeq uint64 + }{ + { + name: "prefixed base name is renamed", + baseTableName: materializedViewLogTablePrefix + baseTableName, + expectedName: materializedViewLogTablePrefix + "1" + baseTableName, + expectedMLogSeq: 1, + }, + { + name: "prefixed base name retries until renamed name does not conflict", + baseTableName: materializedViewLogTablePrefix + baseTableName, + existingTables: []string{ + materializedViewLogTablePrefix + "1" + baseTableName, + materializedViewLogTablePrefix + "2" + baseTableName, + }, + expectedName: materializedViewLogTablePrefix + "3" + baseTableName, + expectedMLogSeq: 3, + }, + { + name: "prefixed base name falls back to short name when renamed name is too long", + baseTableName: longPrefixedBaseTableName, + expectedName: materializedViewLogTablePrefix + "1", + expectedMLogSeq: 1, + expectedShortSeq: 1, + }, + { + name: "prefixed base name falls back to short name and retries conflict", + baseTableName: longPrefixedBaseTableName, + existingTables: []string{ + materializedViewLogTablePrefix + "1", + materializedViewLogTablePrefix + "2", + }, + expectedName: materializedViewLogTablePrefix + "3", + expectedMLogSeq: 1, + expectedShortSeq: 3, + }, + { + name: "regular base name falls back to short name when mlog name is too long", + baseTableName: longBaseTableName, + expectedName: materializedViewLogTablePrefix + "1", + expectedShortSeq: 1, + }, + { + name: "regular base name falls back to short name and retries conflict after mlog name is too long", + baseTableName: longBaseTableName, + existingTables: []string{ + materializedViewLogTablePrefix + "1", + materializedViewLogTablePrefix + "2", + }, + expectedName: materializedViewLogTablePrefix + "3", + expectedShortSeq: 3, + }, + { + name: "regular base name falls back to short name when mlog name conflicts", + baseTableName: baseTableName, + existingTables: []string{ + materializedViewLogTablePrefix + baseTableName, + }, + expectedName: materializedViewLogTablePrefix + "1", + expectedShortSeq: 1, + }, + { + name: "regular base name falls back to short name and retries conflict after mlog name conflicts", + baseTableName: baseTableName, + existingTables: []string{ + materializedViewLogTablePrefix + baseTableName, + materializedViewLogTablePrefix + "1", + materializedViewLogTablePrefix + "2", + }, + expectedName: materializedViewLogTablePrefix + "3", + expectedShortSeq: 3, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + MLogTableNameSeq.Store(0) + MLogShortTableNameSeq.Store(0) + exists := make(map[string]struct{}, len(tt.existingTables)) + for _, tableName := range tt.existingTables { + exists[strings.ToLower(tableName)] = struct{}{} + } + tableExists := func(name pmodel.CIStr) (bool, error) { + _, ok := exists[name.L] + return ok, nil + } + + name, err := GenerateMLogTableName(pmodel.NewCIStr(tt.baseTableName), tableExists) + require.NoError(t, err) + require.Equal(t, tt.expectedName, name) + require.LessOrEqual(t, utf8.RuneCountInString(strings.ToLower(name)), mysql.MaxTableNameLength) + require.Equal(t, tt.expectedMLogSeq, MLogTableNameSeq.Load()) + require.Equal(t, tt.expectedShortSeq, MLogShortTableNameSeq.Load()) + }) + } + + MLogTableNameSeq.Store(math.MaxUint64) + _, err := nextMLogTableNameNumber(&MLogTableNameSeq, "out of range") + require.ErrorContains(t, err, "out of range") +} + func TestBuildMViewRefreshOutOfPlaceCutoverInvolvingSchemaInfo(t *testing.T) { const ( oldMViewID int64 = 101 diff --git a/pkg/ddl/executor.go b/pkg/ddl/executor.go index 18c248882e7b4..1b54d4d39f9c8 100644 --- a/pkg/ddl/executor.go +++ b/pkg/ddl/executor.go @@ -109,8 +109,9 @@ type Executor interface { DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error CreateMaterializedView(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewStmt) error - CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt) error + CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt, mlogTableName string) error CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error + GenerateMLogTableName(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) (string, error) DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error) DropMaterializedView(ctx sessionctx.Context, stmt *ast.DropMaterializedViewStmt) error DropMaterializedViewLog(ctx sessionctx.Context, stmt *ast.DropMaterializedViewLogStmt) error @@ -1063,7 +1064,40 @@ func (e *executor) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) ( return e.CreateTableWithInfo(ctx, schema.Name, tbInfo, involvingRef, WithOnExist(onExist)) } -func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) error { +// GenerateMLogTableName implements the DDL interface. +func (e *executor) GenerateMLogTableName(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) (string, error) { + is := e.infoCache.GetLatest() + schemaName := s.Table.Schema + if schemaName.O == "" { + if ctx.GetSessionVars().CurrentDB == "" { + return "", errors.Trace(plannererrors.ErrNoDB) + } + schemaName = pmodel.NewCIStr(ctx.GetSessionVars().CurrentDB) + } + + baseTable, err := is.TableByName(e.ctx, schemaName, s.Table.Name) + if err != nil { + return "", err + } + if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone { + return "", dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") + } + if baseInfo := baseTable.Meta().MaterializedViewBase; baseInfo != nil && baseInfo.MLogID != 0 { + return "", ErrMLogAlreadyExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: baseTable.Meta().Name}) + } + + mlogName, err := GenerateMLogTableName( + baseTable.Meta().Name, + getExistenceOfMLogTableNameChecker(e.ctx, is, schemaName), + ) + if err != nil { + return "", err + } + + return mlogName, nil +} + +func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt, mlogTableName string) error { is := e.infoCache.GetLatest() schemaName := s.Table.Schema if schemaName.O == "" { @@ -1085,18 +1119,8 @@ func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.Crea if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone { return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") } - - mlogName := "$mlog$" + baseTable.Meta().Name.O - mlogNameCIStr := pmodel.NewCIStr(mlogName) - if err := checkTooLongTable(mlogNameCIStr); err != nil { - return err - } - _, err = is.TableByName(e.ctx, schemaName, mlogNameCIStr) - if err == nil { - return infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: mlogNameCIStr}) - } - if !infoschema.ErrTableNotExists.Equal(err) { - return err + if baseInfo := baseTable.Meta().MaterializedViewBase; baseInfo != nil && baseInfo.MLogID != 0 { + return ErrMLogAlreadyExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: baseTable.Meta().Name}) } colMap := make(map[string]*model.ColumnInfo, len(baseTable.Meta().Columns)) @@ -1147,7 +1171,7 @@ func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.Crea } createTableStmt := &ast.CreateTableStmt{ - Table: &ast.TableName{Schema: schemaName, Name: mlogNameCIStr}, + Table: &ast.TableName{Schema: schemaName, Name: pmodel.NewCIStr(mlogTableName)}, Cols: colDefs, Options: s.Options, } diff --git a/pkg/ddl/materialized_view.go b/pkg/ddl/materialized_view.go index 35d12bd13404f..b7c81b3dc1461 100644 --- a/pkg/ddl/materialized_view.go +++ b/pkg/ddl/materialized_view.go @@ -17,8 +17,11 @@ package ddl import ( "context" "fmt" + "math" "strconv" "strings" + "sync/atomic" + "unicode/utf8" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -33,10 +36,12 @@ import ( "github.com/pingcap/tidb/pkg/parser/format" pmodel "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" + "github.com/pingcap/tidb/pkg/parser/terror" mvmerge "github.com/pingcap/tidb/pkg/planner/mview" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/variable" storeerr "github.com/pingcap/tidb/pkg/store/driver/error" + "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" driver "github.com/pingcap/tidb/pkg/types/parser_driver" "github.com/pingcap/tidb/pkg/util/dbterror" @@ -47,12 +52,191 @@ import ( ) const ( + materializedViewLogTablePrefix = "$mlog$" mviewAttrAlertWarning = "mview_alert_warning" mviewAttrAlertOverdue = "mview_alert_overdue" mviewAttrAlertRefreshFailed = "mview_alert_refresh_failed" alterMaterializedScheduleInfoUpdateLockWaitTimeoutSec = int64(10) ) +var ( + // MLogTableNameSeq allocates numeric components for mlog names derived from + // base tables whose names already start with "$mlog$". + MLogTableNameSeq atomic.Uint64 + // MLogShortTableNameSeq allocates numeric components for shortened mlog names + // used when the derived name exceeds TiDB's table-name length limit. + MLogShortTableNameSeq atomic.Uint64 + // ErrMLogAlreadyExists is returned when the base table already has an mlog. + ErrMLogAlreadyExists = dbterror.ClassSchema.NewStdErr( + mysql.ErrTableExists, + mysql.Message("materialized view log for base table '%-.192s' already exists", nil), + ) + // ErrMLogTableNameConflict is returned when a generated mlog table name is + // occupied by a concurrent DDL. It is an internal retry signal for executor. + ErrMLogTableNameConflict = dbterror.ClassDDL.NewStdErr( + mysql.ErrTableExists, + mysql.Message("generated materialized view log table name '%-.192s' already exists", nil), + ) +) + +// IsMLogTableNameConflict reports whether err is a generated mlog table-name conflict. +func IsMLogTableNameConflict(err error) bool { + return terror.ErrorEqual(err, ErrMLogTableNameConflict) +} + +// GenerateMLogTableName generates an available mlog table name for the base table. +func GenerateMLogTableName( + baseTableName pmodel.CIStr, + checkTableExistence func(pmodel.CIStr) (bool, error), +) (string, error) { + if checkTableExistence == nil { + return "", errors.New("materialized view log table name exists checker is nil") + } + + var candidate pmodel.CIStr + if strings.HasPrefix(baseTableName.L, materializedViewLogTablePrefix) { + suffix := baseTableName.O[len(materializedViewLogTablePrefix):] + for { + var err error + number, err := nextMLogTableNameNumber( + &MLogTableNameSeq, + "materialized view log table name number is out of range", + ) + if err != nil { + return "", err + } + candidate = pmodel.NewCIStr(materializedViewLogTablePrefix + strconv.FormatUint(number, 10) + suffix) + exists, err := checkTableExistence(candidate) + if err != nil { + return "", err + } + if !exists { + break + } + } + } else { + candidate = pmodel.NewCIStr(materializedViewLogTablePrefix + baseTableName.O) + } + + exists, err := checkTableExistence(candidate) + if err != nil { + return "", err + } + + if !exists && utf8.RuneCountInString(candidate.L) <= mysql.MaxTableNameLength { + return candidate.O, nil + } + + for { + var err error + number, err := nextMLogTableNameNumber( + &MLogShortTableNameSeq, + "materialized view log short table name number is out of range", + ) + if err != nil { + return "", err + } + candidate = pmodel.NewCIStr(materializedViewLogTablePrefix + strconv.FormatUint(number, 10)) + if utf8.RuneCountInString(candidate.L) > mysql.MaxTableNameLength { + return "", dbterror.ErrTooLongIdent.GenWithStackByArgs(candidate) + } + exists, err := checkTableExistence(candidate) + if err != nil { + return "", err + } + if !exists { + return candidate.O, nil + } + } +} + +func nextMLogTableNameNumber(counter *atomic.Uint64, outOfRangeErr string) (uint64, error) { + if counter.Load() == math.MaxUint64 { + return 0, errors.New(outOfRangeErr) + } + next := counter.Add(1) + if next == 0 { + return 0, errors.New(outOfRangeErr) + } + return next, nil +} + +func getExistenceOfMLogTableNameChecker( + ctx context.Context, + is infoschema.InfoSchema, + schemaName pmodel.CIStr, +) func(pmodel.CIStr) (bool, error) { + return func(tableName pmodel.CIStr) (bool, error) { + _, err := is.TableByName(ctx, schemaName, tableName) + if err == nil { + return true, nil + } + if infoschema.ErrTableNotExists.Equal(err) { + return false, nil + } + return false, err + } +} + +// GetExistenceOfMLogTableNameChecker returns a checker for whether an mlog table name already exists. +func GetExistenceOfMLogTableNameChecker( + ctx context.Context, + is infoschema.InfoSchema, + schemaName pmodel.CIStr, +) func(pmodel.CIStr) (bool, error) { + return getExistenceOfMLogTableNameChecker(ctx, is, schemaName) +} + +// GetMLogTableByBaseTable returns the mlog table recorded on the base table metadata. +func GetMLogTableByBaseTable( + ctx context.Context, + is infoschema.InfoSchema, + schemaName pmodel.CIStr, + baseTableMeta *model.TableInfo, +) (table.Table, error) { + return getMLogTableByBaseTable(ctx, is, schemaName, baseTableMeta) +} + +func getMLogTableByBaseTable( + ctx context.Context, + is infoschema.InfoSchema, + schemaName pmodel.CIStr, + baseTableMeta *model.TableInfo, +) (table.Table, error) { + if baseTableMeta == nil { + return nil, errors.New("materialized view log: invalid base table metadata") + } + if baseTableMeta.MaterializedViewBase == nil || baseTableMeta.MaterializedViewBase.MLogID == 0 { + return nil, errors.Errorf( + "materialized view log does not exist for base table %s.%s", + schemaName.O, + baseTableMeta.Name.O, + ) + } + + mlogID := baseTableMeta.MaterializedViewBase.MLogID + mlogTable, ok := is.TableByID(ctx, mlogID) + if !ok { + return nil, errors.Errorf( + "materialized view log does not exist for base table %s.%s", + schemaName.O, + baseTableMeta.Name.O, + ) + } + + mlogInfo := mlogTable.Meta().MaterializedViewLog + if mlogInfo == nil || mlogInfo.BaseTableID != baseTableMeta.ID { + return nil, errors.Errorf( + "table %s.%s is not a materialized view log for base table %s.%s", + schemaName.O, + mlogTable.Meta().Name.O, + schemaName.O, + baseTableMeta.Name.O, + ) + } + return mlogTable, nil +} + // ApplyMViewExecutionSessionVars applies MV execution vars onto a session and returns a restore closure. func ApplyMViewExecutionSessionVars(sessVars *variable.SessionVars, target variable.MViewExecutionSessionVars) (func(), error) { return applyMViewExecutionSessionVars(sessVars, target, false) @@ -235,17 +419,10 @@ func (e *executor) CreateMaterializedView(ctx sessionctx.Context, s *ast.CreateM } baseTableID := baseTable.Meta().ID - mlogName := pmodel.NewCIStr("$mlog$" + baseTable.Meta().Name.O) - mlogTable, err := is.TableByName(e.ctx, baseTableName.Schema, mlogName) + mlogTable, err := getMLogTableByBaseTable(e.ctx, is, baseTableName.Schema, baseTable.Meta()) if err != nil { - if infoschema.ErrTableNotExists.Equal(err) { - return errors.Errorf("materialized view log does not exist for base table %s.%s", baseTableName.Schema.O, baseTableName.Name.O) - } return err } - if mlogTable.Meta().MaterializedViewLog == nil || mlogTable.Meta().MaterializedViewLog.BaseTableID != baseTableID { - return errors.Errorf("table %s.%s is not a materialized view log for base table %s.%s", baseTableName.Schema.O, mlogName.O, baseTableName.Schema.O, baseTableName.Name.O) - } // Validate Stage-1 query contract and ensure MV LOG columns cover query references. groupByInfos, err := validateCreateMaterializedViewQuery( @@ -433,16 +610,12 @@ func (e *executor) DropMaterializedViewLog(ctx sessionctx.Context, s *ast.DropMa if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone { return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") } - baseTableID := baseTable.Meta().ID - mlogName := pmodel.NewCIStr("$mlog$" + baseTable.Meta().Name.O) - mlogTable, err := is.TableByName(e.ctx, schemaName, mlogName) + mlogTable, err := getMLogTableByBaseTable(e.ctx, is, schemaName, baseTable.Meta()) if err != nil { return err } - if mlogTable.Meta().MaterializedViewLog == nil || mlogTable.Meta().MaterializedViewLog.BaseTableID != baseTableID { - return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName.O, mlogName, "MATERIALIZED VIEW LOG") - } + mlogName := mlogTable.Meta().Name // MV LOG cannot be dropped while any MV still depends on the base table. if hasMaterializedViewDependsOnBaseTable(baseTable.Meta()) { @@ -561,16 +734,12 @@ func (e *executor) AlterMaterializedViewLog(ctx sessionctx.Context, s *ast.Alter if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone { return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") } - baseTableID := baseTable.Meta().ID - mlogName := pmodel.NewCIStr("$mlog$" + baseTable.Meta().Name.O) - mlogTable, err := is.TableByName(e.ctx, schemaName, mlogName) + mlogTable, err := getMLogTableByBaseTable(e.ctx, is, schemaName, baseTable.Meta()) if err != nil { return err } - if mlogTable.Meta().MaterializedViewLog == nil || mlogTable.Meta().MaterializedViewLog.BaseTableID != baseTableID { - return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName.O, mlogName, "MATERIALIZED VIEW LOG") - } + mlogName := mlogTable.Meta().Name for _, action := range s.Actions { switch action.Tp { diff --git a/pkg/ddl/schematracker/BUILD.bazel b/pkg/ddl/schematracker/BUILD.bazel index 3eb7d103f1369..e3aa1080ac61c 100644 --- a/pkg/ddl/schematracker/BUILD.bazel +++ b/pkg/ddl/schematracker/BUILD.bazel @@ -49,7 +49,7 @@ go_test( ], embed = [":schematracker"], flaky = True, - shard_count = 17, + shard_count = 18, deps = [ "//pkg/executor", "//pkg/infoschema", diff --git a/pkg/ddl/schematracker/checker.go b/pkg/ddl/schematracker/checker.go index 8cfdec696415e..ff98be63d241e 100644 --- a/pkg/ddl/schematracker/checker.go +++ b/pkg/ddl/schematracker/checker.go @@ -276,14 +276,19 @@ func (d *Checker) CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) e return nil } +// GenerateMLogTableName implements the DDL interface. +func (d *Checker) GenerateMLogTableName(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) (string, error) { + return d.realExecutor.GenerateMLogTableName(ctx, s) +} + // CreateMaterializedViewLog implements the DDL interface. -func (d *Checker) CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt) error { - err := d.realExecutor.CreateMaterializedViewLog(ctx, stmt) +func (d *Checker) CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt, mlogTableName string) error { + err := d.realExecutor.CreateMaterializedViewLog(ctx, stmt, mlogTableName) if err != nil || d.closed.Load() { return err } - err = d.tracker.CreateMaterializedViewLog(ctx, stmt) + err = d.tracker.CreateMaterializedViewLog(ctx, stmt, mlogTableName) if err != nil { panic(err) } @@ -292,7 +297,7 @@ func (d *Checker) CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.Cr if schemaName.O == "" { schemaName = pmodel.NewCIStr(ctx.GetSessionVars().CurrentDB) } - d.checkTableInfo(ctx, schemaName, pmodel.NewCIStr("$mlog$"+stmt.Table.Name.O)) + d.checkTableInfo(ctx, schemaName, pmodel.NewCIStr(mlogTableName)) d.checkTableInfo(ctx, schemaName, stmt.Table.Name) return nil } diff --git a/pkg/ddl/schematracker/dm_tracker.go b/pkg/ddl/schematracker/dm_tracker.go index 259100c6404ad..4bc1145efa8d9 100644 --- a/pkg/ddl/schematracker/dm_tracker.go +++ b/pkg/ddl/schematracker/dm_tracker.go @@ -21,7 +21,6 @@ package schematracker import ( "context" "strings" - "unicode/utf8" "github.com/pingcap/errors" "github.com/pingcap/tidb/pkg/ddl" @@ -231,8 +230,13 @@ func (d *SchemaTracker) CreateTable(ctx sessionctx.Context, s *ast.CreateTableSt return d.CreateTableWithInfo(ctx, schema.Name, tbInfo, nil, ddl.WithOnExist(onExist)) } +// GenerateMLogTableName implements the DDL interface. +func (*SchemaTracker) GenerateMLogTableName(sessionctx.Context, *ast.CreateMaterializedViewLogStmt) (string, error) { + return "", errors.New("not implemented") +} + // CreateMaterializedViewLog implements the DDL interface. -func (d *SchemaTracker) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) error { +func (d *SchemaTracker) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt, mlogTableName string) error { schemaName := s.Table.Schema if schemaName.O == "" { if ctx == nil || ctx.GetSessionVars().CurrentDB == "" { @@ -245,6 +249,21 @@ func (d *SchemaTracker) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast return infoschema.ErrDatabaseNotExists.GenWithStackByArgs(schemaName) } + mlogNameCIStr := pmodel.NewCIStr(mlogTableName) + mlogExists, err := ddl.GetExistenceOfMLogTableNameChecker( + context.Background(), + InfoStoreAdaptor{inner: d.InfoStore}, + schemaName, + )(mlogNameCIStr) + + if err != nil { + return err + } + + if mlogExists { + return infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: mlogNameCIStr}) + } + baseTable, err := d.TableClonedByName(schemaName, s.Table.Name) if err != nil { return err @@ -252,16 +271,8 @@ func (d *SchemaTracker) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast if baseTable.IsView() || baseTable.IsSequence() || baseTable.TempTableType != model.TempTableNone { return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") } - - mlogName := "$mlog$" + baseTable.Name.O - mlogNameCIStr := pmodel.NewCIStr(mlogName) - if utf8.RuneCountInString(mlogNameCIStr.L) > mysql.MaxTableNameLength { - return dbterror.ErrTooLongIdent.GenWithStackByArgs(mlogNameCIStr) - } - if _, err := d.TableByName(context.Background(), schemaName, mlogNameCIStr); err == nil { - return infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: mlogNameCIStr}) - } else if !infoschema.ErrTableNotExists.Equal(err) { - return err + if baseTable.MaterializedViewBase != nil && baseTable.MaterializedViewBase.MLogID != 0 { + return ddl.ErrMLogAlreadyExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: baseTable.Name}) } colMap := make(map[string]*model.ColumnInfo, len(baseTable.Columns)) diff --git a/pkg/ddl/schematracker/dm_tracker_test.go b/pkg/ddl/schematracker/dm_tracker_test.go index edb11894e03a7..45bb9e86f1335 100644 --- a/pkg/ddl/schematracker/dm_tracker_test.go +++ b/pkg/ddl/schematracker/dm_tracker_test.go @@ -570,9 +570,24 @@ func TestCreateMaterializedViewLogScheduleExprTypeCheck(t *testing.T) { return stmt.(*ast.CreateMaterializedViewLogStmt) } - err := tracker.CreateMaterializedViewLog(sctx, parseStmt("create materialized view log on test.t (a) purge start with 1 next date_add(now(), interval 1 hour)")) + err := tracker.CreateMaterializedViewLog(sctx, parseStmt("create materialized view log on test.t (a) purge start with 1 next date_add(now(), interval 1 hour)"), "test_mlog") require.ErrorContains(t, err, "PURGE START WITH expression must return DATETIME/TIMESTAMP") - err = tracker.CreateMaterializedViewLog(sctx, parseStmt("create materialized view log on test.t (a) purge start with now() next 1")) + err = tracker.CreateMaterializedViewLog(sctx, parseStmt("create materialized view log on test.t (a) purge start with now() next 1"), "test_mlog") require.ErrorContains(t, err, "PURGE NEXT expression must return DATETIME/TIMESTAMP") } + +func TestCreateMaterializedViewLogRejectExistingMLogName(t *testing.T) { + tracker := schematracker.NewSchemaTracker(2) + tracker.CreateTestDB(nil) + execCreate(t, tracker, "create table test.t (a int)") + execCreate(t, tracker, "create table test.test_mlog (a int)") + + sctx := mock.NewContext() + p := parser.New() + stmt, err := p.ParseOneStmt("create materialized view log on test.t (a)", "", "") + require.NoError(t, err) + + err = tracker.CreateMaterializedViewLog(sctx, stmt.(*ast.CreateMaterializedViewLogStmt), "test_mlog") + require.True(t, infoschema.ErrTableExists.Equal(err), err) +} diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 061d21e8270ed..b53bc3139941d 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -334,6 +334,7 @@ go_test( "cluster_table_test.go", "compact_table_test.go", "copr_cache_test.go", + "ddl_test.go", "delete_test.go", "detach_integration_test.go", "detach_test.go", diff --git a/pkg/executor/ddl.go b/pkg/executor/ddl.go index ce4c40c6e7116..fc63434fca9c1 100644 --- a/pkg/executor/ddl.go +++ b/pkg/executor/ddl.go @@ -18,8 +18,10 @@ import ( "context" "fmt" "strings" + "time" "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/domain" @@ -59,6 +61,31 @@ type DDLExec struct { done bool } +const ( + maxCreateMLogNameConflictRetries = 8 + createMLogNameConflictBackoff = time.Millisecond + maxCreateMLogNameConflictBackoff = 32 * time.Millisecond +) + +var waitCreateMLogNameConflictRetry = func(ctx context.Context, retry int) error { + delay := createMLogNameConflictBackoff + for i := 0; i < retry && delay < maxCreateMLogNameConflictBackoff; i++ { + delay *= 2 + if delay > maxCreateMLogNameConflictBackoff { + delay = maxCreateMLogNameConflictBackoff + } + } + + timer := time.NewTimer(delay) + defer timer.Stop() + select { + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return nil + } +} + // toErr converts the error to the ErrInfoSchemaChanged when the schema is outdated. func (e *DDLExec) toErr(err error) error { // The err may be cause by schema changed, here we distinguish the ErrInfoSchemaChanged error from other errors. @@ -166,7 +193,7 @@ func (e *DDLExec) Next(ctx context.Context, _ *chunk.Chunk) (err error) { case *ast.CreateMaterializedViewStmt: err = e.ddlExecutor.CreateMaterializedView(e.Ctx(), x) case *ast.CreateMaterializedViewLogStmt: - err = e.ddlExecutor.CreateMaterializedViewLog(e.Ctx(), x) + err = e.executeCreateMaterializedViewLog(ctx, x) case *ast.AlterMaterializedViewStmt: err = e.ddlExecutor.AlterMaterializedView(e.Ctx(), x) case *ast.AlterMaterializedViewLogStmt: @@ -330,6 +357,33 @@ func (e *DDLExec) executeCreateView(ctx context.Context, s *ast.CreateViewStmt) return e.ddlExecutor.CreateView(e.Ctx(), s) } +func (e *DDLExec) executeCreateMaterializedViewLog(ctx context.Context, stmt *ast.CreateMaterializedViewLogStmt) error { + retries := 0 + for { + mlogTableName, err := e.ddlExecutor.GenerateMLogTableName(e.Ctx(), stmt) + if err != nil { + return err + } + + failpoint.InjectCall("afterCreateMaterializedViewLogNameGenerated", mlogTableName) + err = e.ddlExecutor.CreateMaterializedViewLog(e.Ctx(), stmt, mlogTableName) + if err != nil { + if ddl.IsMLogTableNameConflict(err) { + if retries >= maxCreateMLogNameConflictRetries { + return err + } + if backoffErr := waitCreateMLogNameConflictRetry(ctx, retries); backoffErr != nil { + return backoffErr + } + retries++ + continue + } + return err + } + return nil + } +} + func (e *DDLExec) executeCreateIndex(s *ast.CreateIndexStmt) error { if _, ok := e.getLocalTemporaryTable(s.Table.Schema, s.Table.Name); ok { return dbterror.ErrUnsupportedLocalTempTableDDL.GenWithStackByArgs("CREATE INDEX") diff --git a/pkg/executor/ddl_test.go b/pkg/executor/ddl_test.go new file mode 100644 index 0000000000000..b0c9a9d418fb4 --- /dev/null +++ b/pkg/executor/ddl_test.go @@ -0,0 +1,155 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package executor + +import ( + "context" + "errors" + "testing" + + "github.com/pingcap/tidb/pkg/ddl" + execpkg "github.com/pingcap/tidb/pkg/executor/internal/exec" + "github.com/pingcap/tidb/pkg/infoschema" + "github.com/pingcap/tidb/pkg/parser/ast" + "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/util/mock" + "github.com/stretchr/testify/require" +) + +type mockMLogDDLExecutor struct { + ddl.Executor + + generatedNames []string + createErrs []error + + generateCalls int + createNames []string +} + +func (m *mockMLogDDLExecutor) GenerateMLogTableName(sessionctx.Context, *ast.CreateMaterializedViewLogStmt) (string, error) { + if m.generateCalls >= len(m.generatedNames) { + return "", errors.New("unexpected GenerateMLogTableName call") + } + name := m.generatedNames[m.generateCalls] + m.generateCalls++ + return name, nil +} + +func (m *mockMLogDDLExecutor) CreateMaterializedViewLog(_ sessionctx.Context, _ *ast.CreateMaterializedViewLogStmt, mlogTableName string) error { + m.createNames = append(m.createNames, mlogTableName) + idx := len(m.createNames) - 1 + if idx >= len(m.createErrs) { + return nil + } + return m.createErrs[idx] +} + +func TestDDLExecExecuteCreateMaterializedViewLog(t *testing.T) { + tableExistsErr := infoschema.ErrTableExists.GenWithStackByArgs("mlog") + mlogAlreadyExistsErr := ddl.ErrMLogAlreadyExists.GenWithStackByArgs("test.t") + tableNameConflictErr := ddl.ErrMLogTableNameConflict.GenWithStackByArgs("mlog") + otherErr := errors.New("non table exists error") + oldBackoff := waitCreateMLogNameConflictRetry + waitCreateMLogNameConflictRetry = func(context.Context, int) error { return nil } + defer func() { + waitCreateMLogNameConflictRetry = oldBackoff + }() + + repeatNames := func(name string, n int) []string { + names := make([]string, n) + for i := range names { + names[i] = name + } + return names + } + repeatErrs := func(err error, n int) []error { + errs := make([]error, n) + for i := range errs { + errs[i] = err + } + return errs + } + + tests := []struct { + name string + generatedNames []string + createErrs []error + expectedNames []string + expectedErr error + }{ + { + name: "return ordinary table exists error", + generatedNames: []string{"$mlog$t"}, + createErrs: []error{tableExistsErr}, + expectedNames: []string{"$mlog$t"}, + expectedErr: tableExistsErr, + }, + { + name: "return base mlog already exists error", + generatedNames: []string{"$mlog$t"}, + createErrs: []error{mlogAlreadyExistsErr}, + expectedNames: []string{"$mlog$t"}, + expectedErr: mlogAlreadyExistsErr, + }, + { + name: "retry on generated table name conflict", + generatedNames: []string{"$mlog$t", "$mlog$1"}, + createErrs: []error{tableNameConflictErr, nil}, + expectedNames: []string{"$mlog$t", "$mlog$1"}, + }, + { + name: "return generated table name conflict after retry limit", + generatedNames: repeatNames("$mlog$t", maxCreateMLogNameConflictRetries+1), + createErrs: repeatErrs(tableNameConflictErr, maxCreateMLogNameConflictRetries+1), + expectedNames: repeatNames("$mlog$t", maxCreateMLogNameConflictRetries+1), + expectedErr: tableNameConflictErr, + }, + { + name: "return non table exists error", + generatedNames: []string{"$mlog$t"}, + createErrs: []error{otherErr}, + expectedNames: []string{"$mlog$t"}, + expectedErr: otherErr, + }, + { + name: "return nil on success", + generatedNames: []string{"$mlog$t"}, + createErrs: []error{nil}, + expectedNames: []string{"$mlog$t"}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ddlExecutor := &mockMLogDDLExecutor{ + generatedNames: tt.generatedNames, + createErrs: tt.createErrs, + } + exec := &DDLExec{ + BaseExecutor: execpkg.NewBaseExecutor(mock.NewContext(), nil, 0), + ddlExecutor: ddlExecutor, + } + + err := exec.executeCreateMaterializedViewLog(context.Background(), &ast.CreateMaterializedViewLogStmt{}) + if tt.expectedErr != nil { + require.ErrorIs(t, err, tt.expectedErr) + } else { + require.NoError(t, err) + } + require.Equal(t, tt.expectedNames, ddlExecutor.createNames) + require.Equal(t, len(tt.expectedNames), ddlExecutor.generateCalls) + }) + } +} diff --git a/pkg/executor/materialized_view.go b/pkg/executor/materialized_view.go index a39d42976cd0d..4ea0fda9a478d 100644 --- a/pkg/executor/materialized_view.go +++ b/pkg/executor/materialized_view.go @@ -2295,29 +2295,12 @@ func (e *PurgeMaterializedViewLogExec) resolvePurgeMaterializedViewLogMeta( return schemaName, nil, mlogName, 0, nil, dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE") } baseTableMeta = baseTable.Meta() - baseTableID := baseTableMeta.ID - mlogName = pmodel.NewCIStr("$mlog$" + baseTableMeta.Name.O) - mlogTable, err := is.TableByName(context.Background(), schemaName, mlogName) + mlogTable, err := ddl.GetMLogTableByBaseTable(context.Background(), is, schemaName, baseTableMeta) if err != nil { - if infoschema.ErrTableNotExists.Equal(err) { - return schemaName, baseTableMeta, mlogName, 0, nil, errors.Errorf( - "materialized view log does not exist for base table %s.%s", - schemaName.O, - s.Table.Name.O, - ) - } return schemaName, baseTableMeta, mlogName, 0, nil, err } - if mlogTable.Meta().MaterializedViewLog == nil || mlogTable.Meta().MaterializedViewLog.BaseTableID != baseTableID { - return schemaName, baseTableMeta, mlogName, 0, nil, errors.Errorf( - "table %s.%s is not a materialized view log for base table %s.%s", - schemaName.O, - mlogName.O, - schemaName.O, - s.Table.Name.O, - ) - } + mlogName = mlogTable.Meta().Name mlogID = mlogTable.Meta().ID mlogInfo = mlogTable.Meta().MaterializedViewLog diff --git a/pkg/executor/test/ddl/mview_log_ddl_test.go b/pkg/executor/test/ddl/mview_log_ddl_test.go index 86371d577b735..8b96573560426 100644 --- a/pkg/executor/test/ddl/mview_log_ddl_test.go +++ b/pkg/executor/test/ddl/mview_log_ddl_test.go @@ -71,6 +71,17 @@ func differentIsolationReadEnginesForTest(current string) string { return "tikv" } +func resetMLogNameSeqForTest(t *testing.T) { + oldMLogSeq := ddl.MLogTableNameSeq.Load() + oldShortSeq := ddl.MLogShortTableNameSeq.Load() + ddl.MLogTableNameSeq.Store(0) + ddl.MLogShortTableNameSeq.Store(0) + t.Cleanup(func() { + ddl.MLogTableNameSeq.Store(oldMLogSeq) + ddl.MLogShortTableNameSeq.Store(oldShortSeq) + }) +} + func TestCreateMaterializedViewLogBasic(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) tk := testkit.NewTestKit(t, store) @@ -121,8 +132,50 @@ func TestCreateMaterializedViewLogBasic(t *testing.T) { require.True(t, hasDMLType) require.True(t, hasOldNew) - // Duplicated MV LOG should fail (same derived table name). - tk.MustGetErrMsg("create materialized view log on t (a)", "[schema:1050]Table 'test.$mlog$t' already exists") + // Duplicated MV LOG should fail because the base table already has one. + tk.MustGetErrMsg("create materialized view log on t (a)", "[schema:1050]materialized view log for base table 'test.t' already exists") +} + +func TestCreateMaterializedViewLogRetriesWhenGeneratedNameStolen(t *testing.T) { + resetMLogNameSeqForTest(t) + store, dom := testkit.CreateMockStoreAndDomain(t) + if ttlJobManager := dom.TTLJobManager(); ttlJobManager != nil { + ttlJobManager.Stop() + require.NoError(t, ttlJobManager.WaitStopped(context.Background(), 10*time.Second)) + } + tk := testkit.NewTestKit(t, store) + thief := testkit.NewTestKit(t, store) + tk.MustExec("use test") + thief.MustExec("use test") + tk.MustExec("create table t_retry_mlog_name (a int)") + + const stolenMLogName = "$mlog$t_retry_mlog_name" + var stolen atomic.Bool + failpointName := "github.com/pingcap/tidb/pkg/executor/afterCreateMaterializedViewLogNameGenerated" + require.NoError(t, failpoint.EnableCall(failpointName, func(mlogTableName string) { + if mlogTableName != stolenMLogName { + return + } + if stolen.CompareAndSwap(false, true) { + thief.MustExec(fmt.Sprintf("create table `%s` (a int)", mlogTableName)) + } + })) + defer func() { + require.NoError(t, failpoint.Disable(failpointName)) + }() + + tk.MustExec("create materialized view log on t_retry_mlog_name (a)") + require.True(t, stolen.Load()) + tk.MustQuery("show tables like '$mlog$t_retry_mlog_name'").Check(testkit.Rows("$mlog$t_retry_mlog_name")) + tk.MustQuery("show tables like '$mlog$1'").Check(testkit.Rows("$mlog$1")) + + is := dom.InfoSchema() + baseTable, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("t_retry_mlog_name")) + require.NoError(t, err) + mlogTable, err := is.TableByName(context.Background(), pmodel.NewCIStr("test"), pmodel.NewCIStr("$mlog$1")) + require.NoError(t, err) + require.NotNil(t, baseTable.Meta().MaterializedViewBase) + require.Equal(t, mlogTable.Meta().ID, baseTable.Meta().MaterializedViewBase.MLogID) } func TestCreateMaterializedViewLogPrivilege(t *testing.T) { @@ -211,6 +264,26 @@ func TestGrantMaterializedViewObjectPrivileges(t *testing.T) { require.ErrorContains(t, err, "cannot grant Update privilege on materialized view log") } +func TestCreateMaterializedViewLogNameWithMLogPrefix(t *testing.T) { + resetMLogNameSeqForTest(t) + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table `$mlog$base_table_name` (a int, key idx_a(a))") + tk.MustExec("create table `$mlog$1base_table_name` (a int)") + + tk.MustExec("create materialized view log on `$mlog$base_table_name` (a)") + tk.MustQuery("show tables like '$mlog$2base_table_name'").Check(testkit.Rows("$mlog$2base_table_name")) + tk.MustQuery("show create materialized view log on `$mlog$base_table_name`"). + CheckContain("CREATE MATERIALIZED VIEW LOG ON `$mlog$base_table_name` (`a`)") + + tk.MustExec("create materialized view mv_mlog_prefix (a, cnt) refresh fast as select a, count(1) from `$mlog$base_table_name` group by a") + tk.MustExec("alter materialized view log on `$mlog$base_table_name` purge next date_add(now(), interval 1 hour)") + tk.MustExec("drop materialized view mv_mlog_prefix") + tk.MustExec("drop materialized view log on `$mlog$base_table_name`") + tk.MustQuery("show tables like '$mlog$2base_table_name'").Check(testkit.Rows()) +} + func TestShowCreateMaterializedViewLog(t *testing.T) { store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) @@ -566,6 +639,7 @@ func TestCreateMaterializedViewLogRejectNonBaseObject(t *testing.T) { } func TestCreateMaterializedViewLogNameLengthByRune(t *testing.T) { + resetMLogNameSeqForTest(t) store := testkit.CreateMockStore(t) tk := testkit.NewTestKit(t, store) tk.MustExec("use test") @@ -576,8 +650,12 @@ func TestCreateMaterializedViewLogNameLengthByRune(t *testing.T) { tk.MustQuery(fmt.Sprintf("select count(*) from information_schema.tables where table_schema='test' and table_name='%s'", "$mlog$"+maxName)).Check(testkit.Rows("1")) tooLongName := strings.Repeat("่กจ", 59) + tk.MustExec("create table `$mlog$1` (a int)") tk.MustExec(fmt.Sprintf("create table `%s` (a int)", tooLongName)) - tk.MustGetErrCode(fmt.Sprintf("create materialized view log on `%s` (a)", tooLongName), errno.ErrTooLongIdent) + tk.MustExec(fmt.Sprintf("create materialized view log on `%s` (a)", tooLongName)) + tk.MustQuery("select count(*) from information_schema.tables where table_schema='test' and table_name='$mlog$2'").Check(testkit.Rows("1")) + tk.MustQuery(fmt.Sprintf("show create materialized view log on `%s`", tooLongName)). + CheckContain(fmt.Sprintf("CREATE MATERIALIZED VIEW LOG ON `%s` (`a`)", tooLongName)) } func TestCreateMaterializedViewLogUpdatesPlacementBundle(t *testing.T) {