Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion cmd/go-tpc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,9 @@ func main() {
}
}()

rootCmd.Execute()
if err := rootCmd.Execute(); err != nil {
os.Exit(1)
}

cancel()
}
Expand Down
18 changes: 11 additions & 7 deletions cmd/go-tpc/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"github.com/pingcap/go-tpc/pkg/workload"
)

func checkPrepare(ctx context.Context, w workload.Workloader) {
func checkPrepare(ctx context.Context, w workload.Workloader) error {
// skip preparation check in csv case
if w.Name() == "tpcc-csv" {
fmt.Println("Skip preparing checking. Please load CSV data into database and check later.")
return
return nil
}
if w.Name() == "tpcc" && tpccConfig.NoCheck {
return
return nil
}

errCh := make(chan error, threads)
defer close(errCh)
var wg sync.WaitGroup
wg.Add(threads)
for i := 0; i < threads; i++ {
Expand All @@ -29,12 +31,12 @@ func checkPrepare(ctx context.Context, w workload.Workloader) {
defer w.CleanupThread(ctx, index)

if err := w.CheckPrepare(ctx, index); err != nil {
fmt.Printf("check prepare failed, err %v\n", err)
return
errCh <- err
}
}(i)
}
wg.Wait()
return <-errCh
}

func execute(timeoutCtx context.Context, w workload.Workloader, action string, threads, index int) error {
Expand Down Expand Up @@ -101,7 +103,7 @@ func execute(timeoutCtx context.Context, w workload.Workloader, action string, t
return nil
}

func executeWorkload(ctx context.Context, w workload.Workloader, threads int, action string) {
func executeWorkload(ctx context.Context, w workload.Workloader, threads int, action string) error {
var wg sync.WaitGroup
wg.Add(threads)

Expand Down Expand Up @@ -185,11 +187,13 @@ func executeWorkload(ctx context.Context, w workload.Workloader, threads int, ac

wg.Wait()

var checkErr error
if action == "prepare" {
// For prepare, we must check the data consistency after all prepare finished
checkPrepare(ctx, w)
checkErr = checkPrepare(ctx, w)
}
outputCancel()

<-ch
return checkErr
}
27 changes: 17 additions & 10 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (

var tpccConfig tpcc.Config

func executeTpcc(action string) {
func executeTpcc(action string) error {
if pprofAddr != "" {
go func() {
if err := http.ListenAndServe(pprofAddr, http.DefaultServeMux); err != nil {
Expand Down Expand Up @@ -81,10 +81,13 @@ func executeTpcc(action string) {
timeoutCtx, cancel := context.WithTimeout(globalCtx, totalTime)
defer cancel()

executeWorkload(timeoutCtx, w, threads, action)
if err := executeWorkload(timeoutCtx, w, threads, action); err != nil {
return err
}

fmt.Println("Finished")
w.OutputStats(true)
return nil
}

func registerTpcc(root *cobra.Command) {
Expand All @@ -99,8 +102,9 @@ func registerTpcc(root *cobra.Command) {
var cmdPrepare = &cobra.Command{
Use: "prepare",
Short: "Prepare data for TPCC",
Run: func(cmd *cobra.Command, _ []string) {
executeTpcc("prepare")
RunE: func(cmd *cobra.Command, _ []string) error {
cmd.SilenceUsage = true
return executeTpcc("prepare")
},
}
cmdPrepare.PersistentFlags().BoolVar(&tpccConfig.NoCheck, "no-check", false, "TPCC prepare check, default false")
Expand All @@ -117,8 +121,9 @@ func registerTpcc(root *cobra.Command) {
var cmdRun = &cobra.Command{
Use: "run",
Short: "Run workload",
Run: func(cmd *cobra.Command, _ []string) {
executeTpcc("run")
RunE: func(cmd *cobra.Command, _ []string) error {
cmd.SilenceUsage = true
return executeTpcc("run")
},
}
cmdRun.PersistentFlags().BoolVar(&tpccConfig.Wait, "wait", false, "including keying & thinking time described on TPC-C Standard Specification")
Expand All @@ -129,16 +134,18 @@ func registerTpcc(root *cobra.Command) {
var cmdCleanup = &cobra.Command{
Use: "cleanup",
Short: "Cleanup data for the workload",
Run: func(cmd *cobra.Command, _ []string) {
executeTpcc("cleanup")
RunE: func(cmd *cobra.Command, _ []string) error {
cmd.SilenceUsage = true
return executeTpcc("cleanup")
},
}

var cmdCheck = &cobra.Command{
Use: "check",
Short: "Check data consistency for the workload",
Run: func(cmd *cobra.Command, _ []string) {
executeTpcc("check")
RunE: func(cmd *cobra.Command, _ []string) error {
cmd.SilenceUsage = true
return executeTpcc("check")
},
}

Expand Down