diff --git a/.gitignore b/.gitignore index e8550f2..fff09f1 100644 --- a/.gitignore +++ b/.gitignore @@ -76,4 +76,6 @@ local_scripts/ contrib/xdprocks/ # allure-reports -allure-reports/ \ No newline at end of file +allure-reports/ +# Claude +CLAUDE.md diff --git a/include/dingosdk/client.h b/include/dingosdk/client.h index 94cab85..50eead6 100644 --- a/include/dingosdk/client.h +++ b/include/dingosdk/client.h @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -166,33 +167,37 @@ struct KeyOpState { }; struct TraceMetrics { - uint64_t total_time_us; + std::atomic total_time_us{0}; struct Metric { - uint64_t sdk_time_us; - uint64_t rpc_time_us; - uint64_t retry_count; - uint64_t rpc_retry_count; + std::atomic sdk_time_us{0}; + std::atomic rpc_time_us{0}; + std::atomic retry_count{0}; + std::atomic rpc_retry_count{0}; + + std::string ToString() const { + return fmt::format("{} {} {} {}", sdk_time_us.load(std::memory_order_relaxed), + rpc_time_us.load(std::memory_order_relaxed), retry_count.load(std::memory_order_relaxed), + rpc_retry_count.load(std::memory_order_relaxed)); + } }; Metric read_metric; Metric prewrite_metric; Metric commit_metric; - uint64_t resolve_lock_time_us; + std::atomic resolve_lock_time_us{0}; - uint64_t sleep_time_us; - uint64_t sleep_count; + std::atomic sleep_time_us{0}; + std::atomic sleep_count{0}; - std::string ToString() { + std::string ToString() const { return fmt::format( - "total_time_us({}) read({} {} {} {}) prewrite({} {} {} {}) commit({} {} {} {}) " + "total_time_us({}) read({}) prewrite({}) commit({}) " "resolve_lock_time_us({}) sleep({} {})", - total_time_us, read_metric.sdk_time_us, read_metric.rpc_time_us, read_metric.retry_count, - read_metric.rpc_retry_count, prewrite_metric.sdk_time_us, prewrite_metric.rpc_time_us, - prewrite_metric.retry_count, prewrite_metric.rpc_retry_count, commit_metric.sdk_time_us, - commit_metric.rpc_time_us, commit_metric.retry_count, commit_metric.rpc_retry_count, resolve_lock_time_us, - sleep_time_us, sleep_count); + total_time_us.load(std::memory_order_relaxed), read_metric.ToString(), prewrite_metric.ToString(), + commit_metric.ToString(), resolve_lock_time_us.load(std::memory_order_relaxed), + sleep_time_us.load(std::memory_order_relaxed), sleep_count.load(std::memory_order_relaxed)); } }; diff --git a/src/sdk/common/helper.h b/src/sdk/common/helper.h index d6c0f26..13e17cb 100644 --- a/src/sdk/common/helper.h +++ b/src/sdk/common/helper.h @@ -66,8 +66,9 @@ static Status LogAndSendRpc(const ClientStub& stub, StoreClientRpc& rpc, std::sh StoreRpcController controller(stub, rpc, region); Status s = controller.Call(); - DINGO_LOG_IF(INFO, fLB::FLAGS_log_rpc_time) << fmt::format("[rpc.{}][{}ms][region.{}] rpc finish", rpc.Method(), - TimestampMs() - start_time_ms, region->RegionId()); + DINGO_LOG_IF(INFO, fLB::FLAGS_log_rpc_time) + << fmt::format("[rpc.{}][{}ms][region.{}] [response.{}]rpc finish", rpc.Method(), TimestampMs() - start_time_ms, + region->RegionId(), rpc.Response()->ShortDebugString()); return s; } diff --git a/src/sdk/common/param_config.cc b/src/sdk/common/param_config.cc index e31a122..1ca468d 100644 --- a/src/sdk/common/param_config.cc +++ b/src/sdk/common/param_config.cc @@ -38,7 +38,7 @@ DEFINE_int64(rpc_time_out_ms, 500000, "rpc call timeout ms"); DEFINE_bool(enable_trace_rpc_performance, true, "enable trance rpc performance, use for debug"); DEFINE_int64(rpc_elapse_time_threshold_us, 1000, "rpc elapse time us threshold"); -DEFINE_int64(rpc_trace_full_info_threshold_us, 1000000, +DEFINE_int64(rpc_trace_full_info_threshold_us, 100000, "log full rpc detail when elapsed time exceeds this threshold (us)"); DEFINE_int64(store_rpc_retry_delay_ms, 500, "store rpc retry delay ms"); diff --git a/src/sdk/common/tracker.h b/src/sdk/common/tracker.h index f53cada..3f96ada 100644 --- a/src/sdk/common/tracker.h +++ b/src/sdk/common/tracker.h @@ -19,7 +19,7 @@ #include #include -#include "sdk/common/helper.h" +#include "sdk/common/helper.h" namespace dingodb { namespace sdk { diff --git a/src/sdk/region_scanner.h b/src/sdk/region_scanner.h index ce3befe..abf7cb6 100644 --- a/src/sdk/region_scanner.h +++ b/src/sdk/region_scanner.h @@ -20,6 +20,8 @@ #include #include +#include + #include "dingosdk/client.h" #include "sdk/region.h" #include "sdk/utils/callback.h" @@ -27,6 +29,9 @@ namespace dingodb { namespace sdk { +class Tracker; +using TrackerPtr = std::shared_ptr; + class ClientStub; class RegionScanner { public: @@ -71,19 +76,22 @@ struct ScannerOptions { std::string end_key; std::optional txn_options; std::optional start_ts; + TrackerPtr tracker; explicit ScannerOptions(const ClientStub& p_stub, std::shared_ptr p_region, std::string p_start_key, std::string p_end_key) : stub(p_stub), region(std::move(p_region)), start_key(std::move(p_start_key)), end_key(std::move(p_end_key)) {} explicit ScannerOptions(const ClientStub& p_stub, std::shared_ptr p_region, std::string p_start_key, - std::string p_end_key, const TransactionOptions p_txn_options, int64_t p_start_ts) + std::string p_end_key, const TransactionOptions p_txn_options, int64_t p_start_ts, + TrackerPtr p_tracker = nullptr) : stub(p_stub), region(std::move(p_region)), start_key(std::move(p_start_key)), end_key(std::move(p_end_key)), txn_options(p_txn_options), - start_ts(p_start_ts) {} + start_ts(p_start_ts), + tracker(std::move(p_tracker)) {} }; class RegionScannerFactory { diff --git a/src/sdk/rpc/rpc.h b/src/sdk/rpc/rpc.h index 6d37cfe..2666792 100644 --- a/src/sdk/rpc/rpc.h +++ b/src/sdk/rpc/rpc.h @@ -46,6 +46,10 @@ class Rpc { void SetStatus(const Status& s) { status = s; } + void SetTxnId(uint64_t id) { txn_id = id; } + + uint64_t GetTxnId() const { return txn_id; } + void IncRetryTimes() { retry_times++; } int GetRetryTimes() const { return retry_times; } @@ -88,6 +92,7 @@ class Rpc { std::string cmd; EndPoint end_point; Status status; + uint64_t txn_id{0}; int retry_times{0}; uint64_t sleep_time_us{0}; uint64_t sleep_count{0}; diff --git a/src/sdk/rpc/store_rpc_controller.cc b/src/sdk/rpc/store_rpc_controller.cc index 8536360..90b81b1 100644 --- a/src/sdk/rpc/store_rpc_controller.cc +++ b/src/sdk/rpc/store_rpc_controller.cc @@ -107,6 +107,9 @@ void StoreRpcController::SendStoreRpc() { void StoreRpcController::MaybeDelay() { if (NeedDelay(status_)) { auto delay_ms = FLAGS_store_rpc_retry_delay_ms; + DINGO_LOG(INFO) << fmt::format("[sdk.rpc.{}] txn_id:{} method:{} region({}) sleep {}ms, reason: {}", + rpc_.LogId(), rpc_.GetTxnId(), rpc_.Method(), region_->RegionId(), delay_ms, + status_.ToString()); rpc_.IncSleepCount(); rpc_.IncSleepTimesUs(FLAGS_coordinator_interaction_delay_ms * 1000); SleepUs(delay_ms * 1000); diff --git a/src/sdk/transaction/txn_impl.cc b/src/sdk/transaction/txn_impl.cc index d955d03..f79ffd6 100644 --- a/src/sdk/transaction/txn_impl.cc +++ b/src/sdk/transaction/txn_impl.cc @@ -225,6 +225,8 @@ Status TxnImpl::Rollback() { return DoRollback(); } bool TxnImpl::IsNeedRetry(int& times) { bool retry = times++ < FLAGS_txn_op_max_retry; if (retry) { + DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] sleep {}ms, reason: txn op retry({}/{}).", + GetStartTs(), FLAGS_txn_op_delay_ms, times, FLAGS_txn_op_max_retry); SleepUs(FLAGS_txn_op_delay_ms * 1000); } @@ -310,7 +312,12 @@ Status TxnImpl::ProcessScanState(ScanState& scan_state, uint64_t limit, std::vec Status TxnImpl::DoScan(const std::string& start_key, const std::string& end_key, uint64_t limit, std::vector& out_kvs) { auto start_time = TimestampUs(); - SCOPED_CLEANUP(GetTracer()->IncrementReadSdkTime(TimestampUs() - start_time);); + uint64_t rpc_time_before = GetTracer()->ReadRpcTime(); + SCOPED_CLEANUP({ + uint64_t rpc_time_added = GetTracer()->ReadRpcTime() - rpc_time_before; + uint64_t total_time = TimestampUs() - start_time; + GetTracer()->IncrementReadSdkTime(total_time > rpc_time_added ? total_time - rpc_time_added : 0); + }); // check whether region exist RegionPtr region; @@ -376,7 +383,8 @@ Status TxnImpl::DoScan(const std::string& start_key, const std::string& end_key, DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] scan region({}) range[{}, {}).", ID(), region->RegionId(), StringToHex(amend_start_key), StringToHex(amend_end_key)); - ScannerOptions scan_options(stub_, region, amend_start_key, amend_end_key, options_, start_ts_.load()); + ScannerOptions scan_options(stub_, region, amend_start_key, amend_end_key, options_, start_ts_.load(), + tracker_); CHECK(stub_.GetTxnRegionScannerFactory()->NewRegionScanner(scan_options, scanner).IsOK()); CHECK(scanner->Open().ok()); diff --git a/src/sdk/transaction/txn_impl.h b/src/sdk/transaction/txn_impl.h index 86a9979..c1d8139 100644 --- a/src/sdk/transaction/txn_impl.h +++ b/src/sdk/transaction/txn_impl.h @@ -194,7 +194,7 @@ class TxnImpl : public std::enable_shared_from_this { uint32_t pending_offset{0}; }; - static bool IsNeedRetry(int& times); + bool IsNeedRetry(int& times); static bool IsNeedRetry(const Status& status); Status LookupRegion(const std::string_view& key, RegionPtr& region); Status LookupRegion(std::string_view start_key, std::string_view end_key, std::shared_ptr& region); diff --git a/src/sdk/transaction/txn_lock_resolver.cc b/src/sdk/transaction/txn_lock_resolver.cc index b85aad0..cc1db8c 100644 --- a/src/sdk/transaction/txn_lock_resolver.cc +++ b/src/sdk/transaction/txn_lock_resolver.cc @@ -43,6 +43,29 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf bool force_sync_commit) { DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] resolve lock, lock_info({}).", start_ts, conflict_lock_info.ShortDebugString()); + // 1. Try hitting the local cache first + int64_t conflict_lock_ts = conflict_lock_info.lock_ts(); + std::shared_ptr cached_status; + { + // Both read and LRU promotion require WriteLock / Mutex logic + WriteLockGuard guard(cache_rw_lock_); + auto iter = resolved_lock_cache_.find(conflict_lock_ts); + if (iter != resolved_lock_cache_.end()) { + cached_status = iter->second.first; + // Promote to front of LRU + resolved_lock_lru_queue_.splice(resolved_lock_lru_queue_.begin(), resolved_lock_lru_queue_, iter->second.second); + } + } + + // If cached status is definitive (Committed or Rollbacked), use it to resolve immediately + if (cached_status != nullptr && (cached_status->IsCommitted() || cached_status->IsRollbacked())) { + DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] hit local cache for lock_ts({}), txn_status({}).", start_ts, + conflict_lock_ts, cached_status->ToString()); + if (cached_status->primary_lock_info.use_async_commit() && !force_sync_commit) { + return ResolveLockSecondaryLocks(cached_status->primary_lock_info, start_ts, *cached_status, conflict_lock_info); + } + return ResolveNormalLock(conflict_lock_info, start_ts, *cached_status); + } // check primary key lock status TxnStatus txn_status; @@ -66,6 +89,10 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf } if (conflict_lock_info.lock_ttl() > current_ts) { // lock ttl not expired, wait and retry + DINGO_LOG(INFO) << fmt::format( + "[sdk.txn.{}] sleep {}ms, reason: TxnNotFound and lock_ttl({}) > current_ts({}), lock_info({}).", + start_ts, FLAGS_txn_check_status_interval_ms, conflict_lock_info.lock_ttl(), current_ts, + conflict_lock_info.ShortDebugString()); SleepUs(FLAGS_txn_check_status_interval_ms * 1000); } else { // lock ttl expired, next check will rollback txn if not exist @@ -84,12 +111,33 @@ Status TxnLockResolver::ResolveLock(const pb::store::LockInfo& conflict_lock_inf } if (txn_status.primary_lock_info.lock_ts() > 0 && txn_status.primary_lock_info.lock_ttl() > current_ts) { // lock ttl not expired, wait and retry + DINGO_LOG(INFO) << fmt::format( + "[sdk.txn.{}] sleep {}ms, reason: TxnLockConflict and lock_ttl({}) > current_ts({}), lock_info({}).", + start_ts, FLAGS_txn_check_status_interval_ms, txn_status.primary_lock_info.lock_ttl(), current_ts, + txn_status.primary_lock_info.ShortDebugString()); SleepUs(FLAGS_txn_check_status_interval_ms * 1000); } } else { return status; } } else { + // Save the definitive state to the cache and perform LRU eviction + if (txn_status.IsCommitted() || txn_status.IsRollbacked()) { + WriteLockGuard guard(cache_rw_lock_); + if (resolved_lock_cache_.find(conflict_lock_ts) == resolved_lock_cache_.end()) { + // insert into LRU if not present + resolved_lock_lru_queue_.push_front(conflict_lock_ts); + resolved_lock_cache_[conflict_lock_ts] = {std::make_shared(txn_status), + resolved_lock_lru_queue_.begin()}; + + // check eviction + if (resolved_lock_cache_.size() > max_cache_size_) { + int64_t lru_lock_ts = resolved_lock_lru_queue_.back(); + resolved_lock_lru_queue_.pop_back(); + resolved_lock_cache_.erase(lru_lock_ts); + } + } + } break; } retry_times++; @@ -143,7 +191,7 @@ Status TxnLockResolver::ResolveNormalLock(const pb::store::LockInfo& lock_info, return Status::OK(); } std::vector keys = {lock_info.key()}; - TxnResolveLockTask task_resolve_lock(stub_, lock_info.lock_ts(), keys, txn_status.commit_ts); + TxnResolveLockTask task_resolve_lock(stub_, lock_info.lock_ts(), keys, txn_status.commit_ts, start_ts); status = task_resolve_lock.Run(); if (!status.IsOK()) { DINGO_LOG(WARNING) << fmt::format("[sdk.txn.{}] resolve lock fail, lock_ts({}) key({}) txn_status({}) status({}).", @@ -187,7 +235,8 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri std::vector secondary_keys(primary_lock_info.secondaries().begin(), primary_lock_info.secondaries().end()); TxnCheckSecondaryLocksTask txn_check_secondary_locks_task(stub_, std::move(secondary_keys), - primary_lock_info.lock_ts(), txn_secondary_lock_status); + primary_lock_info.lock_ts(), txn_secondary_lock_status, + start_ts); Status status = txn_check_secondary_locks_task.Run(); if (!status.ok()) { DINGO_LOG(WARNING) << fmt::format( @@ -212,7 +261,8 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri keys_to_rollback.push_back(lock.key()); } - TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_rollback, 0 /* commit_ts */); + TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_rollback, 0 /* commit_ts */, + start_ts); status = task_resolve_lock.Run(); if (!status.ok()) { DINGO_LOG(WARNING) << fmt::format( @@ -238,7 +288,7 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri // commit all locked keys TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit, - txn_secondary_lock_status.commit_ts); + txn_secondary_lock_status.commit_ts, start_ts); status = task_resolve_lock.Run(); if (!status.ok()) { DINGO_LOG(WARNING) << fmt::format( @@ -276,7 +326,7 @@ Status TxnLockResolver::ResolveLockSecondaryLocks(const pb::store::LockInfo& pri primary_lock_info.ShortDebugString()); // commit all keys - TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit, min_commit_ts); + TxnResolveLockTask task_resolve_lock(stub_, primary_lock_info.lock_ts(), keys_to_commit, min_commit_ts, start_ts); status = task_resolve_lock.Run(); if (!status.ok()) { DINGO_LOG(WARNING) << fmt::format( diff --git a/src/sdk/transaction/txn_lock_resolver.h b/src/sdk/transaction/txn_lock_resolver.h index b1fe3ef..4fde870 100644 --- a/src/sdk/transaction/txn_lock_resolver.h +++ b/src/sdk/transaction/txn_lock_resolver.h @@ -19,12 +19,15 @@ #include #include +#include #include +#include #include #include "dingosdk/status.h" #include "fmt/core.h" #include "proto/store.pb.h" +#include "sdk/utils/rw_lock.h" namespace dingodb { namespace sdk { @@ -108,6 +111,12 @@ class TxnLockResolver { Status ResolveNormalLock(const pb::store::LockInfo& lock_info, int64_t start_ts, const TxnStatus& txn_status); const ClientStub& stub_; + + // Local LRU cache to prevent thundering herd effect on the same lock_ts + RWLock cache_rw_lock_; + size_t max_cache_size_{10000}; + std::list resolved_lock_lru_queue_; + std::unordered_map, std::list::iterator>> resolved_lock_cache_; }; } // namespace sdk diff --git a/src/sdk/transaction/txn_region_scanner_impl.cc b/src/sdk/transaction/txn_region_scanner_impl.cc index 7496946..4134f09 100644 --- a/src/sdk/transaction/txn_region_scanner_impl.cc +++ b/src/sdk/transaction/txn_region_scanner_impl.cc @@ -32,7 +32,7 @@ namespace sdk { TxnRegionScannerImpl::TxnRegionScannerImpl(const ClientStub& stub, RegionPtr region, const TransactionOptions& txn_options, int64_t txn_start_ts, - std::string start_key, std::string end_key) + std::string start_key, std::string end_key, TrackerPtr tracker) : RegionScanner(stub, region), txn_options_(txn_options), txn_start_ts_(txn_start_ts), @@ -40,7 +40,8 @@ TxnRegionScannerImpl::TxnRegionScannerImpl(const ClientStub& stub, RegionPtr reg end_key_(std::move(end_key)), opened_(false), has_more_(false), - batch_size_(FLAGS_scan_batch_size) {} + batch_size_(FLAGS_scan_batch_size), + tracker_(std::move(tracker)) {} TxnRegionScannerImpl::~TxnRegionScannerImpl() { Close(); } @@ -64,6 +65,7 @@ std::unique_ptr TxnRegionScannerImpl::GenTxnScanRpc(uint64_t resolve auto rpc = std::make_unique(); rpc->MutableRequest()->set_start_ts(txn_start_ts_); + rpc->SetTxnId(txn_start_ts_); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), {resolved_lock}, ToIsolationLevel(txn_options_.isolation)); @@ -93,16 +95,29 @@ Status TxnRegionScannerImpl::NextBatch(std::vector& kvs) { rpc = GenTxnScanRpc(resolved_lock); DINGO_RETURN_NOT_OK(LogAndSendRpc(stub, *rpc, region)); + if (tracker_) { + tracker_->IncrementReadRpcTime(rpc->ElapsedTimeUs()); + tracker_->IncrementReadRpcRetryCount(rpc->GetRetryTimes()); + tracker_->IncrementSleepTime(rpc->GetSleepTimesUs()); + tracker_->IncrementSleepCount(rpc->GetSleepCount()); + } + const auto* response = rpc->Response(); if (response->has_txn_result()) { const auto& txn_result = response->txn_result(); status = CheckTxnResultInfo(txn_result); if (status.IsTxnLockConflict()) { + auto lock_start = TimestampUs(); status = stub.GetTxnLockResolver()->ResolveLock(txn_result.locked(), txn_start_ts_); + if (tracker_) { + tracker_->IncrementResolveLockTime(TimestampUs() - lock_start); + } if (status.ok()) { + if (tracker_) tracker_->IncrementReadRetryCount(1); continue; } else if (status.IsPushMinCommitTs()) { resolved_lock = txn_result.locked().lock_ts(); + if (tracker_) tracker_->IncrementReadRetryCount(1); continue; } } @@ -118,16 +133,18 @@ Status TxnRegionScannerImpl::NextBatch(std::vector& kvs) { return status; } - const auto* response = rpc->Response(); + auto* mut_response = rpc->MutableResponse(); - for (const auto& kv : response->kvs()) { + for (auto& kv : *mut_response->mutable_kvs()) { DINGO_LOG(DEBUG) << fmt::format("[sdk.txn.{}] scan region({}) key({}) value({}).", txn_start_ts_, region->RegionId(), StringToHex(kv.key()), StringToHex(kv.value())); - kvs.push_back({kv.key(), kv.value()}); + + // Transfer ownership of the string from its underlying allocation + kvs.push_back({std::move(*kv.mutable_key()), std::move(*kv.mutable_value())}); } - has_more_ = response->stream_meta().has_more(); - stream_id_ = response->stream_meta().stream_id(); + has_more_ = mut_response->stream_meta().has_more(); + stream_id_ = mut_response->stream_meta().stream_id(); return status; } @@ -149,7 +166,15 @@ Status TxnRegionScannerImpl::SetBatchSize(int64_t size) { bool TxnRegionScannerImpl::IsNeedRetry(int& times) { bool retry = times++ < FLAGS_txn_op_max_retry; if (retry) { - SleepUs(FLAGS_txn_op_delay_ms * 1000); + uint64_t sleep_us = FLAGS_txn_op_delay_ms * 1000; + DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] sleep {}ms, reason: scan retry({}/{}), region({}).", + txn_start_ts_, FLAGS_txn_op_delay_ms, times, FLAGS_txn_op_max_retry, + region->RegionId()); + SleepUs(sleep_us); + if (tracker_) { + tracker_->IncrementSleepTime(sleep_us); + tracker_->IncrementSleepCount(1); + } } return retry; @@ -176,7 +201,8 @@ Status TxnRegionScannerFactoryImpl::NewRegionScanner(const ScannerOptions& optio "end_key({}) should little than region range end_key({})", options.end_key, options.region->GetRange().end_key); RegionScannerPtr tmp(new TxnRegionScannerImpl(options.stub, options.region, options.txn_options.value(), - options.start_ts.value(), options.start_key, options.end_key)); + options.start_ts.value(), options.start_key, options.end_key, + options.tracker)); scanner = std::move(tmp); return Status::OK(); diff --git a/src/sdk/transaction/txn_region_scanner_impl.h b/src/sdk/transaction/txn_region_scanner_impl.h index ebd062d..75916a0 100644 --- a/src/sdk/transaction/txn_region_scanner_impl.h +++ b/src/sdk/transaction/txn_region_scanner_impl.h @@ -21,6 +21,7 @@ #include "dingosdk/client.h" #include "dingosdk/status.h" +#include "sdk/common/tracker.h" #include "sdk/region_scanner.h" #include "sdk/rpc/store_rpc.h" @@ -33,7 +34,8 @@ class TestBase; class TxnRegionScannerImpl : public RegionScanner { public: explicit TxnRegionScannerImpl(const ClientStub& stub, RegionPtr region, const TransactionOptions& txn_options, - int64_t txn_start_ts, std::string start_key, std::string end_key); + int64_t txn_start_ts, std::string start_key, std::string end_key, + TrackerPtr tracker = nullptr); ~TxnRegionScannerImpl() override; @@ -66,7 +68,7 @@ class TxnRegionScannerImpl : public RegionScanner { private: std::unique_ptr GenTxnScanRpc(uint64_t resolved_lock); - static bool IsNeedRetry(int& times); + bool IsNeedRetry(int& times); const TransactionOptions txn_options_; int64_t txn_start_ts_; @@ -76,6 +78,7 @@ class TxnRegionScannerImpl : public RegionScanner { bool opened_; bool has_more_; std::string stream_id_; + TrackerPtr tracker_; }; class TxnRegionScannerFactoryImpl final : public RegionScannerFactory { diff --git a/src/sdk/transaction/txn_task/txn_batch_get_task.cc b/src/sdk/transaction/txn_task/txn_batch_get_task.cc index 8c242ed..6df7e72 100644 --- a/src/sdk/transaction/txn_task/txn_batch_get_task.cc +++ b/src/sdk/transaction/txn_task/txn_batch_get_task.cc @@ -98,6 +98,7 @@ void TxnBatchGetTask::DoAsync() { auto rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), {resolved_lock}, ToIsolationLevel(txn_impl_->GetOptions().isolation)); for (const auto& key : entry.second) { diff --git a/src/sdk/transaction/txn_task/txn_batch_rollback_task.cc b/src/sdk/transaction/txn_task/txn_batch_rollback_task.cc index f215a96..c2a0842 100644 --- a/src/sdk/transaction/txn_task/txn_batch_rollback_task.cc +++ b/src/sdk/transaction/txn_task/txn_batch_rollback_task.cc @@ -61,12 +61,20 @@ void TxnBatchRollbackTask::DoAsync() { std::unordered_map> region_id_to_keys; auto meta_cache = stub.GetMetaCache(); + std::shared_ptr cached_region = nullptr; for (const auto& key : next_batch) { std::shared_ptr tmp; - Status s = meta_cache->LookupRegionByKey(key, tmp); - if (!s.ok()) { - DoAsyncDone(s); - return; + if (cached_region != nullptr && + key >= cached_region->GetRange().start_key && + key < cached_region->GetRange().end_key) { + tmp = cached_region; + } else { + Status s = meta_cache->LookupRegionByKey(key, tmp); + if (!s.ok()) { + DoAsyncDone(s); + return; + } + cached_region = tmp; } auto iter = region_id_to_region.find(tmp->RegionId()); if (iter == region_id_to_region.end()) { @@ -84,6 +92,7 @@ void TxnBatchRollbackTask::DoAsync() { auto rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), ToIsolationLevel(txn_impl_->GetOptions().isolation)); diff --git a/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.cc b/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.cc index 3e5ac21..4d997b3 100644 --- a/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.cc +++ b/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.cc @@ -64,12 +64,20 @@ void TxnCheckSecondaryLocksTask::DoAsync() { std::unordered_map> region_id_to_keys; auto meta_cache = stub.GetMetaCache(); + std::shared_ptr cached_region = nullptr; for (const auto& key : next_batch) { std::shared_ptr tmp; - Status s = meta_cache->LookupRegionByKey(key, tmp); - if (!s.ok()) { - DoAsyncDone(s); - return; + if (cached_region != nullptr && + key >= cached_region->GetRange().start_key && + key < cached_region->GetRange().end_key) { + tmp = cached_region; + } else { + Status s = meta_cache->LookupRegionByKey(key, tmp); + if (!s.ok()) { + DoAsyncDone(s); + return; + } + cached_region = tmp; } auto iter = region_id_to_region.find(tmp->RegionId()); if (iter == region_id_to_region.end()) { @@ -89,6 +97,7 @@ void TxnCheckSecondaryLocksTask::DoAsync() { auto rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(primary_lock_start_ts_); + rpc->SetTxnId(caller_start_ts_); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), pb::store::IsolationLevel::SnapshotIsolation); for (const auto& key : entry.second) { diff --git a/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.h b/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.h index 4050812..94b10ae 100644 --- a/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.h +++ b/src/sdk/transaction/txn_task/txn_check_secondary_locks_task.h @@ -33,11 +33,13 @@ namespace sdk { class TxnCheckSecondaryLocksTask : public TxnTask { public: TxnCheckSecondaryLocksTask(const ClientStub& stub, const std::vector secondary_keys, - int64_t primary_lock_start_ts, TxnSecondaryLockStatus& txn_check_secondary_status) + int64_t primary_lock_start_ts, TxnSecondaryLockStatus& txn_check_secondary_status, + int64_t caller_start_ts = 0) : TxnTask(stub), secondary_keys_(secondary_keys), primary_lock_start_ts_(primary_lock_start_ts), - txn_check_secondary_status_(txn_check_secondary_status) {} + txn_check_secondary_status_(txn_check_secondary_status), + caller_start_ts_(caller_start_ts) {} ~TxnCheckSecondaryLocksTask() override = default; @@ -54,6 +56,7 @@ class TxnCheckSecondaryLocksTask : public TxnTask { TxnSecondaryLockStatus& txn_check_secondary_status_; std::set next_keys_; int64_t primary_lock_start_ts_{0}; + int64_t caller_start_ts_{0}; std::vector controllers_; std::vector> rpcs_; diff --git a/src/sdk/transaction/txn_task/txn_commit_task.cc b/src/sdk/transaction/txn_task/txn_commit_task.cc index bf64aad..9d25499 100644 --- a/src/sdk/transaction/txn_task/txn_commit_task.cc +++ b/src/sdk/transaction/txn_task/txn_commit_task.cc @@ -97,6 +97,7 @@ void TxnCommitTask::DoAsync() { auto rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); rpc->MutableRequest()->set_commit_ts(txn_impl_->GetCommitTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), ToIsolationLevel(txn_impl_->GetOptions().isolation)); @@ -112,6 +113,7 @@ void TxnCommitTask::DoAsync() { // reset rpc rpc = std::make_unique(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); rpc->MutableRequest()->set_commit_ts(txn_impl_->GetCommitTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), ToIsolationLevel(txn_impl_->GetOptions().isolation)); @@ -185,6 +187,14 @@ void TxnCommitTask::TxnCommitRpcCallback(const Status& status, TxnCommitRpc* rpc } } +bool TxnCommitTask::NeedRetry() { + bool retry = TxnTask::NeedRetry(); + if (retry) { + txn_impl_->GetTracer()->IncrementCommitRetryCount(1); + } + return retry; +} + Status TxnCommitTask::ProcessTxnCommitResponse(const TxnCommitResponse* response, bool is_primary) { std::string pk = txn_impl_->GetPrimaryKey(); int64_t txn_id = txn_impl_->ID(); diff --git a/src/sdk/transaction/txn_task/txn_commit_task.h b/src/sdk/transaction/txn_task/txn_commit_task.h index 19e145d..7a4b4d9 100644 --- a/src/sdk/transaction/txn_task/txn_commit_task.h +++ b/src/sdk/transaction/txn_task/txn_commit_task.h @@ -51,6 +51,8 @@ class TxnCommitTask : public TxnTask { Status ProcessTxnCommitResponse(const TxnCommitResponse* response, bool is_primary); + bool NeedRetry() override; + const std::vector keys_; bool is_primary_{false}; std::shared_ptr txn_impl_; diff --git a/src/sdk/transaction/txn_task/txn_get_task.cc b/src/sdk/transaction/txn_task/txn_get_task.cc index 13cdbdc..baf63b4 100644 --- a/src/sdk/transaction/txn_task/txn_get_task.cc +++ b/src/sdk/transaction/txn_task/txn_get_task.cc @@ -41,6 +41,7 @@ void TxnGetTask::DoAsync() { rpc_.MutableRequest()->Clear(); rpc_.MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc_.SetTxnId(txn_impl_->GetStartTs()); rpc_.MutableRequest()->set_key(key_); FillRpcContext(*rpc_.MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), {resolved_lock_}, ToIsolationLevel(txn_impl_->GetOptions().isolation)); diff --git a/src/sdk/transaction/txn_task/txn_heartbeat_task.cc b/src/sdk/transaction/txn_task/txn_heartbeat_task.cc index 9fe642e..f19d0c7 100644 --- a/src/sdk/transaction/txn_task/txn_heartbeat_task.cc +++ b/src/sdk/transaction/txn_task/txn_heartbeat_task.cc @@ -49,6 +49,7 @@ void TxnHeartbeatTask::DoAsync() { FillRpcContext(*rpc_.MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), pb::store::IsolationLevel::SnapshotIsolation); rpc_.MutableRequest()->set_start_ts(lock_ts_); + rpc_.SetTxnId(lock_ts_); rpc_.MutableRequest()->set_primary_lock(primary_key_); rpc_.MutableRequest()->set_advise_lock_ttl(physical_ts_ + FLAGS_txn_heartbeat_lock_delay_ms); diff --git a/src/sdk/transaction/txn_task/txn_prewrite_task.cc b/src/sdk/transaction/txn_task/txn_prewrite_task.cc index defa1b7..9b47265 100644 --- a/src/sdk/transaction/txn_task/txn_prewrite_task.cc +++ b/src/sdk/transaction/txn_task/txn_prewrite_task.cc @@ -134,6 +134,7 @@ void TxnPrewriteTask::DoAsync() { auto rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), ToIsolationLevel(txn_impl_->GetOptions().isolation)); rpc->MutableRequest()->set_primary_lock(primary_key_); @@ -164,6 +165,7 @@ void TxnPrewriteTask::DoAsync() { rpc = std::make_unique(); rpc->MutableRequest()->Clear(); rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs()); + rpc->SetTxnId(txn_impl_->GetStartTs()); FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), ToIsolationLevel(txn_impl_->GetOptions().isolation)); rpc->MutableRequest()->set_primary_lock(primary_key_); @@ -339,6 +341,7 @@ bool TxnPrewriteTask::NeedRetry() { if (IsRetryError()) { retry_count_++; if (retry_count_ < FLAGS_txn_prewrite_max_retry) { + txn_impl_->GetTracer()->IncrementPrewriteRetryCount(1); return true; } else { std::string msg = diff --git a/src/sdk/transaction/txn_task/txn_resolve_lock_task.cc b/src/sdk/transaction/txn_task/txn_resolve_lock_task.cc index f44eb76..419021f 100644 --- a/src/sdk/transaction/txn_task/txn_resolve_lock_task.cc +++ b/src/sdk/transaction/txn_task/txn_resolve_lock_task.cc @@ -43,7 +43,7 @@ Status TxnResolveLockTask::Init() { } void TxnResolveLockTask::DoAsync() { - std::unordered_set next_batch; + std::set next_batch; { WriteLockGuard guard(rw_lock_); next_batch = next_keys_; @@ -62,12 +62,20 @@ void TxnResolveLockTask::DoAsync() { std::unordered_map> region_id_to_keys; auto meta_cache = stub.GetMetaCache(); + std::shared_ptr cached_region = nullptr; for (const auto& key : next_batch) { std::shared_ptr tmp; - Status s = meta_cache->LookupRegionByKey(key, tmp); - if (!s.ok()) { - DoAsyncDone(s); - return; + if (cached_region != nullptr && + key >= cached_region->GetRange().start_key && + key < cached_region->GetRange().end_key) { + tmp = cached_region; + } else { + Status s = meta_cache->LookupRegionByKey(key, tmp); + if (!s.ok()) { + DoAsyncDone(s); + return; + } + cached_region = tmp; } auto iter = region_id_to_region.find(tmp->RegionId()); if (iter == region_id_to_region.end()) { @@ -86,6 +94,7 @@ void TxnResolveLockTask::DoAsync() { FillRpcContext(*rpc->MutableRequest()->mutable_context(), region->RegionId(), region->GetEpoch(), pb::store::IsolationLevel::SnapshotIsolation); rpc->MutableRequest()->set_start_ts(lock_ts_); + rpc->SetTxnId(caller_start_ts_); rpc->MutableRequest()->set_commit_ts(commit_ts_); for (const auto& key : entry.second) { *rpc->MutableRequest()->add_keys() = key; diff --git a/src/sdk/transaction/txn_task/txn_resolve_lock_task.h b/src/sdk/transaction/txn_task/txn_resolve_lock_task.h index f436f12..b47e8c7 100644 --- a/src/sdk/transaction/txn_task/txn_resolve_lock_task.h +++ b/src/sdk/transaction/txn_task/txn_resolve_lock_task.h @@ -16,9 +16,9 @@ #define TXN_RESOLVE_LOCK_TASK_H #include +#include #include #include -#include #include #include "dingosdk/status.h" @@ -34,8 +34,9 @@ namespace sdk { class TxnResolveLockTask : public TxnTask { public: - TxnResolveLockTask(const ClientStub& stub, int64_t lock_ts, const std::vector& keys, int64_t commit_ts) - : TxnTask(stub), lock_ts_(lock_ts), keys_(keys), commit_ts_(commit_ts) {} + TxnResolveLockTask(const ClientStub& stub, int64_t lock_ts, const std::vector& keys, int64_t commit_ts, + int64_t caller_start_ts = 0) + : TxnTask(stub), lock_ts_(lock_ts), keys_(keys), commit_ts_(commit_ts), caller_start_ts_(caller_start_ts) {} ~TxnResolveLockTask() override = default; @@ -51,11 +52,12 @@ class TxnResolveLockTask : public TxnTask { int64_t lock_ts_{0}; const std::vector& keys_; int64_t commit_ts_{0}; + int64_t caller_start_ts_{0}; std::vector controllers_; std::vector> rpcs_; std::atomic sub_tasks_count_{0}; - std::unordered_set next_keys_; + std::set next_keys_; RWLock rw_lock_; Status status_; diff --git a/store-proto b/store-proto index 1d5f3c8..127fc1e 160000 --- a/store-proto +++ b/store-proto @@ -1 +1 @@ -Subproject commit 1d5f3c8254d19234bcd575a5dd94e4cdb8bf3047 +Subproject commit 127fc1e81516bdf9ace70e012805816e1eef12af diff --git a/test/unit_test/sdk/transaction/test_txn_region_scanner.cc b/test/unit_test/sdk/transaction/test_txn_region_scanner.cc new file mode 100644 index 0000000..08e3cba --- /dev/null +++ b/test/unit_test/sdk/transaction/test_txn_region_scanner.cc @@ -0,0 +1,97 @@ +// Copyright (c) 2023 dingodb.com, Inc. All Rights Reserved +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include + +#include "gtest/gtest.h" +#include "gmock/gmock.h" +#include "sdk/transaction/txn_region_scanner_impl.h" +#include "../mock_client_stub.h" +#include "../mock_rpc_client.h" +#include "proto/error.pb.h" + +namespace dingodb { +namespace sdk { + +class TxnRegionScannerImplTest : public testing::Test { + protected: + void SetUp() override { + stub = std::make_unique(); + RpcClientOptions rpc_options; + rpc_client = std::make_shared(rpc_options); + + EXPECT_CALL(*stub, GetRpcClient()).WillRepeatedly(testing::Return(rpc_client)); + + dingodb::pb::common::Range range; + range.set_start_key("a"); + range.set_end_key("z"); + dingodb::pb::common::RegionEpoch epoch; + epoch.set_conf_version(1); + epoch.set_version(1); + Replica replica; + replica.end_point = EndPoint("127.0.0.1", 8080); + replica.role = kLeader; + region = std::make_shared(1, range, epoch, dingodb::pb::common::RegionType::STORE_REGION, std::vector{replica}); + } + + std::unique_ptr stub; + std::shared_ptr rpc_client; + std::shared_ptr region; +}; + +TEST_F(TxnRegionScannerImplTest, NextBatchZeroCopy) { + TransactionOptions options; + options.kind = kOptimistic; + options.isolation = kSnapshotIsolation; + + TxnRegionScannerImpl scanner(*stub, region, options, 100, "a", "c"); + + EXPECT_CALL(*rpc_client, SendRpc).WillOnce([&](Rpc& rpc, std::function cb) { + auto* txn_rpc = dynamic_cast(&rpc); + CHECK_NOTNULL(txn_rpc); + + auto* response = txn_rpc->MutableResponse(); + auto* kv1 = response->add_kvs(); + kv1->set_key("a1"); + // Ensure we are setting a value that has an underlying allocation we can steal + std::string v1{"val1"}; + kv1->set_value(v1); + + auto* kv2 = response->add_kvs(); + kv2->set_key("a2"); + std::string v2{"val2"}; + kv2->set_value(v2); + + response->mutable_stream_meta()->set_has_more(false); + + cb(); + }); + + EXPECT_TRUE(scanner.Open().ok()); + + std::vector kvs; + Status s = scanner.NextBatch(kvs); + EXPECT_TRUE(s.ok()); + EXPECT_EQ(kvs.size(), 2); + EXPECT_EQ(kvs[0].key, "a1"); + EXPECT_EQ(kvs[0].value, "val1"); + EXPECT_EQ(kvs[1].key, "a2"); + EXPECT_EQ(kvs[1].value, "val2"); + + EXPECT_FALSE(scanner.HasMore()); +} + +} // namespace sdk +} // namespace dingodb