Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/go-tpc/tpcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ func registerTpcc(root *cobra.Command) {
cmdRun.PersistentFlags().DurationVar(&tpccConfig.MaxMeasureLatency, "max-measure-latency", measurement.DefaultMaxLatency, "max measure latency in millisecond")
cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmdRun.Flags().DurationVar(&tpccConfig.ConnRefreshInterval, "conn-refresh-interval", 0, "automatically refresh database connections at specified intervals to balance traffic across new replicas (0 = disabled, e.g., 10s)")
cmdRun.PersistentFlags().BoolVar(&tpccConfig.StoredProcs, "stored-procs", false, "Use PL/pgSQL stored procedures for the 5 TPC-C transactions (postgres driver only)")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Expand Down
18 changes: 18 additions & 0 deletions tpcc/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ const (
)

func (w *Workloader) runDelivery(ctx context.Context, thread int) error {
if w.cfg.StoredProcs {
return w.runDeliveryProc(ctx, thread)
}

s := getTPCCState(ctx)

d := deliveryData{
Expand Down Expand Up @@ -171,3 +175,17 @@ func (w *Workloader) runDelivery(ctx context.Context, thread int) error {
}
return tx.Commit()
}

// runDeliveryProc dispatches DELIVERY as `CALL tpcc_delivery(...)`.
func (w *Workloader) runDeliveryProc(ctx context.Context, thread int) error {
s := getTPCCState(ctx)
wID := randInt(s.R, 1, w.cfg.Warehouses)
oCarrierID := randInt(s.R, 1, 10)

stmt := s.procStmts[tpccCallDelivery]
_, err := stmt.ExecContext(ctx, wID, oCarrierID, time.Now().Format(timeFormat))
if err != nil {
return fmt.Errorf("CALL tpcc_delivery failed: %v", err)
}
return nil
}
69 changes: 69 additions & 0 deletions tpcc/new_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"database/sql"
"fmt"
"time"

"github.com/lib/pq"
)

const (
Expand Down Expand Up @@ -118,6 +120,10 @@ type newOrderData struct {
}

func (w *Workloader) runNewOrder(ctx context.Context, thread int) error {
if w.cfg.StoredProcs {
return w.runNewOrderProc(ctx, thread)
}

s := getTPCCState(ctx)

// refer 2.4.1
Expand Down Expand Up @@ -309,3 +315,66 @@ func (w *Workloader) runNewOrder(ctx context.Context, thread int) error {
}
return tx.Commit()
}

// runNewOrderProc is the stored-procedure-mode variant of runNewOrder. It
// generates the same random inputs but dispatches the whole transaction as
// a single `CALL tpcc_new_order(...)`. The procedure handles the
// 1%-rollback case internally via in-band ROLLBACK, so the caller just
// runs the CALL outside of an explicit transaction (autocommit) and any
// failure surfaces as a normal SQL error.
func (w *Workloader) runNewOrderProc(ctx context.Context, thread int) error {
s := getTPCCState(ctx)

d := newOrderData{
wID: randInt(s.R, 1, w.cfg.Warehouses),
dID: randInt(s.R, 1, districtPerWarehouse),
cID: randCustomerID(s.R),
oOlCnt: randInt(s.R, 5, 15),
}
rbk := randInt(s.R, 1, 100)
allLocal := 1

supplyW := make([]int32, d.oOlCnt)
itemIDs := make([]int32, d.oOlCnt)
quantities := make([]int32, d.oOlCnt)

seen := make(map[int]struct{}, d.oOlCnt)
for i := 0; i < d.oOlCnt; i++ {
var iID int
if i == d.oOlCnt-1 && rbk == 1 {
iID = -1
} else {
for {
id := randItemID(s.R)
if _, ok := seen[id]; ok {
continue
}
seen[id] = struct{}{}
iID = id
break
}
}
itemIDs[i] = int32(iID)

var supply int
if w.cfg.Warehouses == 1 || randInt(s.R, 1, 100) != 1 {
supply = d.wID
} else {
supply = w.otherWarehouse(ctx, d.wID)
allLocal = 0
}
supplyW[i] = int32(supply)
quantities[i] = int32(randInt(s.R, 1, 10))
}

stmt := s.procStmts[tpccCallNewOrder]
_, err := stmt.ExecContext(ctx,
d.wID, d.dID, d.cID, d.oOlCnt,
pq.Array(supplyW), pq.Array(itemIDs), pq.Array(quantities),
time.Now().Format(timeFormat), allLocal,
)
if err != nil {
return fmt.Errorf("CALL tpcc_new_order failed: %v", err)
}
return nil
}
36 changes: 36 additions & 0 deletions tpcc/order_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ type orderStatusData struct {
}

func (w *Workloader) runOrderStatus(ctx context.Context, thread int) error {
if w.cfg.StoredProcs {
return w.runOrderStatusProc(ctx, thread)
}

s := getTPCCState(ctx)
d := orderStatusData{
wID: randInt(s.R, 1, w.cfg.Warehouses),
Expand Down Expand Up @@ -116,3 +120,35 @@ func (w *Workloader) runOrderStatus(ctx context.Context, thread int) error {

return tx.Commit()
}

// runOrderStatusProc dispatches ORDER_STATUS as `CALL tpcc_order_status(...)`.
func (w *Workloader) runOrderStatusProc(ctx context.Context, thread int) error {
s := getTPCCState(ctx)
d := orderStatusData{
wID: randInt(s.R, 1, w.cfg.Warehouses),
dID: randInt(s.R, 1, districtPerWarehouse),
}

if s.R.Intn(100) < 60 {
d.cLast = randCLast(s.R, s.Buf)
} else {
d.cID = randCustomerID(s.R)
}

var cIDArg interface{}
var cLastArg interface{}
if d.cID == 0 {
cIDArg = nil
cLastArg = d.cLast
} else {
cIDArg = d.cID
cLastArg = nil
}

stmt := s.procStmts[tpccCallOrderStatus]
_, err := stmt.ExecContext(ctx, d.wID, d.dID, cIDArg, cLastArg)
if err != nil {
return fmt.Errorf("CALL tpcc_order_status failed: %v", err)
}
return nil
}
51 changes: 51 additions & 0 deletions tpcc/payment.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ type paymentData struct {
}

func (w *Workloader) runPayment(ctx context.Context, thread int) error {
if w.cfg.StoredProcs {
return w.runPaymentProc(ctx, thread)
}

s := getTPCCState(ctx)

d := paymentData{
Expand Down Expand Up @@ -180,3 +184,50 @@ func (w *Workloader) runPayment(ctx context.Context, thread int) error {

return tx.Commit()
}

// runPaymentProc dispatches PAYMENT as `CALL tpcc_payment(...)`. By-last-name
// vs by-id lookup is selected by passing one of c_id / c_last as NULL.
func (w *Workloader) runPaymentProc(ctx context.Context, thread int) error {
s := getTPCCState(ctx)

d := paymentData{
wID: randInt(s.R, 1, w.cfg.Warehouses),
dID: randInt(s.R, 1, districtPerWarehouse),
hAmount: float64(randInt(s.R, 100, 500000)) / float64(100.0),
}

if s.R.Intn(100) < 60 {
d.cLast = randCLast(s.R, s.Buf)
} else {
d.cID = randCustomerID(s.R)
}

if w.cfg.Warehouses == 1 || s.R.Intn(100) < 85 {
d.cWID = d.wID
d.cDID = d.dID
} else {
d.cWID = w.otherWarehouse(ctx, d.wID)
d.cDID = randInt(s.R, 1, districtPerWarehouse)
}

// Pass NULL for whichever lookup key wasn't picked.
var cIDArg interface{}
var cLastArg interface{}
if d.cID == 0 {
cIDArg = nil
cLastArg = d.cLast
} else {
cIDArg = d.cID
cLastArg = nil
}

stmt := s.procStmts[tpccCallPayment]
_, err := stmt.ExecContext(ctx,
d.wID, d.dID, d.cWID, d.cDID, d.hAmount,
cIDArg, cLastArg, time.Now().Format(timeFormat),
)
if err != nil {
return fmt.Errorf("CALL tpcc_payment failed: %v", err)
}
return nil
}
Loading