Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/sdk/rawkv/raw_kv_batch_compare_and_set_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ void RawKvBatchCompareAndSetTask::DoAsync() {
controllers_.clear();
rpcs_.clear();

controllers_.reserve(region_keys.size());
rpcs_.reserve(region_keys.size());
for (const auto& entry : region_keys) {
auto region_id = entry.first;

Expand All @@ -115,9 +117,7 @@ void RawKvBatchCompareAndSetTask::DoAsync() {
*(rpc->MutableRequest()->add_expect_values()) = key_context->second.expected_value;
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(controller);

controllers_.emplace_back(stub, *rpc, region);
rpcs_.push_back(std::move(rpc));
}

Expand Down
6 changes: 3 additions & 3 deletions src/sdk/rawkv/raw_kv_batch_delete_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ void RawKvBatchDeleteTask::DoAsync() {
controllers_.clear();
rpcs_.clear();

controllers_.reserve(region_keys.size());
rpcs_.reserve(region_keys.size());
for (const auto& entry : region_keys) {
auto region_id = entry.first;

Expand All @@ -87,9 +89,7 @@ void RawKvBatchDeleteTask::DoAsync() {
*(rpc->MutableRequest()->add_keys()) = key;
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(controller);

controllers_.emplace_back(stub, *rpc, region);
rpcs_.push_back(std::move(rpc));
}

Expand Down
6 changes: 3 additions & 3 deletions src/sdk/rawkv/raw_kv_batch_get_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ void RawKvBatchGetTask::DoAsync() {
controllers_.clear();
rpcs_.clear();

controllers_.reserve(region_keys.size());
rpcs_.reserve(region_keys.size());
for (const auto& entry : region_keys) {
auto region_id = entry.first;

Expand All @@ -92,9 +94,7 @@ void RawKvBatchGetTask::DoAsync() {
*fill = key;
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(controller);

controllers_.emplace_back(stub, *rpc, region);
rpcs_.push_back(std::move(rpc));
}

Expand Down
5 changes: 3 additions & 2 deletions src/sdk/rawkv/raw_kv_batch_put_if_absent_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ void RawKvBatchPutIfAbsentTask::DoAsync() {
controllers_.clear();
rpcs_.clear();

controllers_.reserve(region_keys.size());
rpcs_.reserve(region_keys.size());
for (const auto& entry : region_keys) {
auto region_id = entry.first;

Expand All @@ -94,8 +96,7 @@ void RawKvBatchPutIfAbsentTask::DoAsync() {
fill->set_value(kv->value);
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(controller);
controllers_.emplace_back(stub, *rpc, region);

rpcs_.push_back(std::move(rpc));
}
Expand Down
5 changes: 3 additions & 2 deletions src/sdk/rawkv/raw_kv_batch_put_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ void RawKvBatchPutTask::DoAsync() {
controllers_.clear();
rpcs_.clear();

controllers_.reserve(region_keys.size());
rpcs_.reserve(region_keys.size());
for (const auto& entry : region_keys) {
auto region_id = entry.first;

Expand All @@ -92,8 +94,7 @@ void RawKvBatchPutTask::DoAsync() {
fill->set_value(kv->value);
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(controller);
controllers_.emplace_back(stub, *rpc, region);

rpcs_.push_back(std::move(rpc));
}
Expand Down
33 changes: 24 additions & 9 deletions src/sdk/rpc/brpc/unary_rpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class UnaryRpc : public Rpc {
(end_time - start_time) / 1e6, endpoint2str(controller.remote_side()).c_str(),
str);
}
TraceRpcPerformance(end_time - start_time, Method(), endpoint2str(controller.remote_side()).c_str(), str);
// TraceRpcPerformance(end_time - start_time, Method(), endpoint2str(controller.remote_side()).c_str(), str);

if (FLAGS_enable_rocksdb_perf_metric && !controller.Failed()) {
const auto* descriptor = response->GetDescriptor();
Expand All @@ -131,7 +131,11 @@ class UnaryRpc : public Rpc {
const auto* ti_ref = time_info.GetReflection();
const auto* ti_desc = time_info.GetDescriptor();
const auto* elapsed_times_field = ti_desc->FindFieldByName("elapsed_times");
uint64_t total_time_ns = ti_ref->GetInt64(time_info, ti_desc->FindFieldByName("total_rpc_time_ns"));
int64_t total_time_ns = ti_ref->GetInt64(time_info, ti_desc->FindFieldByName("total_rpc_time_ns"));
int64_t raft_commit_time_ns = ti_ref->GetInt64(time_info, ti_desc->FindFieldByName("raft_commit_time_ns"));
int64_t total_mvcc_version = 0;
int64_t total_internal_skipped_count = 0;
int64_t total_phase_time_us = 0;
std::string elapsed_str;
if (elapsed_times_field != nullptr) {
int count = ti_ref->FieldSize(time_info, elapsed_times_field);
Expand All @@ -142,26 +146,37 @@ class UnaryRpc : public Rpc {
std::string name = et_ref->GetString(et, et_desc->FindFieldByName("name"));
uint64_t time_us = et_ref->GetUInt64(et, et_desc->FindFieldByName("time_us"));
int64_t skip_version = et_ref->GetInt64(et, et_desc->FindFieldByName("skip_version"));
total_mvcc_version += skip_version;
total_phase_time_us += time_us;
uint64_t io_time_ns = 0;
uint64_t cache_hit_count = 0;
uint64_t miss_block_cache_count = 0;
uint64_t internal_skipped_count = 0;
const auto* perf_field = et_desc->FindFieldByName("rocksdb_perf_summary");
uint64_t internal_tombstone_count = 0;
const auto* perf_field = et_desc->FindFieldByName("storage_engine_perf_summary");
if (perf_field != nullptr && et_ref->HasField(et, perf_field)) {
const auto& perf = et_ref->GetMessage(et, perf_field);
const auto* perf_ref = perf.GetReflection();
const auto* perf_desc = perf.GetDescriptor();
io_time_ns = perf_ref->GetUInt64(perf, perf_desc->FindFieldByName("io_time_ns"));
cache_hit_count = perf_ref->GetUInt64(perf, perf_desc->FindFieldByName("cache_hit_count"));
miss_block_cache_count = perf_ref->GetUInt64(perf, perf_desc->FindFieldByName("miss_block_count"));
internal_skipped_count =
perf_ref->GetUInt64(perf, perf_desc->FindFieldByName("internal_skipped_count"));
total_internal_skipped_count += internal_skipped_count;
internal_tombstone_count =
perf_ref->GetUInt64(perf, perf_desc->FindFieldByName("internal_tombstone_count"));
total_internal_skipped_count += internal_tombstone_count;
}
elapsed_str += fmt::format(" {}({} {} {} {} {})", name, time_us, skip_version, io_time_ns / 1000,
cache_hit_count, internal_skipped_count);
elapsed_str += fmt::format(" {}({} {} {} {} {} {})", name, time_us, skip_version, io_time_ns / 1000,
miss_block_cache_count, internal_skipped_count, internal_tombstone_count);
}
if (GetTxnId() != 0) {
// only print txn log
DINGO_LOG(INFO) << fmt::format("[sdk][{}][{}] total_time_us({}) {}", GetTxnId(), Method(),
total_time_ns / 1000, elapsed_str);
DINGO_LOG(INFO) << fmt::format(
"[sdk][{}][{}] total_time_us({}) total_phase_time_us({}) raft_commit_time_us({}) "
"total_mvcc_version({}) "
"total_internal_skipped({}) {}",
GetTxnId(), Method(), total_time_ns / 1000, total_phase_time_us, raft_commit_time_ns / 1000,
total_mvcc_version, total_internal_skipped_count, elapsed_str);
}
}
}
Expand Down
25 changes: 23 additions & 2 deletions src/sdk/rpc/store_rpc_controller.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,31 @@ namespace dingodb {
namespace sdk {

StoreRpcController::StoreRpcController(const ClientStub& stub, Rpc& rpc, RegionPtr region)
: stub_(stub), rpc_(rpc), region_(std::move(region)), rpc_retry_times_(0) {}
: stub_(stub), rpc_shared_(nullptr), rpc_(rpc), region_(std::move(region)), rpc_retry_times_(0) {}

StoreRpcController::StoreRpcController(const ClientStub& stub, Rpc& rpc)
: stub_(stub), rpc_(rpc), region_(nullptr), rpc_retry_times_(0) {}
: stub_(stub), rpc_shared_(nullptr), rpc_(rpc), region_(nullptr), rpc_retry_times_(0) {}

StoreRpcController::StoreRpcController(const ClientStub& stub, std::shared_ptr<Rpc> rpc, RegionPtr region)
: stub_(stub), rpc_shared_(std::move(rpc)), rpc_(*rpc_shared_), region_(std::move(region)), rpc_retry_times_(0) {}

StoreRpcController::StoreRpcController(const StoreRpcController& other)
: stub_(other.stub_),
rpc_shared_(other.rpc_shared_),
rpc_(other.rpc_),
region_(other.region_),
rpc_retry_times_(other.rpc_retry_times_),
status_(other.status_),
call_back_(other.call_back_) {}

StoreRpcController::StoreRpcController(StoreRpcController&& other) noexcept
: stub_(other.stub_),
rpc_shared_(std::move(other.rpc_shared_)),
rpc_(other.rpc_),
region_(std::move(other.region_)),
rpc_retry_times_(other.rpc_retry_times_),
status_(std::move(other.status_)),
call_back_(std::move(other.call_back_)) {}

StoreRpcController::~StoreRpcController() = default;

Expand Down
12 changes: 12 additions & 0 deletions src/sdk/rpc/store_rpc_controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
#ifndef DINGODB_SDK_STORE_RPC_CONTROLLER_H_
#define DINGODB_SDK_STORE_RPC_CONTROLLER_H_

#include <memory>

#include "dingosdk/status.h"
#include "proto/error.pb.h"
#include "sdk/client_stub.h"
Expand All @@ -31,6 +33,13 @@ class StoreRpcController {

explicit StoreRpcController(const ClientStub& stub, Rpc& rpc, RegionPtr region);

// Use this constructor when the Rpc object's lifetime must be extended beyond
// the caller's vector (e.g. txn tasks that clear rpcs_ on retry).
explicit StoreRpcController(const ClientStub& stub, std::shared_ptr<Rpc> rpc, RegionPtr region);

StoreRpcController(const StoreRpcController& other);
StoreRpcController(StoreRpcController&& other) noexcept;

virtual ~StoreRpcController();

// TODO: to remove
Expand Down Expand Up @@ -79,6 +88,9 @@ class StoreRpcController {
static const pb::error::Error& GetResponseError(Rpc& rpc);

const ClientStub& stub_;
// rpc_shared_ must be declared before rpc_ so it is constructed first;
// the shared_ptr constructor binds rpc_ to *rpc_shared_.
std::shared_ptr<Rpc> rpc_shared_;
Rpc& rpc_;
RegionPtr region_;
int rpc_retry_times_;
Expand Down
9 changes: 5 additions & 4 deletions src/sdk/transaction/txn_task/txn_batch_get_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ void TxnBatchGetTask::DoAsync() {
region_id_to_keys[tmp->RegionId()].push_back(key);
}

controllers_.reserve(region_id_to_keys.size());
rpcs_.reserve(region_id_to_keys.size());
for (const auto& entry : region_id_to_keys) {
auto region_id = entry.first;
auto iter = region_id_to_region.find(region_id);
Expand All @@ -95,7 +97,7 @@ void TxnBatchGetTask::DoAsync() {
resolved_lock = region_id_to_resolved_lock_[region_id];
}

auto rpc = std::make_unique<TxnBatchGetRpc>();
auto rpc = std::make_shared<TxnBatchGetRpc>();
rpc->MutableRequest()->Clear();
rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs());
rpc->SetTxnId(txn_impl_->GetStartTs());
Expand All @@ -104,9 +106,8 @@ void TxnBatchGetTask::DoAsync() {
for (const auto& key : entry.second) {
*rpc->MutableRequest()->add_keys() = key;
}
StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(std::move(controller));
rpcs_.push_back(std::move(rpc));
controllers_.emplace_back(stub, rpc, region);
rpcs_.push_back(rpc);
}
CHECK(rpcs_.size() == controllers_.size());
sub_tasks_count_.store(rpcs_.size());
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/transaction/txn_task/txn_batch_get_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class TxnBatchGetTask : public TxnTask {
std::unordered_map<int64_t, uint64_t> region_id_to_resolved_lock_;

std::vector<StoreRpcController> controllers_;
std::vector<std::unique_ptr<TxnBatchGetRpc>> rpcs_;
std::vector<std::shared_ptr<TxnBatchGetRpc>> rpcs_;
bool need_retry_{false};

RWLock rw_lock_;
Expand Down
9 changes: 5 additions & 4 deletions src/sdk/transaction/txn_task/txn_batch_rollback_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,15 @@ void TxnBatchRollbackTask::DoAsync() {
region_id_to_keys[tmp->RegionId()].push_back(key);
}

controllers_.reserve(region_id_to_keys.size());
rpcs_.reserve(region_id_to_keys.size());
for (const auto& entry : region_id_to_keys) {
auto region_id = entry.first;
auto iter = region_id_to_region.find(region_id);
CHECK(iter != region_id_to_region.end());
auto region = iter->second;

auto rpc = std::make_unique<TxnBatchRollbackRpc>();
auto rpc = std::make_shared<TxnBatchRollbackRpc>();
rpc->MutableRequest()->Clear();
rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs());
rpc->SetTxnId(txn_impl_->GetStartTs());
Expand All @@ -100,9 +102,8 @@ void TxnBatchRollbackTask::DoAsync() {
auto* fill = rpc->MutableRequest()->add_keys();
*fill = key;
}
StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(std::move(controller));
rpcs_.push_back(std::move(rpc));
controllers_.emplace_back(stub, rpc, region);
rpcs_.push_back(rpc);
}

CHECK(rpcs_.size() == controllers_.size());
Expand Down
2 changes: 1 addition & 1 deletion src/sdk/transaction/txn_task/txn_batch_rollback_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class TxnBatchRollbackTask : public TxnTask {
std::shared_ptr<TxnImpl> txn_impl_;

std::vector<StoreRpcController> controllers_;
std::vector<std::unique_ptr<TxnBatchRollbackRpc>> rpcs_;
std::vector<std::shared_ptr<TxnBatchRollbackRpc>> rpcs_;
std::set<std::string_view> next_keys_;
std::atomic<int> sub_tasks_count_{0};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ void TxnCheckSecondaryLocksTask::DoAsync() {
region_id_to_keys[tmp->RegionId()].push_back(key);
}

controllers_.reserve(region_id_to_keys.size());
rpcs_.reserve(region_id_to_keys.size());
for (const auto& entry : region_id_to_keys) {
auto region_id = entry.first;
auto iter = region_id_to_region.find(region_id);
Expand All @@ -94,7 +96,7 @@ void TxnCheckSecondaryLocksTask::DoAsync() {

uint64_t resolved_lock = 0;

auto rpc = std::make_unique<TxnCheckSecondaryLocksRpc>();
auto rpc = std::make_shared<TxnCheckSecondaryLocksRpc>();
rpc->MutableRequest()->Clear();
rpc->MutableRequest()->set_start_ts(primary_lock_start_ts_);
rpc->SetTxnId(caller_start_ts_);
Expand All @@ -103,9 +105,8 @@ void TxnCheckSecondaryLocksTask::DoAsync() {
for (const auto& key : entry.second) {
*rpc->MutableRequest()->add_keys() = key;
}
StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(std::move(controller));
rpcs_.push_back(std::move(rpc));
controllers_.emplace_back(stub, rpc, region);
rpcs_.push_back(rpc);
}
CHECK(rpcs_.size() == controllers_.size());
sub_tasks_count_.store(rpcs_.size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TxnCheckSecondaryLocksTask : public TxnTask {
int64_t caller_start_ts_{0};

std::vector<StoreRpcController> controllers_;
std::vector<std::unique_ptr<TxnCheckSecondaryLocksRpc>> rpcs_;
std::vector<std::shared_ptr<TxnCheckSecondaryLocksRpc>> rpcs_;
RWLock rw_lock_;
Status status_;
std::atomic<int> sub_tasks_count_{0};
Expand Down
16 changes: 8 additions & 8 deletions src/sdk/transaction/txn_task/txn_commit_task.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ void TxnCommitTask::DoAsync() {
region_id_to_keys[tmp->RegionId()].push_back(key);
}

controllers_.reserve(next_keys_.size());
rpcs_.reserve(next_keys_.size());
for (const auto& entry : region_id_to_keys) {
auto region_id = entry.first;
auto iter = region_id_to_region.find(region_id);
CHECK(iter != region_id_to_region.end());
auto region = iter->second;

auto rpc = std::make_unique<TxnCommitRpc>();
auto rpc = std::make_shared<TxnCommitRpc>();
rpc->MutableRequest()->Clear();
rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs());
rpc->SetTxnId(txn_impl_->GetStartTs());
Expand All @@ -106,12 +108,11 @@ void TxnCommitTask::DoAsync() {
if (rpc->MutableRequest()->keys_size() == FLAGS_txn_max_batch_count) {
DINGO_LOG(INFO) << fmt::format("[sdk.txn.{}] commit key, region({}) keys({}) equal max batch count.",
txn_impl_->ID(), region->RegionId(), rpc->MutableRequest()->keys_size());
StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(std::move(controller));
rpcs_.push_back(std::move(rpc));
controllers_.emplace_back(stub, rpc, region);
rpcs_.push_back(rpc);

// reset rpc
rpc = std::make_unique<TxnCommitRpc>();
rpc = std::make_shared<TxnCommitRpc>();
rpc->MutableRequest()->set_start_ts(txn_impl_->GetStartTs());
rpc->SetTxnId(txn_impl_->GetStartTs());
rpc->MutableRequest()->set_commit_ts(txn_impl_->GetCommitTs());
Expand All @@ -122,9 +123,8 @@ void TxnCommitTask::DoAsync() {
*fill = key;
}

StoreRpcController controller(stub, *rpc, region);
controllers_.push_back(std::move(controller));
rpcs_.push_back(std::move(rpc));
controllers_.emplace_back(stub, rpc, region);
rpcs_.push_back(rpc);
}

CHECK(rpcs_.size() == controllers_.size());
Expand Down
Loading
Loading