Skip to content
Closed
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions pkg/ddl/ddl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"testing"
"time"
"unicode/utf8"

"github.com/pingcap/tidb/pkg/ddl/testargsv1"
"github.com/pingcap/tidb/pkg/domain/infosync"
Expand Down Expand Up @@ -174,6 +176,125 @@ func TestBuildCreateMaterializedViewLogPurgeInfoUpsertSQL(t *testing.T) {
require.Contains(t, sqlWithNull, ", NULL)")
}

func TestBuildMaterializedViewLogTableName(t *testing.T) {
oldMLogSeq := MLogTableNameSeq.Load()
oldShortSeq := MLogShortTableNameSeq.Load()
t.Cleanup(func() {
MLogTableNameSeq.Store(oldMLogSeq)
MLogShortTableNameSeq.Store(oldShortSeq)
})

const baseTableName = "base_table_name"
longPrefixedBaseTableName := materializedViewLogTablePrefix + strings.Repeat("t", mysql.MaxTableNameLength-len(materializedViewLogTablePrefix))
longBaseTableName := strings.Repeat("t", mysql.MaxTableNameLength-len(materializedViewLogTablePrefix)+1)

tests := []struct {
name string
baseTableName string
existingTables []string
expectedName string
expectedMLogSeq uint64
expectedShortSeq uint64
}{
{
name: "prefixed base name is renamed",
baseTableName: materializedViewLogTablePrefix + baseTableName,
expectedName: materializedViewLogTablePrefix + "1" + baseTableName,
expectedMLogSeq: 1,
},
{
name: "prefixed base name retries until renamed name does not conflict",
baseTableName: materializedViewLogTablePrefix + baseTableName,
existingTables: []string{
materializedViewLogTablePrefix + "1" + baseTableName,
materializedViewLogTablePrefix + "2" + baseTableName,
},
expectedName: materializedViewLogTablePrefix + "3" + baseTableName,
expectedMLogSeq: 3,
},
{
name: "prefixed base name falls back to short name when renamed name is too long",
baseTableName: longPrefixedBaseTableName,
expectedName: materializedViewLogTablePrefix + "1",
expectedMLogSeq: 1,
expectedShortSeq: 1,
},
{
name: "prefixed base name falls back to short name and retries conflict",
baseTableName: longPrefixedBaseTableName,
existingTables: []string{
materializedViewLogTablePrefix + "1",
materializedViewLogTablePrefix + "2",
},
expectedName: materializedViewLogTablePrefix + "3",
expectedMLogSeq: 1,
expectedShortSeq: 3,
},
{
name: "regular base name falls back to short name when mlog name is too long",
baseTableName: longBaseTableName,
expectedName: materializedViewLogTablePrefix + "1",
expectedShortSeq: 1,
},
{
name: "regular base name falls back to short name and retries conflict after mlog name is too long",
baseTableName: longBaseTableName,
existingTables: []string{
materializedViewLogTablePrefix + "1",
materializedViewLogTablePrefix + "2",
},
expectedName: materializedViewLogTablePrefix + "3",
expectedShortSeq: 3,
},
{
name: "regular base name falls back to short name when mlog name conflicts",
baseTableName: baseTableName,
existingTables: []string{
materializedViewLogTablePrefix + baseTableName,
},
expectedName: materializedViewLogTablePrefix + "1",
expectedShortSeq: 1,
},
{
name: "regular base name falls back to short name and retries conflict after mlog name conflicts",
baseTableName: baseTableName,
existingTables: []string{
materializedViewLogTablePrefix + baseTableName,
materializedViewLogTablePrefix + "1",
materializedViewLogTablePrefix + "2",
},
expectedName: materializedViewLogTablePrefix + "3",
expectedShortSeq: 3,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
MLogTableNameSeq.Store(0)
MLogShortTableNameSeq.Store(0)
exists := make(map[string]struct{}, len(tt.existingTables))
for _, tableName := range tt.existingTables {
exists[strings.ToLower(tableName)] = struct{}{}
}
tableExists := func(name pmodel.CIStr) (bool, error) {
_, ok := exists[name.L]
return ok, nil
}

name, err := GenerateMLogTableName(pmodel.NewCIStr(tt.baseTableName), tableExists)
require.NoError(t, err)
require.Equal(t, tt.expectedName, name)
require.LessOrEqual(t, utf8.RuneCountInString(strings.ToLower(name)), mysql.MaxTableNameLength)
require.Equal(t, tt.expectedMLogSeq, MLogTableNameSeq.Load())
require.Equal(t, tt.expectedShortSeq, MLogShortTableNameSeq.Load())
})
}

MLogTableNameSeq.Store(math.MaxUint64)
_, err := nextMLogTableNameNumber(&MLogTableNameSeq, "out of range")
require.ErrorContains(t, err, "out of range")
}

func TestBuildMViewRefreshOutOfPlaceCutoverInvolvingSchemaInfo(t *testing.T) {
const (
oldMViewID int64 = 101
Expand Down
54 changes: 39 additions & 15 deletions pkg/ddl/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,9 @@ type Executor interface {
DropSchema(ctx sessionctx.Context, stmt *ast.DropDatabaseStmt) error
CreateTable(ctx sessionctx.Context, stmt *ast.CreateTableStmt) error
CreateMaterializedView(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewStmt) error
CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt) error
CreateMaterializedViewLog(ctx sessionctx.Context, stmt *ast.CreateMaterializedViewLogStmt, mlogTableName string) error
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
GenerateMLogTableName(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) (string, error)
DropTable(ctx sessionctx.Context, stmt *ast.DropTableStmt) (err error)
DropMaterializedView(ctx sessionctx.Context, stmt *ast.DropMaterializedViewStmt) error
DropMaterializedViewLog(ctx sessionctx.Context, stmt *ast.DropMaterializedViewLogStmt) error
Expand Down Expand Up @@ -1063,7 +1064,40 @@ func (e *executor) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (
return e.CreateTableWithInfo(ctx, schema.Name, tbInfo, involvingRef, WithOnExist(onExist))
}

func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) error {
// GenerateMLogTableName implements the DDL interface.
func (e *executor) GenerateMLogTableName(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt) (string, error) {
is := e.infoCache.GetLatest()
schemaName := s.Table.Schema
if schemaName.O == "" {
if ctx.GetSessionVars().CurrentDB == "" {
return "", errors.Trace(plannererrors.ErrNoDB)
}
schemaName = pmodel.NewCIStr(ctx.GetSessionVars().CurrentDB)
}

baseTable, err := is.TableByName(e.ctx, schemaName, s.Table.Name)
if err != nil {
return "", err
}
if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone {
return "", dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE")
}
if baseInfo := baseTable.Meta().MaterializedViewBase; baseInfo != nil && baseInfo.MLogID != 0 {
return "", infoschema.ErrTableExists.GenWithStackByArgs(fmt.Sprintf("mlog of %s.%s has been created before", schemaName, baseTable.Meta().Name.O))
}

mlogName, err := GenerateMLogTableName(
baseTable.Meta().Name,
getExistenceOfMLogTableNameChecker(e.ctx, is, schemaName),
)
if err != nil {
return "", err
}

return mlogName, nil
}

func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.CreateMaterializedViewLogStmt, mlogTableName string) error {
is := e.infoCache.GetLatest()
schemaName := s.Table.Schema
if schemaName.O == "" {
Expand All @@ -1085,18 +1119,8 @@ func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.Crea
if baseTable.Meta().IsView() || baseTable.Meta().IsSequence() || baseTable.Meta().TempTableType != model.TempTableNone {
return dbterror.ErrWrongObject.GenWithStackByArgs(schemaName, s.Table.Name, "BASE TABLE")
}

mlogName := "$mlog$" + baseTable.Meta().Name.O
mlogNameCIStr := pmodel.NewCIStr(mlogName)
if err := checkTooLongTable(mlogNameCIStr); err != nil {
return err
}
_, err = is.TableByName(e.ctx, schemaName, mlogNameCIStr)
if err == nil {
return infoschema.ErrTableExists.GenWithStackByArgs(ast.Ident{Schema: schemaName, Name: mlogNameCIStr})
}
if !infoschema.ErrTableNotExists.Equal(err) {
return err
if baseInfo := baseTable.Meta().MaterializedViewBase; baseInfo != nil && baseInfo.MLogID != 0 {
return infoschema.ErrTableExists.GenWithStackByArgs(fmt.Sprintf("mlog of %s.%s has been created before", schemaName, baseTable.Meta().Name.O))
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

colMap := make(map[string]*model.ColumnInfo, len(baseTable.Meta().Columns))
Expand Down Expand Up @@ -1147,7 +1171,7 @@ func (e *executor) CreateMaterializedViewLog(ctx sessionctx.Context, s *ast.Crea
}

createTableStmt := &ast.CreateTableStmt{
Table: &ast.TableName{Schema: schemaName, Name: mlogNameCIStr},
Table: &ast.TableName{Schema: schemaName, Name: pmodel.NewCIStr(mlogTableName)},
Cols: colDefs,
Options: s.Options,
}
Expand Down
Loading