diff --git a/pkg/executor/slow_query.go b/pkg/executor/slow_query.go index 17fdcf294fdff..8cab021c79900 100644 --- a/pkg/executor/slow_query.go +++ b/pkg/executor/slow_query.go @@ -60,6 +60,12 @@ type signalsKey struct{} // ParseSlowLogBatchSize is the batch size of slow-log lines for a worker to parse, exported for testing. var ParseSlowLogBatchSize = 64 +// slowLogTimeRangeInternalTolerance only widens internal file pruning and +// reverse-scan stop checks. Rows are still filtered by the original time ranges +// in slowLogChecker. In real clusters, the maximum out-of-order drift of +// slow-log timestamps is 50ms, so a 1s tolerance should be enough. +const slowLogTimeRangeInternalTolerance = time.Second + // slowQueryRetriever is used to read slow log data. type slowQueryRetriever struct { table *model.TableInfo @@ -401,7 +407,7 @@ func newSlowLogReverseScanner(e *slowQueryRetriever, sctx sessionctx.Context) *s minStart = tr.startTime } } - scanner.minStartTime = minStart + scanner.minStartTime = slowLogTimeWithTolerance(minStart, tz, -slowLogTimeRangeInternalTolerance) scanner.hasMinStart = true } return scanner @@ -1261,6 +1267,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if err != nil { return nil, err } + tz := sctx.GetSessionVars().Location() walkFn := func(path string, info os.DirEntry) error { if info.IsDir() { return nil @@ -1289,12 +1296,11 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co if err != nil { return handleErr(err) } - tz := sctx.GetSessionVars().Location() start := types.NewTime(types.FromGoTime(fileStartTime.In(tz)), mysql.TypeDatetime, types.MaxFsp) if e.checker.enableTimeCheck { notInAllTimeRanges := true for _, tr := range e.checker.timeRanges { - if start.Compare(tr.endTime) <= 0 { + if start.Compare(slowLogTimeWithTolerance(tr.endTime, tz, slowLogTimeRangeInternalTolerance)) <= 0 { notInAllTimeRanges = false break } @@ -1316,7 +1322,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx 
context.Context, sctx sessionctx.Co end := types.NewTime(types.FromGoTime(fileEndTime.In(tz)), mysql.TypeDatetime, types.MaxFsp) inTimeRanges := false for _, tr := range e.checker.timeRanges { - if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) { + if slowLogMayOverlapTimeRangeWithTolerance(start, end, tr, tz) { inTimeRanges = true break } @@ -1360,7 +1366,7 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co end := logFiles[i+1].start inTimeRanges := false for _, tr := range e.checker.timeRanges { - if !(start.Compare(tr.endTime) > 0 || end.Compare(tr.startTime) < 0) { + if slowLogMayOverlapTimeRangeWithTolerance(start, end, tr, tz) { inTimeRanges = true break } @@ -1372,6 +1378,20 @@ func (e *slowQueryRetriever) getAllFiles(ctx context.Context, sctx sessionctx.Co return ret, err } +func slowLogMayOverlapTimeRangeWithTolerance(start, end types.Time, tr *timeRange, tz *time.Location) bool { + rangeStart := slowLogTimeWithTolerance(tr.startTime, tz, -slowLogTimeRangeInternalTolerance) + rangeEnd := slowLogTimeWithTolerance(tr.endTime, tz, slowLogTimeRangeInternalTolerance) + return !(start.Compare(rangeEnd) > 0 || end.Compare(rangeStart) < 0) +} + +func slowLogTimeWithTolerance(t types.Time, tz *time.Location, tolerance time.Duration) types.Time { + goTime, err := t.CoreTime().GoTime(tz) + if err != nil { + return t + } + return types.NewTime(types.FromGoTime(goTime.Add(tolerance)), t.Type(), t.Fsp()) +} + func (*slowQueryRetriever) getFileStartTime(ctx context.Context, file *os.File, compressed bool) (time.Time, error) { var t time.Time _, err := file.Seek(0, io.SeekStart) @@ -1540,7 +1560,16 @@ func readLastLines(ctx context.Context, file *os.File, endCursor int64) ([]strin } } finalStr := string(lines[firstNonNewlinePos:]) - return strings.Split(strings.ReplaceAll(finalStr, "\r\n", "\n"), "\n"), len(finalStr), nil + return splitSlowLogLines(finalStr), len(finalStr), nil +} + +func splitSlowLogLines(s string) 
[]string { + lines := strings.Split(strings.ReplaceAll(s, "\r\n", "\n"), "\n") + // Drop the trailing empty element produced when the input ends with a newline. + if len(lines) > 0 && lines[len(lines)-1] == "" { + lines = lines[:len(lines)-1] + } + return lines } func (e *slowQueryRetriever) initializeAsyncParsing(ctx context.Context, sctx sessionctx.Context) { diff --git a/pkg/executor/slow_query_test.go b/pkg/executor/slow_query_test.go index 31281553ae92a..d61e72fa54f77 100644 --- a/pkg/executor/slow_query_test.go +++ b/pkg/executor/slow_query_test.go @@ -31,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/meta/metadef" + "github.com/pingcap/tidb/pkg/meta/model" "github.com/pingcap/tidb/pkg/parser/ast" "github.com/pingcap/tidb/pkg/parser/terror" plannercore "github.com/pingcap/tidb/pkg/planner/core" @@ -612,7 +613,7 @@ select 7;` } } -func TestSplitbyColon(t *testing.T) { +func TestSplitByColon(t *testing.T) { cases := []struct { line string fields []string @@ -814,6 +815,16 @@ select 9;` } } +func getTimeColIdxFromRetrieverOutput(t *testing.T, outputCol []*model.ColumnInfo) int { + for idx, col := range outputCol { + if col.Name.O == variable.SlowLogTimeStr { + return idx + } + } + t.Fatalf("cannot find Time column in retriever output, %v", outputCol) + return -1 +} + func TestSlowQueryRetrieverReversedScanWithLimit(t *testing.T) { fileName := "tidb-slow-limit-reverse-scan.log" slowLog := `# Time: 2020-02-15T18:00:01.000000+08:00 @@ -840,14 +851,7 @@ select 5;` retriever.extractor = &plannercore.SlowQueryExtractor{Desc: true} retriever.limit = 2 - timeColIdx := -1 - for idx, col := range retriever.outputCols { - if col.Name.O == variable.SlowLogTimeStr { - timeColIdx = idx - break - } - } - require.GreaterOrEqual(t, timeColIdx, 0) + timeColIdx := getTimeColIdxFromRetrieverOutput(t, retriever.outputCols) DashboardSlowLogReadBlockCnt4Test = 0 ctx := context.Background() @@ -870,6 +874,122 @@ select 
5;` require.Equal(t, "2020-02-15 22:00:05.000000", t0) require.Equal(t, "2020-02-15 21:00:05.000000", t1) require.NoError(t, retriever.close()) + + // The long DB line makes the middle slow-log block cross readLastLines chunks. + // Forward scan can parse it; reverse scan should not synthesize blank lines and + // drop it. Previously there was a bug in the LIMIT execution path. + crossChunkFileName := "tidb-slow-limit-reverse-scan-cross-chunk.log" + crossChunkSlowLog := fmt.Sprintf(`# Time: 2020-02-15T18:00:01.000000+08:00 +select 1; +# Time: 2020-02-15T19:00:05.000000+08:00 +# DB: %s +select 2; +# Time: 2020-02-15T20:00:05.000000+08:00 +select 3;`, strings.Repeat("x", 5000)) + prepareLogs(t, []string{crossChunkSlowLog}, []string{crossChunkFileName}) + defer removeFiles([]string{crossChunkFileName}) + sctx.GetSessionVars().SlowQueryFile = crossChunkFileName + + forwardRetriever, err := newSlowQueryRetriever() + require.NoError(t, err) + require.NoError(t, forwardRetriever.initialize(context.Background(), sctx)) + reader, err := forwardRetriever.getNextReader() + require.NoError(t, err) + forwardRows, err := parseLog(forwardRetriever, sctx, reader) + require.NoError(t, err) + require.Len(t, forwardRows, 3) + require.NoError(t, forwardRetriever.close()) + + reverseRetriever, err := newSlowQueryRetriever() + require.NoError(t, err) + reverseRetriever.extractor = &plannercore.SlowQueryExtractor{Desc: true} + reverseRetriever.limit = 3 + reverseRows := make([][]types.Datum, 0, reverseRetriever.limit) + for { + rows, err := reverseRetriever.retrieve(context.Background(), sctx) + require.NoError(t, err) + if len(rows) == 0 { + break + } + reverseRows = append(reverseRows, rows...) 
+ } + require.Len(t, reverseRows, 3) + require.NoError(t, reverseRetriever.close()) +} + +func TestSlowQueryRetrieverReversedScanWithTimeJitter(t *testing.T) { + fileName := "tidb-slow-limit-reverse-scan-time-jitter.log" + slowLog := `# Time: 2020-02-15T18:00:00.000500+08:00 +select in-window-1; +# Time: 2020-02-15T18:00:00.001000+08:00 +select in-window-2; +# Time: 2020-02-15T17:59:59.999700+08:00 +select just-before-window; +# Time: 2020-02-15T18:00:00.001500+08:00 +select in-window-3;` + prepareLogs(t, []string{slowLog}, []string{fileName}) + defer removeFiles([]string{fileName}) + + loc, err := time.LoadLocation("Asia/Shanghai") + require.NoError(t, err) + sctx := mock.NewContext() + sctx.ResetSessionAndStmtTimeZone(loc) + sctx.GetSessionVars().SlowQueryFile = fileName + startTime, err := ParseTime("2020-02-15T18:00:00.000000+08:00") + require.NoError(t, err) + endTime, err := ParseTime("2020-02-15T18:00:00.002000+08:00") + require.NoError(t, err) + timeRange := []*plannercore.TimeRange{{StartTime: startTime, EndTime: endTime}} + + forwardRetriever, err := newSlowQueryRetriever() + require.NoError(t, err) + forwardRetriever.extractor = &plannercore.SlowQueryExtractor{Enable: true, TimeRanges: timeRange} + require.NoError(t, forwardRetriever.initialize(context.Background(), sctx)) + reader, err := forwardRetriever.getNextReader() + require.NoError(t, err) + forwardRows, err := parseLog(forwardRetriever, sctx, reader) + require.NoError(t, err) + require.Len(t, forwardRows, 3) + timeColIdx := getTimeColIdxFromRetrieverOutput(t, forwardRetriever.outputCols) + t0, err := forwardRows[0][timeColIdx].ToString() + require.NoError(t, err) + t1, err := forwardRows[1][timeColIdx].ToString() + require.NoError(t, err) + t2, err := forwardRows[2][timeColIdx].ToString() + require.NoError(t, err) + require.Equal(t, "2020-02-15 18:00:00.000500", t0) + require.Equal(t, "2020-02-15 18:00:00.001000", t1) + require.Equal(t, "2020-02-15 18:00:00.001500", t2) + + require.NoError(t, 
forwardRetriever.close()) + + // This mirrors real slow logs where adjacent # Time values can move backwards + // by a few milliseconds. A block just before the lower bound should not make + // reverse scan stop before earlier in-range blocks are read. + reverseRetriever, err := newSlowQueryRetriever() + require.NoError(t, err) + reverseRetriever.extractor = &plannercore.SlowQueryExtractor{Enable: true, Desc: true, TimeRanges: timeRange} + reverseRetriever.limit = 3 + reverseRows := make([][]types.Datum, 0, reverseRetriever.limit) + for { + rows, err := reverseRetriever.retrieve(context.Background(), sctx) + require.NoError(t, err) + if len(rows) == 0 { + break + } + reverseRows = append(reverseRows, rows...) + } + require.Len(t, reverseRows, 3) + t0, err = reverseRows[0][timeColIdx].ToString() + require.NoError(t, err) + t1, err = reverseRows[1][timeColIdx].ToString() + require.NoError(t, err) + t2, err = reverseRows[2][timeColIdx].ToString() + require.NoError(t, err) + require.Equal(t, "2020-02-15 18:00:00.000500", t2) + require.Equal(t, "2020-02-15 18:00:00.001000", t1) + require.Equal(t, "2020-02-15 18:00:00.001500", t0) + require.NoError(t, reverseRetriever.close()) } func TestPBPlanBuilderPushDownLimitToSlowQueryRetriever(t *testing.T) {