Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,6 @@ local_scripts/
contrib/xdprocks/

# allure-reports
allure-reports/
allure-reports/
# Claude
CLAUDE.md
35 changes: 20 additions & 15 deletions include/dingosdk/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include <fmt/format.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
Expand Down Expand Up @@ -166,33 +167,37 @@ struct KeyOpState {
};

struct TraceMetrics {
uint64_t total_time_us;
std::atomic<uint64_t> total_time_us{0};

struct Metric {
uint64_t sdk_time_us;
uint64_t rpc_time_us;
uint64_t retry_count;
uint64_t rpc_retry_count;
std::atomic<uint64_t> sdk_time_us{0};
std::atomic<uint64_t> rpc_time_us{0};
std::atomic<uint64_t> retry_count{0};
std::atomic<uint64_t> rpc_retry_count{0};

std::string ToString() const {
return fmt::format("{} {} {} {}", sdk_time_us.load(std::memory_order_relaxed),
rpc_time_us.load(std::memory_order_relaxed), retry_count.load(std::memory_order_relaxed),
rpc_retry_count.load(std::memory_order_relaxed));
}
};

Metric read_metric;
Metric prewrite_metric;
Metric commit_metric;

uint64_t resolve_lock_time_us;
std::atomic<uint64_t> resolve_lock_time_us{0};

uint64_t sleep_time_us;
uint64_t sleep_count;
std::atomic<uint64_t> sleep_time_us{0};
std::atomic<uint64_t> sleep_count{0};

std::string ToString() {
std::string ToString() const {
return fmt::format(
"total_time_us({}) read({} {} {} {}) prewrite({} {} {} {}) commit({} {} {} {}) "
"total_time_us({}) read({}) prewrite({}) commit({}) "
"resolve_lock_time_us({}) sleep({} {})",
total_time_us, read_metric.sdk_time_us, read_metric.rpc_time_us, read_metric.retry_count,
read_metric.rpc_retry_count, prewrite_metric.sdk_time_us, prewrite_metric.rpc_time_us,
prewrite_metric.retry_count, prewrite_metric.rpc_retry_count, commit_metric.sdk_time_us,
commit_metric.rpc_time_us, commit_metric.retry_count, commit_metric.rpc_retry_count, resolve_lock_time_us,
sleep_time_us, sleep_count);
total_time_us.load(std::memory_order_relaxed), read_metric.ToString(), prewrite_metric.ToString(),
commit_metric.ToString(), resolve_lock_time_us.load(std::memory_order_relaxed),
sleep_time_us.load(std::memory_order_relaxed), sleep_count.load(std::memory_order_relaxed));
}
};

Expand Down
5 changes: 3 additions & 2 deletions src/sdk/common/helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ static Status LogAndSendRpc(const ClientStub& stub, StoreClientRpc& rpc, std::sh
StoreRpcController controller(stub, rpc, region);
Status s = controller.Call();

DINGO_LOG_IF(INFO, fLB::FLAGS_log_rpc_time) << fmt::format("[rpc.{}][{}ms][region.{}] rpc finish", rpc.Method(),
TimestampMs() - start_time_ms, region->RegionId());
DINGO_LOG_IF(INFO, fLB::FLAGS_log_rpc_time)
<< fmt::format("[rpc.{}][{}ms][region.{}] [response.{}]rpc finish", rpc.Method(), TimestampMs() - start_time_ms,
region->RegionId(), rpc.Response()->ShortDebugString());
return s;
}

Expand Down
2 changes: 1 addition & 1 deletion src/sdk/common/param_config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ DEFINE_int64(rpc_time_out_ms, 500000, "rpc call timeout ms");

DEFINE_bool(enable_trace_rpc_performance, true, "enable trance rpc performance, use for debug");
DEFINE_int64(rpc_elapse_time_threshold_us, 1000, "rpc elapse time us threshold");
DEFINE_int64(rpc_trace_full_info_threshold_us, 1000000,
DEFINE_int64(rpc_trace_full_info_threshold_us, 100000,
"log full rpc detail when elapsed time exceeds this threshold (us)");

DEFINE_int64(store_rpc_retry_delay_ms, 500, "store rpc retry delay ms");
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/common/tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include <memory>
#include <string>

#include "sdk/common/helper.h"
#include "sdk/common/helper.h"

namespace dingodb {
namespace sdk {
Expand Down
12 changes: 10 additions & 2 deletions src/sdk/region_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
#include <optional>
#include <vector>

#include <memory>

#include "dingosdk/client.h"
#include "sdk/region.h"
#include "sdk/utils/callback.h"

namespace dingodb {
namespace sdk {

class Tracker;
using TrackerPtr = std::shared_ptr<Tracker>;

class ClientStub;
class RegionScanner {
public:
Expand Down Expand Up @@ -71,19 +76,22 @@ struct ScannerOptions {
std::string end_key;
std::optional<const TransactionOptions> txn_options;
std::optional<int64_t> start_ts;
TrackerPtr tracker;

explicit ScannerOptions(const ClientStub& p_stub, std::shared_ptr<Region> p_region, std::string p_start_key,
std::string p_end_key)
: stub(p_stub), region(std::move(p_region)), start_key(std::move(p_start_key)), end_key(std::move(p_end_key)) {}

explicit ScannerOptions(const ClientStub& p_stub, std::shared_ptr<Region> p_region, std::string p_start_key,
std::string p_end_key, const TransactionOptions p_txn_options, int64_t p_start_ts)
std::string p_end_key, const TransactionOptions p_txn_options, int64_t p_start_ts,
TrackerPtr p_tracker = nullptr)
: stub(p_stub),
region(std::move(p_region)),
start_key(std::move(p_start_key)),
end_key(std::move(p_end_key)),
txn_options(p_txn_options),
start_ts(p_start_ts) {}
start_ts(p_start_ts),
tracker(std::move(p_tracker)) {}
};

class RegionScannerFactory {
Expand Down
5 changes: 5 additions & 0 deletions src/sdk/rpc/rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ class Rpc {

void SetStatus(const Status& s) { status = s; }

void SetTxnId(uint64_t id) { txn_id = id; }

uint64_t GetTxnId() const { return txn_id; }

void IncRetryTimes() { retry_times++; }

int GetRetryTimes() const { return retry_times; }
Expand Down Expand Up @@ -88,6 +92,7 @@ class Rpc {
std::string cmd;
EndPoint end_point;
Status status;
uint64_t txn_id{0};
int retry_times{0};
uint64_t sleep_time_us{0};
uint64_t sleep_count{0};
Expand Down
3 changes: 3 additions & 0 deletions src/sdk/rpc/store_rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ void StoreRpcController::SendStoreRpc() {
void StoreRpcController::MaybeDelay() {
if (NeedDelay(status_)) {
auto delay_ms = FLAGS_store_rpc_retry_delay_ms;
DINGO_LOG(INFO) << fmt::format("[sdk.rpc.{}] txn_id:{} method:{} region({}) sleep {}ms, reason: {}",
rpc_.LogId(), rpc_.GetTxnId(), rpc_.Method(), region_->RegionId(), delay_ms,
status_.ToString());
rpc_.IncSleepCount();
rpc_.IncSleepTimesUs(FLAGS_coordinator_interaction_delay_ms * 1000);
SleepUs(delay_ms * 1000);
Expand Down
12 changes: 10 additions & 2 deletions src/sdk/transaction/txn_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ Status TxnImpl::Rollback() { return DoRollback(); }
bool TxnImpl::IsNeedRetry(int& times) {
bool retry = times++ < FLAGS_txn_op_max_retry;
if (retry) {
DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] sleep {}ms, reason: txn op retry({}/{}).",
GetStartTs(), FLAGS_txn_op_delay_ms, times, FLAGS_txn_op_max_retry);
SleepUs(FLAGS_txn_op_delay_ms * 1000);
}

Expand Down Expand Up @@ -310,7 +312,12 @@ Status TxnImpl::ProcessScanState(ScanState& scan_state, uint64_t limit, std::vec
Status TxnImpl::DoScan(const std::string& start_key, const std::string& end_key, uint64_t limit,
std::vector<KVPair>& out_kvs) {
auto start_time = TimestampUs();
SCOPED_CLEANUP(GetTracer()->IncrementReadSdkTime(TimestampUs() - start_time););
uint64_t rpc_time_before = GetTracer()->ReadRpcTime();
SCOPED_CLEANUP({
uint64_t rpc_time_added = GetTracer()->ReadRpcTime() - rpc_time_before;
uint64_t total_time = TimestampUs() - start_time;
GetTracer()->IncrementReadSdkTime(total_time > rpc_time_added ? total_time - rpc_time_added : 0);
});

// check whether region exist
RegionPtr region;
Expand Down Expand Up @@ -376,7 +383,8 @@ Status TxnImpl::DoScan(const std::string& start_key, const std::string& end_key,
DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] scan region({}) range[{}, {}).", ID(), region->RegionId(),
StringToHex(amend_start_key), StringToHex(amend_end_key));

ScannerOptions scan_options(stub_, region, amend_start_key, amend_end_key, options_, start_ts_.load());
ScannerOptions scan_options(stub_, region, amend_start_key, amend_end_key, options_, start_ts_.load(),
tracker_);
CHECK(stub_.GetTxnRegionScannerFactory()->NewRegionScanner(scan_options, scanner).IsOK());
CHECK(scanner->Open().ok());

Expand Down
2 changes: 1 addition & 1 deletion src/sdk/transaction/txn_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ class TxnImpl : public std::enable_shared_from_this<TxnImpl> {
uint32_t pending_offset{0};
};

static bool IsNeedRetry(int& times);
bool IsNeedRetry(int& times);
static bool IsNeedRetry(const Status& status);
Status LookupRegion(const std::string_view& key, RegionPtr& region);
Status LookupRegion(std::string_view start_key, std::string_view end_key, std::shared_ptr<Region>& region);
Expand Down
60 changes: 55 additions & 5 deletions src/sdk/transaction/txn_lock_resolver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,29 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf
bool force_sync_commit) {
DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] resolve lock, lock_info({}).", start_ts,
conflict_lock_info.ShortDebugString());
// Try the local resolved-lock cache first
int64_t conflict_lock_ts = conflict_lock_info.lock_ts();
std::shared_ptr<TxnStatus> cached_status;
{
// Take the write lock even for a lookup: a cache hit splices the entry to the front of the LRU list, which mutates shared state
WriteLockGuard guard(cache_rw_lock_);
auto iter = resolved_lock_cache_.find(conflict_lock_ts);
if (iter != resolved_lock_cache_.end()) {
cached_status = iter->second.first;
// Promote to front of LRU
resolved_lock_lru_queue_.splice(resolved_lock_lru_queue_.begin(), resolved_lock_lru_queue_, iter->second.second);
}
}

// If the cached status is definitive (committed or rolled back), use it to resolve the lock immediately
if (cached_status != nullptr && (cached_status->IsCommitted() || cached_status->IsRollbacked())) {
DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] hit local cache for lock_ts({}), txn_status({}).", start_ts,
conflict_lock_ts, cached_status->ToString());
if (cached_status->primary_lock_info.use_async_commit() && !force_sync_commit) {
return ResolveLockSecondaryLocks(cached_status->primary_lock_info, start_ts, *cached_status, conflict_lock_info);
}
return ResolveNormalLock(conflict_lock_info, start_ts, *cached_status);
}

// check primary key lock status
TxnStatus txn_status;
Expand All @@ -66,6 +89,10 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf
}
if (conflict_lock_info.lock_ttl() > current_ts) {
// lock ttl not expired, wait and retry
DINGO_LOG(INFO) << fmt::format(
"[sdk.txn.{}] sleep {}ms, reason: TxnNotFound and lock_ttl({}) > current_ts({}), lock_info({}).",
start_ts, FLAGS_txn_check_status_interval_ms, conflict_lock_info.lock_ttl(), current_ts,
conflict_lock_info.ShortDebugString());
SleepUs(FLAGS_txn_check_status_interval_ms * 1000);
} else {
// lock ttl expired, next check will rollback txn if not exist
Expand All @@ -84,12 +111,33 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf
}
if (txn_status.primary_lock_info.lock_ts() > 0 && txn_status.primary_lock_info.lock_ttl() > current_ts) {
// lock ttl not expired, wait and retry
DINGO_LOG(INFO) << fmt::format(
"[sdk.txn.{}] sleep {}ms, reason: TxnLockConflict and lock_ttl({}) > current_ts({}), lock_info({}).",
start_ts, FLAGS_txn_check_status_interval_ms, txn_status.primary_lock_info.lock_ttl(), current_ts,
txn_status.primary_lock_info.ShortDebugString());
SleepUs(FLAGS_txn_check_status_interval_ms * 1000);
}
} else {
return status;
}
} else {
// Save the definitive state to the cache and perform LRU eviction
if (txn_status.IsCommitted() || txn_status.IsRollbacked()) {
WriteLockGuard guard(cache_rw_lock_);
if (resolved_lock_cache_.find(conflict_lock_ts) == resolved_lock_cache_.end()) {
// insert into LRU if not present
resolved_lock_lru_queue_.push_front(conflict_lock_ts);
resolved_lock_cache_[conflict_lock_ts] = {std::make_shared<TxnStatus>(txn_status),
resolved_lock_lru_queue_.begin()};

// check eviction
if (resolved_lock_cache_.size() > max_cache_size_) {
int64_t lru_lock_ts = resolved_lock_lru_queue_.back();
resolved_lock_lru_queue_.pop_back();
resolved_lock_cache_.erase(lru_lock_ts);
}
}
}
break;
}
retry_times++;
Expand Down Expand Up @@ -143,7 +191,7 @@ Status TxnLockResolver::ResolveNormalLock(const pb::store::LockInfo& lock_info,
return Status::OK();
}
std::vector<std::string> keys = {lock_info.key()};
TxnResolveLockTask task_resolve_lock(stub_, lock_info.lock_ts(), keys, txn_status.commit_ts);
TxnResolveLockTask task_resolve_lock(stub_, lock_info.lock_ts(), keys, txn_status.commit_ts, start_ts);
status = task_resolve_lock.Run();
if (!status.IsOK()) {
DINGO_LOG(WARNING) << fmt::format("[sdk.txn.{}] resolve lock fail, lock_ts({}) key({}) txn_status({}) status({}).",
Expand Down Expand Up @@ -187,7 +235,8 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri
std::vector<std::string> secondary_keys(primary_lock_info.secondaries().begin(),
primary_lock_info.secondaries().end());
TxnCheckSecondaryLocksTask txn_check_secondary_locks_task(stub_, std::move(secondary_keys),
primary_lock_info.lock_ts(), txn_secondary_lock_status);
primary_lock_info.lock_ts(), txn_secondary_lock_status,
start_ts);
Status status = txn_check_secondary_locks_task.Run();
if (!status.ok()) {
DINGO_LOG(WARNING) << fmt::format(
Expand All @@ -212,7 +261,8 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri
keys_to_rollback.push_back(lock.key());
}

TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_rollback, 0 /* commit_ts */);
TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_rollback, 0 /* commit_ts */,
start_ts);
status = task_resolve_lock.Run();
if (!status.ok()) {
DINGO_LOG(WARNING) << fmt::format(
Expand All @@ -238,7 +288,7 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri

// commit all locked keys
TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit,
txn_secondary_lock_status.commit_ts);
txn_secondary_lock_status.commit_ts, start_ts);
status = task_resolve_lock.Run();
if (!status.ok()) {
DINGO_LOG(WARNING) << fmt::format(
Expand Down Expand Up @@ -276,7 +326,7 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri
primary_lock_info.ShortDebugString());

// commit all keys
TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit, min_commit_ts);
TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit, min_commit_ts, start_ts);
status = task_resolve_lock.Run();
if (!status.ok()) {
DINGO_LOG(WARNING) << fmt::format(
Expand Down
9 changes: 9 additions & 0 deletions src/sdk/transaction/txn_lock_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

#include <algorithm>
#include <cstdint>
#include <list>
#include <string>
#include <unordered_map>
#include <vector>

#include "dingosdk/status.h"
#include "fmt/core.h"
#include "proto/store.pb.h"
#include "sdk/utils/rw_lock.h"

namespace dingodb {
namespace sdk {
Expand Down Expand Up @@ -108,6 +111,12 @@ class TxnLockResolver {
Status ResolveNormalLock(const pb::store::LockInfo& lock_info, int64_t start_ts, const TxnStatus& txn_status);

const ClientStub& stub_;

// Local LRU cache to prevent thundering herd effect on the same lock_ts
RWLock cache_rw_lock_;
size_t max_cache_size_{10000};
std::list<int64_t> resolved_lock_lru_queue_;
std::unordered_map<int64_t, std::pair<std::shared_ptr<TxnStatus>, std::list<int64_t>::iterator>> resolved_lock_cache_;
};

} // namespace sdk
Expand Down
Loading
Loading