Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 35 additions & 6 deletions pkg/executor/slow_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ type signalsKey struct{}
// ParseSlowLogBatchSize is the batch size of slow-log lines for a worker to parse, exported for testing.
var ParseSlowLogBatchSize = 64

// slowLogTimeRangeInternalTolerance only widens internal file pruning and
// reverse-scan stop checks. Rows are still filtered by the original time ranges
// in slowLogChecker. In a real cluster, the maximum out-of-order drift between
// adjacent slow-log timestamps is 50ms, so the 1s tolerance should be enough.
const slowLogTimeRangeInternalTolerance = time.Second

// slowQueryRetriever is used to read slow log data.
type slowQueryRetriever struct {
table *model.TableInfo
Expand Down Expand Up @@ -401,7 +407,7 @@ func newSlowLogReverseScanner(e *slowQueryRetriever, sctx sessionctx.Context) *s
minStart = tr.startTime
}
}
scanner.minStartTime = minStart
scanner.minStartTime = slowLogTimeWithTolerance(minStart, tz, -slowLogTimeRangeInternalTolerance)
scanner.hasMinStart = true
}
return scanner
Expand Down Expand Up @@ -1261,6 +1267,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co
if err != nil {
return nil, err
}
tz := sctx.GetSessionVars().Location()
walkFn := func(path string, info os.DirEntry) error {
if info.IsDir() {
return nil
Expand Down Expand Up @@ -1289,12 +1296,11 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co
if err != nil {
return handleErr(err)
}
tz := sctx.GetSessionVars().Location()
start := types.NewTime(types.FromGoTime(fileStartTime.In(tz)), mysql.TypeDatetime, types.MaxFsp)
if e.checker.enableTimeCheck {
notInAllTimeRanges := true
for _, tr := range e.checker.timeRanges {
if start.Compare(tr.endTime) <= 0 {
if start.Compare(slowLogTimeWithTolerance(tr.endTime, tz, slowLogTimeRangeInternalTolerance)) <= 0 {
notInAllTimeRanges = false
break
}
Expand All @@ -1316,7 +1322,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co
end := types.NewTime(types.FromGoTime(fileEndTime.In(tz)), mysql.TypeDatetime, types.MaxFsp)
inTimeRanges := false
for _, tr := range e.checker.timeRanges {
if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) {
if slowLogMayOverlapTimeRangeWithTolerance(start, end, tr, tz) {
inTimeRanges = true
break
}
Expand Down Expand Up @@ -1360,7 +1366,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co
end := logFiles[i+1].start
inTimeRanges := false
for _, tr := range e.checker.timeRanges {
if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) {
if slowLogMayOverlapTimeRangeWithTolerance(start, end, tr, tz) {
inTimeRanges = true
break
}
Expand All @@ -1372,6 +1378,20 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co
return ret, err
}

// slowLogMayOverlapTimeRangeWithTolerance reports whether the span [start, end]
// intersects tr after tr has been widened on both sides by
// slowLogTimeRangeInternalTolerance. tz is the location used to convert the
// range bounds before shifting them.
func slowLogMayOverlapTimeRangeWithTolerance(start, end types.Time, tr *timeRange, tz *time.Location) bool {
	// The span begins after the widened upper bound: no overlap.
	widenedEnd := slowLogTimeWithTolerance(tr.endTime, tz, slowLogTimeRangeInternalTolerance)
	if start.Compare(widenedEnd) > 0 {
		return false
	}
	// Otherwise the span overlaps unless it finishes before the widened lower bound.
	widenedStart := slowLogTimeWithTolerance(tr.startTime, tz, -slowLogTimeRangeInternalTolerance)
	return end.Compare(widenedStart) >= 0
}

// slowLogTimeWithTolerance returns t shifted by tolerance (negative values move
// it earlier), keeping t's type and fsp. t is converted to a Go time in tz for
// the shift; if that conversion fails, t is returned unchanged.
func slowLogTimeWithTolerance(t types.Time, tz *time.Location, tolerance time.Duration) types.Time {
	base, convErr := t.CoreTime().GoTime(tz)
	if convErr == nil {
		shifted := base.Add(tolerance)
		return types.NewTime(types.FromGoTime(shifted), t.Type(), t.Fsp())
	}
	// Fall back to the unshifted value when t cannot be converted.
	return t
}

func (*slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File, compressed bool) (time.Time, error) {
var t time.Time
_, err := file.Seek(0, io.SeekStart)
Expand Down Expand Up @@ -1540,7 +1560,16 @@ func readLastLines(ctx context.Context, file *os.File, endCursor int64) ([]strin
}
}
finalStr := string(lines[firstNonNewlinePos:])
return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr), nil
return splitSlowLogLines(finalStr), len(finalStr), nil
}

// splitSlowLogLines splits s into lines. CRLF sequences are treated as LF, and
// a single trailing empty element (produced when s ends with a newline, or when
// s is empty) is dropped.
func splitSlowLogLines(s string) []string {
	normalized := strings.ReplaceAll(s, "\r\n", "\n")
	parts := strings.Split(normalized, "\n")
	// Only the final empty element is removed; interior blank lines are kept.
	if n := len(parts); n > 0 && parts[n-1] == "" {
		return parts[:n-1]
	}
	return parts
}

func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) {
Expand Down
138 changes: 129 additions & 9 deletions pkg/executor/slow_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/infoschema"
"github.com/pingcap/tidb/pkg/meta/metadef"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/parser/ast"
"github.com/pingcap/tidb/pkg/parser/terror"
plannercore "github.com/pingcap/tidb/pkg/planner/core"
Expand Down Expand Up @@ -612,7 +613,7 @@ select 7;`
}
}

func TestSplitbyColon(t *testing.T) {
func TestSplitByColon(t *testing.T) {
cases := []struct {
line string
fields []string
Expand Down Expand Up @@ -814,6 +815,16 @@ select 9;`
}
}

// getTimeColIdxFromRetrieverOutput returns the index of the column in outputCol
// whose original name equals variable.SlowLogTimeStr. If no such column exists,
// the test is failed immediately via t.Fatalf.
func getTimeColIdxFromRetrieverOutput(t *testing.T, outputCol []*model.ColumnInfo) int {
	// Mark this function as a test helper so a failure is reported at the
	// calling test's line instead of inside this function.
	t.Helper()
	for idx, col := range outputCol {
		if col.Name.O == variable.SlowLogTimeStr {
			return idx
		}
	}
	t.Fatalf("cannot find Time column in retriever output, %v", outputCol)
	// Unreachable after Fatalf; present only to satisfy the compiler.
	return -1
}

func TestSlowQueryRetrieverReversedScanWithLimit(t *testing.T) {
fileName := "tidb-slow-limit-reverse-scan.log"
slowLog := `# Time: 2020-02-15T18:00:01.000000+08:00
Expand All @@ -840,14 +851,7 @@ select 5;`
retriever.extractor = &plannercore.SlowQueryExtractor{Desc: true}
retriever.limit = 2

timeColIdx := -1
for idx, col := range retriever.outputCols {
if col.Name.O == variable.SlowLogTimeStr {
timeColIdx = idx
break
}
}
require.GreaterOrEqual(t, timeColIdx, 0)
timeColIdx := getTimeColIdxFromRetrieverOutput(t, retriever.outputCols)

DashboardSlowLogReadBlockCnt4Test = 0
ctx := context.Background()
Expand All @@ -870,6 +874,122 @@ select 5;`
require.Equal(t, "2020-02-15 22:00:05.000000", t0)
require.Equal(t, "2020-02-15 21:00:05.000000", t1)
require.NoError(t, retriever.close())

// The long DB line makes the middle slow-log block cross readLastLines chunks.
// Forward scan can parse it; reverse scan should not synthesize blank lines and
// drop it. Previously there was a bug in the LIMIT execution path.
crossChunkFileName := "tidb-slow-limit-reverse-scan-cross-chunk.log"
crossChunkSlowLog := fmt.Sprintf(`# Time: 2020-02-15T18:00:01.000000+08:00
select 1;
# Time: 2020-02-15T19:00:05.000000+08:00
# DB: %s
select 2;
# Time: 2020-02-15T20:00:05.000000+08:00
select 3;`, strings.Repeat("x", 5000))
prepareLogs(t, []string{crossChunkSlowLog}, []string{crossChunkFileName})
defer removeFiles([]string{crossChunkFileName})
sctx.GetSessionVars().SlowQueryFile = crossChunkFileName

forwardRetriever, err := newSlowQueryRetriever()
require.NoError(t, err)
require.NoError(t, forwardRetriever.initialize(context.Background(), sctx))
reader, err := forwardRetriever.getNextReader()
require.NoError(t, err)
forwardRows, err := parseLog(forwardRetriever, sctx, reader)
require.NoError(t, err)
require.Len(t, forwardRows, 3)
require.NoError(t, forwardRetriever.close())

reverseRetriever, err := newSlowQueryRetriever()
require.NoError(t, err)
reverseRetriever.extractor = &plannercore.SlowQueryExtractor{Desc: true}
reverseRetriever.limit = 3
reverseRows := make([][]types.Datum, 0, reverseRetriever.limit)
for {
rows, err := reverseRetriever.retrieve(context.Background(), sctx)
require.NoError(t, err)
if len(rows) == 0 {
break
}
reverseRows = append(reverseRows, rows...)
}
require.Len(t, reverseRows, 3)
require.NoError(t, reverseRetriever.close())
}

// TestSlowQueryRetrieverReversedScanWithTimeJitter checks that a forward scan
// and a descending (reverse) scan both return the same three in-window rows
// when the log contains a block whose # Time value moves slightly backwards.
func TestSlowQueryRetrieverReversedScanWithTimeJitter(t *testing.T) {
	const logName = "tidb-slow-limit-reverse-scan-time-jitter.log"
	// The third block is stamped 300µs before the queried window start even
	// though it is written after two in-window blocks.
	logBody := `# Time: 2020-02-15T18:00:00.000500+08:00
select in-window-1;
# Time: 2020-02-15T18:00:00.001000+08:00
select in-window-2;
# Time: 2020-02-15T17:59:59.999700+08:00
select just-before-window;
# Time: 2020-02-15T18:00:00.001500+08:00
select in-window-3;`
	prepareLogs(t, []string{logBody}, []string{logName})
	defer removeFiles([]string{logName})

	shanghai, err := time.LoadLocation("Asia/Shanghai")
	require.NoError(t, err)
	sctx := mock.NewContext()
	sctx.ResetSessionAndStmtTimeZone(shanghai)
	sctx.GetSessionVars().SlowQueryFile = logName

	// Query window: [18:00:00.000000, 18:00:00.002000].
	windowStart, err := ParseTime("2020-02-15T18:00:00.000000+08:00")
	require.NoError(t, err)
	windowEnd, err := ParseTime("2020-02-15T18:00:00.002000+08:00")
	require.NoError(t, err)
	window := []*plannercore.TimeRange{{StartTime: windowStart, EndTime: windowEnd}}

	// Times of the in-window rows, in ascending (file) order.
	wantAscending := []string{
		"2020-02-15 18:00:00.000500",
		"2020-02-15 18:00:00.001000",
		"2020-02-15 18:00:00.001500",
	}

	// Forward scan: the three in-window rows come back in file order.
	forwardRetriever, err := newSlowQueryRetriever()
	require.NoError(t, err)
	forwardRetriever.extractor = &plannercore.SlowQueryExtractor{Enable: true, TimeRanges: window}
	require.NoError(t, forwardRetriever.initialize(context.Background(), sctx))
	reader, err := forwardRetriever.getNextReader()
	require.NoError(t, err)
	forwardRows, err := parseLog(forwardRetriever, sctx, reader)
	require.NoError(t, err)
	require.Len(t, forwardRows, 3)
	timeColIdx := getTimeColIdxFromRetrieverOutput(t, forwardRetriever.outputCols)
	for i, want := range wantAscending {
		got, err := forwardRows[i][timeColIdx].ToString()
		require.NoError(t, err)
		require.Equal(t, want, got)
	}

	require.NoError(t, forwardRetriever.close())

	// This mirrors real slow logs where adjacent # Time values can move backwards
	// by a few milliseconds. A block just before the lower bound should not make
	// reverse scan stop before earlier in-range blocks are read.
	reverseRetriever, err := newSlowQueryRetriever()
	require.NoError(t, err)
	reverseRetriever.extractor = &plannercore.SlowQueryExtractor{Enable: true, Desc: true, TimeRanges: window}
	reverseRetriever.limit = 3
	reverseRows := make([][]types.Datum, 0, reverseRetriever.limit)
	// Drain the retriever until it yields an empty batch.
	for {
		batch, err := reverseRetriever.retrieve(context.Background(), sctx)
		require.NoError(t, err)
		if len(batch) == 0 {
			break
		}
		reverseRows = append(reverseRows, batch...)
	}
	require.Len(t, reverseRows, 3)
	// Reverse scan yields the same rows in descending order.
	for i := range wantAscending {
		got, err := reverseRows[i][timeColIdx].ToString()
		require.NoError(t, err)
		require.Equal(t, wantAscending[len(wantAscending)-1-i], got)
	}
	require.NoError(t, reverseRetriever.close())
}

func TestPBPlanBuilderPushDownLimitToSlowQueryRetriever(t *testing.T) {
Expand Down