From 9e3b2afff2c6b4d219e1f0b08ddbd907e0173920 Mon Sep 17 00:00:00 2001 From: randomkang <550941794@qq.com> Date: Wed, 3 Jun 2026 00:36:19 +0800 Subject: [PATCH] support gpu direct rdma 1) recv all data on gpu first 2) the gpu block is alloced from a gpu block pool 3) brpc header, meta and body will be copied from gpu to cpu to process. 4) To decrease the d2h counts, we will prefetch 512B to memory Co-authored-by: sunce4t <847480001@qq.com> --- BUILD.bazel | 9 + Makefile | 1 + bazel/config/BUILD.bazel | 8 +- config_brpc.sh | 17 +- docs/cn/gdr.md | 43 ++ docs/en/gdr.md | 44 ++ example/BUILD.bazel | 5 +- example/rdma_performance/client.cpp | 58 ++- example/rdma_performance/server.cpp | 7 + src/brpc/acceptor.h | 2 +- src/brpc/channel.cpp | 2 + src/brpc/gdr_transport.cpp | 35 ++ src/brpc/gdr_transport.h | 32 ++ src/brpc/policy/baidu_rpc_protocol.cpp | 44 +- src/brpc/policy/baidu_rpc_protocol.h | 13 + src/brpc/policy/baidu_rpc_protocol_gpu.cpp | 228 ++++++++++ src/brpc/rdma/rdma_endpoint.cpp | 106 ++++- src/brpc/rdma/rdma_endpoint.h | 24 +- src/brpc/rdma/rdma_helper.cpp | 38 +- src/brpc/rdma/rdma_helper.h | 7 + src/brpc/rdma_transport.cpp | 13 +- src/brpc/rdma_transport.h | 3 +- src/brpc/socket_mode.h | 5 +- src/brpc/transport_factory.cpp | 17 +- src/butil/gpu/gpu_block_pool.cpp | 475 +++++++++++++++++++++ src/butil/gpu/gpu_block_pool.h | 175 ++++++++ src/butil/iobuf.cpp | 102 +++++ src/butil/iobuf.h | 10 + 28 files changed, 1475 insertions(+), 48 deletions(-) create mode 100644 docs/cn/gdr.md create mode 100644 docs/en/gdr.md create mode 100644 src/brpc/gdr_transport.cpp create mode 100644 src/brpc/gdr_transport.h create mode 100644 src/brpc/policy/baidu_rpc_protocol_gpu.cpp create mode 100644 src/butil/gpu/gpu_block_pool.cpp create mode 100644 src/butil/gpu/gpu_block_pool.h diff --git a/BUILD.bazel b/BUILD.bazel index 22cb508548..84f9b64f1d 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -51,6 +51,9 @@ DEFINES = [ }) + select({ "//bazel/config:brpc_with_rdma": ["BRPC_WITH_RDMA=1"], "//conditions:default": [], +}) + select({ + "//bazel/config:brpc_with_gdr": ["-DBRPC_WITH_GDR=1"], + "//conditions:default": [], }) + select({ "//bazel/config:brpc_with_debug_bthread_sche_safety": ["BRPC_DEBUG_BTHREAD_SCHE_SAFETY=1"], "//conditions:default": ["BRPC_DEBUG_BTHREAD_SCHE_SAFETY=0"], @@ -94,6 +97,11 @@ LINKOPTS = [ "-libverbs", ], "//conditions:default": [], +}) + select({ + "//bazel/config:brpc_with_gdr": [ + "-lcuda -lcudart", + ], + "//conditions:default": [], }) + select({ "//bazel/config:brpc_with_asan": ["-fsanitize=address"], "//conditions:default": [], @@ -235,6 +243,7 @@ BUTIL_SRCS = [ "src/butil/iobuf.cpp", "src/butil/single_iobuf.cpp", "src/butil/iobuf_profiler.cpp", + "src/butil/gpu/gpu_block_pool.cpp", "src/butil/binary_printer.cpp", "src/butil/recordio.cc", "src/butil/popen.cpp", diff --git a/Makefile b/Makefile index abe029e360..29390ac6f5 100644 --- a/Makefile +++ b/Makefile @@ -97,6 +97,7 @@ BUTIL_SOURCES = \ src/butil/files/scoped_temp_dir.cc \ src/butil/file_util.cc \ src/butil/file_util_posix.cc \ + src/butil/gpu/gpu_block_pool.cpp \ src/butil/guid.cc \ src/butil/guid_posix.cc \ src/butil/hash.cc \ diff --git a/bazel/config/BUILD.bazel b/bazel/config/BUILD.bazel index eec551da8b..12319f0dbe 100644 --- a/bazel/config/BUILD.bazel +++ b/bazel/config/BUILD.bazel @@ -104,6 +104,12 @@ config_setting( visibility = ["//visibility:public"], ) +config_setting( + name = "brpc_with_gdr", + define_values = {"BRPC_WITH_GDR": "true"}, + visibility = ["//visibility:public"], +) + config_setting( name = "brpc_with_boringssl", define_values = {"BRPC_WITH_BORINGSSL": "true"}, @@ -149,4 +155,4 @@ config_setting( name = "with_babylon_counter", define_values = {"with_babylon_counter": "true"}, visibility = ["//visibility:public"], -) \ No newline at end of file +) diff --git a/config_brpc.sh b/config_brpc.sh index 4526d218a8..de2dfdc74a 100755 --- a/config_brpc.sh +++ b/config_brpc.sh @@ -54,10 +54,11 @@ else LDD=ldd fi -TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-mesalink,with-bthread-tracer,with-debug-bthread-sche-safety,with-debug-lock,with-asan,nodebugsymbols,werror -n 'config_brpc' -- "$@"` +TEMP=`getopt -o v: --long headers:,libs:,cc:,cxx:,with-glog,with-thrift,with-rdma,with-gdr,with-mesalink,with-bthread-tracer,with-debug-bthread-sche-safety,with-debug-lock,with-asan,nodebugsymbols,werror -n 'config_brpc' -- "$@"` WITH_GLOG=0 WITH_THRIFT=0 WITH_RDMA=0 +WITH_GDR=0 WITH_MESALINK=0 WITH_BTHREAD_TRACER=0 WITH_ASAN=0 @@ -87,6 +88,7 @@ while true; do --with-glog ) WITH_GLOG=1; shift 1 ;; --with-thrift) WITH_THRIFT=1; shift 1 ;; --with-rdma) WITH_RDMA=1; shift 1 ;; + --with-gdr) WITH_GDR=1; shift 1 ;; --with-mesalink) WITH_MESALINK=1; shift 1 ;; --with-bthread-tracer) WITH_BTHREAD_TRACER=1; shift 1 ;; --with-debug-bthread-sche-safety ) BRPC_DEBUG_BTHREAD_SCHE_SAFETY=1; shift 1 ;; @@ -532,6 +534,18 @@ if [ $WITH_RDMA != 0 ]; then append_to_output "WITH_RDMA=1" fi +if [ $WITH_GDR != 0 ]; then + CUDA_LIB="/usr/local/cuda/lib64" + CUDA_HDR="/usr/local/cuda/include" + append_to_output_libs "$CUDA_LIB" + append_to_output_headers "$CUDA_HDR" + + CPPFLAGS="${CPPFLAGS} -DBRPC_WITH_GDR" + + append_to_output "DYNAMIC_LINKINGS+=-lcuda -lcudart" + append_to_output "WITH_GDR=1" +fi + if [ $WITH_MESALINK != 0 ]; then CPPFLAGS="${CPPFLAGS} -DUSE_MESALINK" fi @@ -652,6 +666,7 @@ print_info "System: $SYSTEM" if [ $WITH_GLOG -ne 0 ]; then print_info "With glog: yes"; fi if [ $WITH_THRIFT -ne 0 ]; then print_info "With thrift: yes"; fi if [ $WITH_RDMA -ne 0 ]; then print_info "With RDMA: yes"; fi +if [ $WITH_GDR -ne 0 ]; then print_info "With GDR: yes"; fi if [ $WITH_MESALINK -ne 0 ]; then print_info "With MesaLink: yes"; fi if [ $WITH_BTHREAD_TRACER -ne 0 ]; then print_info "With bthread tracer: yes"; fi if [ $WITH_ASAN -ne 0 ]; then print_info "With ASAN: yes"; fi diff --git a/docs/cn/gdr.md b/docs/cn/gdr.md new file mode 100644 index 0000000000..7e46f19d2f --- /dev/null +++ b/docs/cn/gdr.md @@ -0,0 +1,43 @@ +# 编译 + +GDR: GPU Direct Rdma, gdr 是rdma的一种特殊模式,其通过rdma将数据直接收到了gpu的显存上。 + +由于GDR对驱动与硬件有要求,目前仅支持在Linux系统编译并运行GDR功能。 + +目前GDR只支持baidu std protocol。 + +使用config_brpc: +```bash +sh config_brpc.sh --with-rdma --with-gdr --headers="/usr/include" --libs="/usr/lib64 /usr/bin" +make + +cd example/rdma_performance # 示例程序 +make +``` + +使用bazel: +```bash +# Server +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_server +# Client +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_client +``` + +# 基本实现 + +GDR是RDMA的一种特殊形式,在使用GDR之前,必须对RDMA和GDR都进行Global Init。 +GDR新增了一个显存池,类似于RDMA内存池,显存池的数据也是按照block进行组织的。 +当打开GDR功能后,框架通过DoPostRecvGDR来发起显存上的WQE。 +在接收到数据后,我们将header、meta、body(不包括attachment)copy回内存进行处理。 +AttachMent位于显存上,用户可以调用IOBuf::copy_from_gpu接口将attachment从brpc框架层copy到应用层进行处理。 + + +注意: +1. 在使用gdr功能时,需要将环境变量MLX5_SCATTER_TO_CQE设置为0. + + +# 参数 + +可配置参数说明: +* gdr_block_size_kb: 使用gdr传送数据时,block的大小(单位为KB),默认为512; +* max_gdr_regions: gdr显存池所使用Region的最大个数,每个Region大小为1GB; diff --git a/docs/en/gdr.md b/docs/en/gdr.md new file mode 100644 index 0000000000..2e968f3e69 --- /dev/null +++ b/docs/en/gdr.md @@ -0,0 +1,44 @@ +Compile GDR: + +GPU Direct RDMA. GDR is a special mode of RDMA that allows data to be received directly into the GPU’s memory through RDMA. +Because GDR requires specific drivers and hardware support, it is currently only available for compilation and execution on Linux systems. +At present, GDR only supports the Baidu STD protocol. + +To use config_brpc: + +sh config_brpc.sh --with-rdma --with-gdr --headers="/usr/include" --libs="/usr/lib64 /usr/bin" +make +cd example/rdma_performance # Example program +make + +To use Bazel: + +# Server +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_server + +# Client +bazel build --define=BRPC_WITH_RDMA=true --define=BRPC_WITH_GDR=true example:rdma_performance_client + + +Basic Implementation: + +GDR is a special form of RDMA. Before using GDR, both RDMA and GDR must be globally initialized. + +GDR introduces a GPU memory pool, similar to the RDMA memory pool. Data in the GPU memory pool is also organized in blocks. + +When GDR is enabled, the framework initiates WQEs on GPU memory through DoPostRecvGDR. + +After receiving data, the header, meta, and body (excluding attachments) are copied back to host memory for processing. +Attachments remain in GPU memory, and users can call IOBuf::copy_from_gpu to copy attachments from the brpc framework layer to the application layer. + +Note: + +When using GDR, the environment variable MLX5_SCATTER_TO_CQE must be set to 0. + +Parameters + +Configurable parameters: + +gdr_block_size_kb: The block size (in KB) used when transferring data via GDR. Default is 512. + +max_gdr_regions: The maximum number of regions used by the GDR GPU memory pool. Each region is 1 GB. diff --git a/example/BUILD.bazel b/example/BUILD.bazel index 4ee7cb140f..098d283973 100644 --- a/example/BUILD.bazel +++ b/example/BUILD.bazel @@ -34,6 +34,9 @@ COPTS = [ }) + select({ "//bazel/config:brpc_with_rdma": ["-DBRPC_WITH_RDMA=1"], "//conditions:default": [""], +}) + select({ + "//bazel/config:brpc_with_gdr": ["-DBRPC_WITH_GDR=1"], + "//conditions:default": [""], }) brpc_proto_library( @@ -119,4 +122,4 @@ cc_binary( deps = [ "//:brpc", ], -) \ No newline at end of file +) diff --git a/example/rdma_performance/client.cpp b/example/rdma_performance/client.cpp index 2e8acc4051..40944d65a1 100644 --- a/example/rdma_performance/client.cpp +++ b/example/rdma_performance/client.cpp @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. +#ifdef BRPC_WITH_GDR +#include +#include +#endif #include #include #include @@ -42,6 +46,7 @@ DEFINE_string(connection_type, "single", "Connection type of the channel"); DEFINE_string(protocol, "baidu_std", "Protocol type."); DEFINE_string(servers, "0.0.0.0:8002+0.0.0.0:8002", "IP Address of servers"); DEFINE_bool(use_rdma, true, "Use RDMA or not"); +DEFINE_bool(use_gdr, false, "Use GDR or not"); DEFINE_int32(rpc_timeout_ms, 2000, "RPC call timeout"); DEFINE_int32(test_seconds, 20, "Test running time"); DEFINE_int32(test_iterations, 0, "Test iterations"); @@ -84,16 +89,46 @@ class PerformanceTest { , _stop(false) { if (attachment_size > 0) { - _addr = malloc(attachment_size); - butil::fast_rand_bytes(_addr, attachment_size); - _attachment.append(_addr, attachment_size); +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + int gpu_id = 0; + cudaSetDevice(gpu_id); + cudaMalloc(&_addr, attachment_size); + auto pd = brpc::rdma::GetRdmaPd(); + mr = ibv_reg_mr(pd, _addr, attachment_size, + IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE); + if (!mr) { + LOG(FATAL) << "Failed to register MR:" << strerror(errno) + << ", addr:" << _addr; + } + auto deleter = [](void* data) {}; + _attachment.append_user_data_with_meta(_addr, attachment_size, deleter, mr->lkey); + } + else +#endif + { + _addr = malloc(attachment_size); + butil::fast_rand_bytes(_addr, attachment_size); + _attachment.append(_addr, attachment_size); + } } _echo_attachment = echo_attachment; } ~PerformanceTest() { if (_addr) { - free(_addr); +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + ibv_dereg_mr(mr); + cudaFree(_addr); + } + else +#endif + { + free(_addr); + } } delete _channel; } @@ -103,6 +138,11 @@ class PerformanceTest { int Init() { brpc::ChannelOptions options; options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP; +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + options.socket_mode = brpc::SOCKET_MODE_GDR; + } +#endif options.protocol = FLAGS_protocol; options.connection_type = FLAGS_connection_type; options.timeout_ms = FLAGS_rpc_timeout_ms; @@ -203,6 +243,9 @@ class PerformanceTest { } private: +#ifdef BRPC_WITH_GDR + ibv_mr* mr; +#endif void* _addr; brpc::Channel* _channel; uint64_t _start_time; @@ -223,6 +266,7 @@ void Test(int thread_num, int attachment_size) { << ", Depth: " << FLAGS_queue_depth << ", Attachment: " << attachment_size << "B" << ", RDMA: " << (FLAGS_use_rdma ? "yes" : "no") + << ", GDR: " << (FLAGS_use_gdr ? "yes" : "no") << ", Echo: " << (FLAGS_echo_attachment ? "yes]" : "no]") << std::endl; g_total_bytes.store(0, butil::memory_order_relaxed); @@ -278,6 +322,12 @@ int main(int argc, char* argv[]) { if (FLAGS_use_rdma) { brpc::rdma::GlobalRdmaInitializeOrDie(); } +#ifdef BRPC_WITH_GDR + else if (FLAGS_use_gdr) { + brpc::rdma::GlobalRdmaInitializeOrDie(); + brpc::rdma::GlobalGdrInitializeOrDie(); + } +#endif brpc::StartDummyServerAt(FLAGS_dummy_port); diff --git a/example/rdma_performance/server.cpp b/example/rdma_performance/server.cpp index 2e93e1eec7..eca2641513 100644 --- a/example/rdma_performance/server.cpp +++ b/example/rdma_performance/server.cpp @@ -28,6 +28,7 @@ DEFINE_int32(port, 8002, "TCP Port of this server"); DEFINE_bool(use_rdma, true, "Use RDMA or not"); +DEFINE_bool(use_gdr, false, "Use GDR or not"); butil::atomic g_last_time(0); @@ -77,6 +78,12 @@ int main(int argc, char* argv[]) { brpc::ServerOptions options; options.socket_mode = FLAGS_use_rdma? brpc::SOCKET_MODE_RDMA : brpc::SOCKET_MODE_TCP; +#ifdef BRPC_WITH_GDR + if (FLAGS_use_gdr) { + options.socket_mode = brpc::SOCKET_MODE_GDR; + } +#endif + if (server.Start(FLAGS_port, &options) != 0) { LOG(ERROR) << "Fail to start EchoServer"; return -1; diff --git a/src/brpc/acceptor.h b/src/brpc/acceptor.h index 77942beca2..f28d3f5bce 100644 --- a/src/brpc/acceptor.h +++ b/src/brpc/acceptor.h @@ -111,7 +111,7 @@ friend class Server; bool _force_ssl; std::shared_ptr _ssl_ctx; - // Choose to use a certain socket: 0 TCP, 1 RDMA + // Choose to use a certain socket: 0 TCP, 1 RDMA, 2 GDR SocketMode _socket_mode; // Acceptor belongs to this tag diff --git a/src/brpc/channel.cpp b/src/brpc/channel.cpp index a8caeaf953..08f1445a58 100644 --- a/src/brpc/channel.cpp +++ b/src/brpc/channel.cpp @@ -134,6 +134,8 @@ static ChannelSignature ComputeChannelSignature(const ChannelOptions& opt) { } if (opt.socket_mode == SOCKET_MODE_RDMA) { buf.append("|rdma"); + } else if (opt.socket_mode == SOCKET_MODE_GDR) { + buf.append("|gdr"); } butil::MurmurHash3_x64_128_Update(&mm_ctx, buf.data(), buf.size()); buf.clear(); diff --git a/src/brpc/gdr_transport.cpp b/src/brpc/gdr_transport.cpp new file mode 100644 index 0000000000..005b191380 --- /dev/null +++ b/src/brpc/gdr_transport.cpp @@ -0,0 +1,35 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include "brpc/gdr_transport.h" +#include "brpc/rdma/rdma_helper.h" + +namespace brpc { + +void GdrTransport::Init(Socket *socket, const SocketOptions &options) { + DoInit(socket, options, true); +} + +int GdrTransport::GdrContextInitOrDie() { + rdma::GlobalGdrInitializeOrDie(); + return 0; +} + +} // namespace brpc +#endif diff --git a/src/brpc/gdr_transport.h b/src/brpc/gdr_transport.h new file mode 100644 index 0000000000..0f41c0c4db --- /dev/null +++ b/src/brpc/gdr_transport.h @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BRPC_GDR_TRANSPORT_H +#define BRPC_GDR_TRANSPORT_H + +#if BRPC_WITH_GDR +#include "brpc/rdma_transport.h" + +namespace brpc { +class GdrTransport : public RdmaTransport { +public: + void Init(Socket* socket, const SocketOptions& options) override; + static int GdrContextInitOrDie(); +}; +} // namespace brpc +#endif // BRPC_WITH_GDR +#endif //BRPC_GDR_TRANSPORT_H diff --git a/src/brpc/policy/baidu_rpc_protocol.cpp b/src/brpc/policy/baidu_rpc_protocol.cpp index 2c5a7e7224..56b694726d 100644 --- a/src/brpc/policy/baidu_rpc_protocol.cpp +++ b/src/brpc/policy/baidu_rpc_protocol.cpp @@ -104,6 +104,12 @@ static void SerializeRpcHeaderAndMeta( ParseResult ParseRpcMessage(butil::IOBuf* source, Socket* socket, bool /*read_eof*/, const void*) { +#if BRPC_WITH_GDR + bool is_gpu_memory = source->is_gpu_memory(); + if (is_gpu_memory) { + return ParseRpcMessageGpu(source, socket, false /* not use */, nullptr /* not use */); + } +#endif // BRPC_WITH_GDR char header_buf[12]; const size_t n = source->copy_to(header_buf, sizeof(header_buf)); if (n >= 4) { @@ -796,7 +802,15 @@ void ProcessRpcRequest(InputMessageBase* msg_base) { butil::IOBuf req_buf; int body_without_attachment_size = req_size - meta.attachment_size(); - msg->payload.cutn(&req_buf, body_without_attachment_size); +#if BRPC_WITH_GDR + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (is_gpu_memory) { + FillReqBufGpu(&req_buf, msg.get(), body_without_attachment_size); + } else +#endif // BRPC_WITH_GDR + { + msg->payload.cutn(&req_buf, body_without_attachment_size); + } if (meta.attachment_size() > 0) { cntl->request_attachment().swap(msg->payload); } @@ -968,17 +982,25 @@ void ProcessRpcResponse(InputMessageBase* msg_base) { butil::IOBuf res_buf; const int res_size = msg->payload.length(); butil::IOBuf* res_buf_ptr = &msg->payload; - if (meta.has_attachment_size()) { - if (meta.attachment_size() > res_size) { - cntl->SetFailed( - ERESPONSE, "attachment_size=%d is larger than response_size=%d", - meta.attachment_size(), res_size); - break; +#if BRPC_WITH_GDR + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (is_gpu_memory) { + FillResBufGpu(&res_buf, msg.get(), meta, &res_buf_ptr, cntl); + } else +#endif // BRPC_WITH_GDR + { + if (meta.has_attachment_size()) { + if (meta.attachment_size() > res_size) { + cntl->SetFailed( + ERESPONSE, "attachment_size=%d is larger than response_size=%d", + meta.attachment_size(), res_size); + break; + } + int body_without_attachment_size = res_size - meta.attachment_size(); + msg->payload.cutn(&res_buf, body_without_attachment_size); + res_buf_ptr = &res_buf; + cntl->response_attachment().swap(msg->payload); } - int body_without_attachment_size = res_size - meta.attachment_size(); - msg->payload.cutn(&res_buf, body_without_attachment_size); - res_buf_ptr = &res_buf; - cntl->response_attachment().swap(msg->payload); } ContentType content_type = meta.content_type(); diff --git a/src/brpc/policy/baidu_rpc_protocol.h b/src/brpc/policy/baidu_rpc_protocol.h index 77ecc780a2..6a3c379142 100644 --- a/src/brpc/policy/baidu_rpc_protocol.h +++ b/src/brpc/policy/baidu_rpc_protocol.h @@ -19,6 +19,8 @@ #ifndef BRPC_POLICY_BRPC_PROTOCOL_H #define BRPC_POLICY_BRPC_PROTOCOL_H +#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta +#include "brpc/policy/most_common_message.h" #include "brpc/protocol.h" namespace brpc { @@ -53,6 +55,17 @@ void PackRpcRequest(butil::IOBuf* buf, // Returns the `name' of the 'content_type'. const char* ContentTypeToCStr(ContentType content_type); +#if BRPC_WITH_GDR +// Parse binary format of baidu_std +ParseResult ParseRpcMessageGpu(butil::IOBuf* source, Socket *socket, bool read_eof, + const void *arg); + +void FillReqBufGpu(butil::IOBuf* req_buf, MostCommonMessage* msg, int body_without_attachment_size); + +void FillResBufGpu(butil::IOBuf* res_buf, MostCommonMessage* msg, const RpcMeta& meta, + butil::IOBuf** res_buf_ptr, Controller* cntl); + +#endif } // namespace policy } // namespace brpc diff --git a/src/brpc/policy/baidu_rpc_protocol_gpu.cpp b/src/brpc/policy/baidu_rpc_protocol_gpu.cpp new file mode 100644 index 0000000000..392cdbbff0 --- /dev/null +++ b/src/brpc/policy/baidu_rpc_protocol_gpu.cpp @@ -0,0 +1,228 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include // MethodDescriptor +#include // Message +#include +#include +#include + +#include "butil/gpu/gpu_block_pool.h" +#include "butil/iobuf.h" // butil::IOBuf +#include "butil/logging.h" // LOG() +#include "butil/raw_pack.h" // RawPacker RawUnpacker +#include "butil/memory/scope_guard.h" +#include "butil/raw_pack.h" // RawPacker RawUnpacker +#include "butil/strings/string_util.h" + +#include "json2pb/json_to_pb.h" +#include "json2pb/pb_to_json.h" +#include "brpc/controller.h" // Controller +#include "brpc/socket.h" // Socket +#include "brpc/server.h" // Server +#include "brpc/span.h" +#include "brpc/compress.h" // ParseFromCompressedData +#include "brpc/checksum.h" +#include "brpc/stream_impl.h" +#include "brpc/rpc_dump.h" // SampledRequest +#include "brpc/rpc_pb_message_factory.h" +#include "brpc/policy/baidu_rpc_meta.pb.h" // RpcRequestMeta +#include "brpc/policy/baidu_rpc_protocol.h" +#include "brpc/policy/most_common_message.h" +#include "brpc/policy/streaming_rpc_protocol.h" +#include "brpc/details/usercode_backup_pool.h" +#include "brpc/details/controller_private_accessor.h" +#include "brpc/details/server_private_accessor.h" + +namespace brpc { +namespace policy { + +// Notes: +// 1. 12-byte header [PRPC][body_size][meta_size] +// 2. body_size and meta_size are in network byte order +// 3. Use service->full_name() + method_name to specify the method to call +// 4. `attachment_size' is set iff request/response has attachment +// 5. Not supported: chunk_info + +// Pack header into `buf' + +const int header_size = 12; +// if we recv data into gpu, the header/meta/body will be copied to cpu and processed. +// in to to limit the count of d2h, we will prefetch 512B from gpu to cpu. +// if header_size + meta_size + body_size(without attachment) is less than 512, then one +// d2h is enough for one rpc. + +const int prefetch_d2h_size = 512; + +ParseResult ParseRpcMessageGpu(butil::IOBuf* source, Socket* socket, + bool /*read_eof*/, const void*) { + + char header_buf[12]; + size_t n = 0; + uint32_t body_size; + uint32_t meta_size; + ParseError pe = PARSE_OK; + + void* prefetch_d2h_data = NULL; + bool is_gpu_memory = source->is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "RpcMessage is not in gpu!!!"; + } + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + prefetch_d2h_data = host_allocator->AllocateRaw(prefetch_d2h_size); + if (prefetch_d2h_data == NULL) { + LOG(FATAL) << "alloc host data failed!!!"; + } + + // n is the bytes we real frefetch, n maybe less than prefetch_d2h_size; + n = source->copy_from_gpu(prefetch_d2h_data, prefetch_d2h_size); + size_t copy_size = n > 12 ? 12 : n; + memcpy(header_buf, prefetch_d2h_data, copy_size); + + do { + if (n >= 4) { + void* dummy = header_buf; + if (*(const uint32_t*)dummy != *(const uint32_t*)"PRPC") { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } else { + if (memcmp(header_buf, "PRPC", n) != 0) { + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } + if (n < sizeof(header_buf)) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + butil::RawUnpacker(header_buf + 4).unpack32(body_size).unpack32(meta_size); + if (body_size > FLAGS_max_body_size) { + // We need this log to report the body_size to give users some clues + // which is not printed in InputMessenger. + LOG(ERROR) << "body_size=" << body_size << " from " + << socket->remote_side() << " is too large"; + pe = PARSE_ERROR_TOO_BIG_DATA; + break; + } else if (source->length() < sizeof(header_buf) + body_size) { + pe = PARSE_ERROR_NOT_ENOUGH_DATA; + break; + } + if (meta_size > body_size) { + LOG(ERROR) << "meta_size=" << meta_size << " is bigger than body_size=" + << body_size; + // Pop the message + source->pop_front(sizeof(header_buf) + body_size); + pe = PARSE_ERROR_TRY_OTHERS; + break; + } + } while (0); + + if (pe != PARSE_OK) { + host_allocator->DeallocateRaw(prefetch_d2h_data); + return MakeParseError(pe); + } + + source->pop_front(sizeof(header_buf)); + MostCommonMessage* msg = MostCommonMessage::Get(); + + if (header_size + meta_size <= n) { + auto deleter = [host_allocator, prefetch_d2h_data](void* data) { host_allocator->DeallocateRaw(prefetch_d2h_data); }; + // n is the bytes we real frefetch. We set n as the meta and n will be used in ProcessRpcRequest/ProcessRpcResponse. + // This is a trick, we should keep n in another better way. + msg->meta.append_user_data_with_meta((char*)prefetch_d2h_data + header_size, meta_size, deleter, n); + source->pop_front(meta_size); + } else { + host_allocator->DeallocateRaw(prefetch_d2h_data); + source->cutn_from_gpu(&msg->meta, meta_size); + } + source->cutn(&msg->payload, body_size - meta_size); + return MakeMessage(msg); +} + + +void FillReqBufGpu(butil::IOBuf* req_buf, MostCommonMessage* msg, int body_without_attachment_size) { + int meta_size = msg->meta.size(); + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "message is not on gpu!!!"; + } + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + req_buf->append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(req_buf, body_without_attachment_size); + } +} + +void FillResBufGpu(butil::IOBuf* res_buf, MostCommonMessage* msg, const RpcMeta& meta, + butil::IOBuf** res_buf_ptr, Controller* cntl) { + const int res_size = msg->payload.length(); + int meta_size = msg->meta.size(); + bool is_gpu_memory = msg->payload.is_gpu_memory(); + if (!is_gpu_memory) { + LOG(FATAL) << "message is not on gpu!!!"; + } + if (meta.has_attachment_size()) { + if (meta.attachment_size() > res_size) { + cntl->SetFailed( + ERESPONSE, "attachment_size=%d is larger than response_size=%d", + meta.attachment_size(), res_size); + return; + } + int body_without_attachment_size = res_size - meta.attachment_size(); + + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + body_without_attachment_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf->append((char*)data + meta_size, body_without_attachment_size); + msg->payload.pop_front(body_without_attachment_size); + } else { + msg->payload.cutn_from_gpu(res_buf, body_without_attachment_size); + } + *res_buf_ptr = res_buf; + cntl->response_attachment().swap(msg->payload); + } else { + int64_t real_prefetch_d2h_size = msg->meta.get_first_data_meta(); + if (header_size + meta_size + res_size <= real_prefetch_d2h_size) { + void* data = msg->meta.get_first_data_ptr(); + if (data == nullptr) { + LOG(FATAL) << "illegal data!!!"; + } + res_buf->append((char*)data + meta_size, res_size); + msg->payload.pop_front(res_size); + } else { + msg->payload.cutn_from_gpu(res_buf, res_size); + } + *res_buf_ptr = res_buf; + } +} + +} // namespace policy +} // namespace brpc + +#endif diff --git a/src/brpc/rdma/rdma_endpoint.cpp b/src/brpc/rdma/rdma_endpoint.cpp index f09d723ca1..1eb576d7fa 100644 --- a/src/brpc/rdma/rdma_endpoint.cpp +++ b/src/brpc/rdma/rdma_endpoint.cpp @@ -19,6 +19,9 @@ #include #include "butil/fd_utility.h" +#if BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif #include "butil/logging.h" // CHECK, LOG #include "butil/sys_byteorder.h" // HostToNet,NetToHost #include "bthread/bthread.h" @@ -57,7 +60,7 @@ DEFINE_bool(rdma_recv_zerocopy, true, "Enable zerocopy for receive side"); DEFINE_int32(rdma_zerocopy_min_size, 512, "The minimal size for receive zerocopy"); DEFINE_int32(rdma_cqe_poll_once, 32, "The maximum of cqe number polled once."); DEFINE_int32(rdma_prepared_qp_size, 128, "SQ and RQ size for prepared QP."); -DEFINE_int32(rdma_prepared_qp_cnt, 1024, "Initial count of prepared QP."); +DEFINE_int32(rdma_prepared_qp_cnt, 256, "Initial count of prepared QP."); DEFINE_bool(rdma_trace_verbose, false, "Print log message verbosely"); BRPC_VALIDATE_GFLAG(rdma_trace_verbose, brpc::PassValidate); DEFINE_bool(rdma_use_polling, false, "Use polling mode for RDMA."); @@ -90,6 +93,7 @@ static uint16_t g_rdma_hello_msg_len = 40; // In Byte static uint16_t g_rdma_hello_version = 2; static uint16_t g_rdma_impl_version = 1; static uint32_t g_rdma_recv_block_size = 0; +static uint32_t g_gdr_recv_block_size = 0; // static const uint32_t MAX_INLINE_DATA = 64; static const uint8_t MAX_HOP_LIMIT = 16; @@ -166,8 +170,9 @@ RdmaResource::~RdmaResource() { } } -RdmaEndpoint::RdmaEndpoint(Socket* s) +RdmaEndpoint::RdmaEndpoint(Socket* s, bool use_gdr) : _socket(s) + , _use_gdr(use_gdr) , _state(UNINIT) , _resource(NULL) , _send_cq_events(0) @@ -450,7 +455,7 @@ void* RdmaEndpoint::ProcessHandshakeAtClient(void* arg) { local_msg.msg_len = g_rdma_hello_msg_len; local_msg.hello_ver = g_rdma_hello_version; local_msg.impl_ver = g_rdma_impl_version; - local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.lid = GetRdmaLid(); @@ -668,7 +673,7 @@ void* RdmaEndpoint::ProcessHandshakeAtServer(void* arg) { } else { local_msg.lid = GetRdmaLid(); local_msg.gid = GetRdmaGid(); - local_msg.block_size = g_rdma_recv_block_size; + local_msg.block_size = ep->use_gdr() ? g_gdr_recv_block_size : g_rdma_recv_block_size; local_msg.sq_size = ep->_sq_size; local_msg.rq_size = ep->_rq_size; local_msg.hello_ver = g_rdma_hello_version; @@ -1001,8 +1006,15 @@ ssize_t RdmaEndpoint::HandleCompletion(ibv_wc& wc) { case IBV_WC_RECV: { // recv completion // Please note that only the first wc.byte_len bytes is valid if (wc.byte_len > 0) { - if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { - zerocopy = false; +#if BRPC_WITH_GDR + if (_use_gdr) { + zerocopy = true; + } else +#endif // BRPC_WITH_GDR + { + if (wc.byte_len < (uint32_t)FLAGS_rdma_zerocopy_min_size) { + zerocopy = false; + } } CHECK(_state != FALLBACK_TCP); if (zerocopy) { @@ -1063,26 +1075,76 @@ int RdmaEndpoint::DoPostRecv(void* block, size_t block_size) { return 0; } +int RdmaEndpoint::DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey) { + ibv_recv_wr wr; + memset(&wr, 0, sizeof(wr)); + ibv_sge sge; + sge.addr = (uint64_t)block; + sge.length = block_size; + sge.lkey = lkey; + wr.num_sge = 1; + wr.sg_list = &sge; + //LOG(INFO) << "POST recv: addr=0x" << std::hex << sge.addr + // << std::dec << " length=0x" << sge.length + // << " lkey=0x" << sge.lkey; + //LOG(INFO) << block << " " << _device_allocator->get_lkey(); + ibv_recv_wr* bad = NULL; + int err = ibv_post_recv(_resource->qp, &wr, &bad); + if (err != 0) { + LOG(WARNING) << "Fail to ibv_post_recv: " << berror(err); + return -1; + } + return 0; +} + int RdmaEndpoint::PostRecv(uint32_t num, bool zerocopy) { // We do the post repeatedly from the _rbuf[_rq_received]. while (num > 0) { +#if BRPC_WITH_GDR + uint32_t lkey = 0; +#endif // if BRPC_WITH_GDR if (zerocopy) { _rbuf[_rq_received].clear(); - butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], - g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); - int size = 0; - if (!os.Next(&_rbuf_data[_rq_received], &size)) { - // Memory is not enough for preparing a block - PLOG(WARNING) << "Fail to allocate rbuf"; - return -1; - } else { - CHECK(static_cast(size) == g_rdma_recv_block_size) << size; + +#if BRPC_WITH_GDR + if (_use_gdr) { + butil::gdr::BlockPoolAllocator* device_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_allocator(); + void* device_ptr = device_allocator->AllocateRaw(g_gdr_recv_block_size); + auto deleter = [device_allocator](void* data) { device_allocator->DeallocateRaw(data); }; + lkey = device_allocator->get_lkey(device_ptr); + // we keep lkey into the meta, and this is a thick. we also keep prefetch d2h size in meta too. + _rbuf[_rq_received].append_user_data_with_meta(device_ptr, g_gdr_recv_block_size, deleter, lkey); + _rbuf_data[_rq_received] = device_ptr; + } else +#endif // if BRPC_WITH_GDR + { + butil::IOBufAsZeroCopyOutputStream os(&_rbuf[_rq_received], + g_rdma_recv_block_size + IOBUF_BLOCK_HEADER_LEN); + int size = 0; + if (!os.Next(&_rbuf_data[_rq_received], &size)) { + // Memory is not enough for preparing a block + PLOG(WARNING) << "Fail to allocate rbuf"; + return -1; + } else { + CHECK(static_cast(size) == g_rdma_recv_block_size) << size; + } } } - if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { - _rbuf[_rq_received].clear(); - return -1; +#if BRPC_WITH_GDR + if (_use_gdr) { + if (DoPostRecvGDR(_rbuf_data[_rq_received], g_gdr_recv_block_size, lkey) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } + } else +#endif // if BRPC_WITH_GDR + { + if (DoPostRecv(_rbuf_data[_rq_received], g_rdma_recv_block_size) < 0) { + _rbuf[_rq_received].clear(); + return -1; + } } + --num; ++_rq_received; if (_rq_received == _rq_size) { @@ -1659,6 +1721,14 @@ void RdmaEndpoint::DebugInfo(std::ostream& os, butil::StringPiece connector) con << connector << "rdma_unsignaled_sq_wr=" << _sq_unsignaled; } +int RdmaEndpoint::GlobalGdrInitialize() { +#if BRPC_WITH_GDR + g_gdr_recv_block_size = butil::gdr::GetGdrBlockSize() - IOBUF_BLOCK_HEADER_LEN; + LOG(INFO) << "g_gdr_recv_block_size: " << g_gdr_recv_block_size; +#endif // BRPC_WITH_GDR + return 0; +} + int RdmaEndpoint::GlobalInitialize() { g_rdma_recv_block_size = GetRdmaBlockSize() - IOBUF_BLOCK_HEADER_LEN; if (g_rdma_recv_block_size <= 0) { diff --git a/src/brpc/rdma/rdma_endpoint.h b/src/brpc/rdma/rdma_endpoint.h index 54a008f1f7..d6e891903e 100644 --- a/src/brpc/rdma/rdma_endpoint.h +++ b/src/brpc/rdma/rdma_endpoint.h @@ -31,7 +31,6 @@ #include "butil/containers/mpsc_queue.h" #include "brpc/socket.h" - namespace brpc { class Socket; namespace rdma { @@ -75,15 +74,21 @@ class BAIDU_CACHELINE_ALIGNMENT RdmaEndpoint : public SocketUser { friend class RdmaConnect; friend class Socket; public: - explicit RdmaEndpoint(Socket* s); + explicit RdmaEndpoint(Socket* s, bool use_gdr = false); ~RdmaEndpoint() override; - // Global initialization + // Global Rdma initialization // Return 0 if success, -1 if failed and errno set static int GlobalInitialize(); + // Global Gdr initialization + // Return 0 if success, -1 if failed and errno set + static int GlobalGdrInitialize(); + static void GlobalRelease(); + bool use_gdr() { return _use_gdr; } + // Reset the endpoint (for next use) void Reset(); @@ -177,6 +182,16 @@ friend class Socket; // -1: failed, errno set int DoPostRecv(void* block, size_t block_size); + // Post a WR pointing to the gpu block to the local Recv Queue + // Arguments: + // block: the gpu addr to receive data (ibv_sge.addr) + // block_size: the maximum length can be received (ibv_sge.length) + // lkey: the lkey of block + // Return: + // 0: success + // -1: failed, errno set + int DoPostRecvGDR(void* block, size_t block_size, uint32_t lkey); + // Read at most len bytes from fd in _socket to data // wait for _read_butex if encounter EAGAIN // return -1 if encounter other errno (including EOF) @@ -222,6 +237,9 @@ friend class Socket; // Not owner Socket* _socket; + // whether open gpu direct rdma + bool _use_gdr; + // State of Handshake State _state; diff --git a/src/brpc/rdma/rdma_helper.cpp b/src/brpc/rdma/rdma_helper.cpp index 96348902f8..e04c4756e9 100644 --- a/src/brpc/rdma/rdma_helper.cpp +++ b/src/brpc/rdma/rdma_helper.cpp @@ -25,6 +25,9 @@ #include "butil/containers/flat_map.h" // butil::FlatMap #include "butil/fd_guard.h" #include "butil/fd_utility.h" // butil::make_non_blocking +#if BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif #include "butil/logging.h" #include "brpc/socket.h" #include "brpc/rdma/block_pool.h" @@ -86,6 +89,8 @@ static uint16_t g_lid; static int g_max_sge = 0; static uint8_t g_port_num = 1; +static int g_gpu_index = 0; + static int g_comp_vector_index = 0; butil::atomic g_rdma_available(false); @@ -95,7 +100,7 @@ DEFINE_string(rdma_device, "", "The name of the HCA device used " "(Empty means using the first active device)"); DEFINE_int32(rdma_port, 1, "The port number to use. For RoCE, it is always 1."); DEFINE_int32(rdma_gid_index, -1, "The GID index to use. -1 means using the last one."); - +DEFINE_int32(gpu_index, 0, "The GPU device index to use. In GDR, we suggest to use the GPU that is connected to the same PCIe switch with rdma devices"); // static const size_t SYSFS_SIZE = 4096; static ibv_device** g_devices = NULL; static ibv_context* g_context = NULL; @@ -588,7 +593,26 @@ static void GlobalRdmaInitializeOrDieImpl() { g_rdma_available.store(true, butil::memory_order_relaxed); } +static void GlobalGdrInitializeOrDieImpl() { +#if BRPC_WITH_GDR + g_gpu_index = FLAGS_gpu_index; + + if (!butil::gdr::InitGPUBlockPool(g_gpu_index, GetRdmaPd())) { + PLOG(ERROR) << "Fail to initialize RDMA GPU memory pool"; + ExitWithError(); + } + if (RdmaEndpoint::GlobalGdrInitialize() < 0) { + LOG(ERROR) << "gdr_block_size_kb incorrect " + << "(must be larger than 0)"; + ExitWithError(); + } + +#endif // if BRPC_WITH_GDR + +} + static pthread_once_t initialize_rdma_once = PTHREAD_ONCE_INIT; +static pthread_once_t initialize_gdr_once = PTHREAD_ONCE_INIT; void GlobalRdmaInitializeOrDie() { if (pthread_once(&initialize_rdma_once, @@ -598,6 +622,14 @@ void GlobalRdmaInitializeOrDie() { } } +void GlobalGdrInitializeOrDie() { + if (pthread_once(&initialize_gdr_once, + GlobalGdrInitializeOrDieImpl) != 0) { + LOG(FATAL) << "Fail to pthread_once GlobalGdrInitializeOrDie"; + exit(1); + } +} + uint32_t RegisterMemoryForRdma(void* buf, size_t len) { ibv_mr* mr = IbvRegMr(g_pd, buf, len, IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_RELAXED_ORDERING); if (!mr) { @@ -691,6 +723,10 @@ uint8_t GetRdmaPortNum() { return g_port_num; } +int GetGPUIndex() { + return g_gpu_index; +} + bool IsRdmaAvailable() { return g_rdma_available.load(butil::memory_order_acquire); } diff --git a/src/brpc/rdma/rdma_helper.h b/src/brpc/rdma/rdma_helper.h index 052763325b..06cbb1f5c2 100644 --- a/src/brpc/rdma/rdma_helper.h +++ b/src/brpc/rdma/rdma_helper.h @@ -33,6 +33,10 @@ namespace rdma { // Exit if failed void GlobalRdmaInitializeOrDie(); +// Initialize GDR environment +// Exit if failed +void GlobalGdrInitializeOrDie(); + // Initialize RDMA polling mode with tag bool InitPollingModeWithTag(bthread_tag_t tag, std::function callback = nullptr, @@ -74,6 +78,9 @@ int GetRdmaCompVector(); // Return current port number used uint8_t GetRdmaPortNum(); +// Get GPU index used +int GetGPUIndex(); + // Get max_sge supported by the device int GetRdmaMaxSge(); diff --git a/src/brpc/rdma_transport.cpp b/src/brpc/rdma_transport.cpp index 88d89a7b06..97b231ddcc 100644 --- a/src/brpc/rdma_transport.cpp +++ b/src/brpc/rdma_transport.cpp @@ -29,10 +29,13 @@ DECLARE_bool(usercode_in_pthread); extern SocketVarsCollector *g_vars; -void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { +void RdmaTransport::DoInit(Socket *socket, const SocketOptions &options, bool use_gdr) { CHECK(_rdma_ep == NULL); - if (options.socket_mode == SOCKET_MODE_RDMA) { - _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket); + // gdr mode is a special mode of rdma mode. + // both rdma mode and gdr mode need init rdma::RdmaEndpoint. + if (options.socket_mode == SOCKET_MODE_RDMA || + options.socket_mode == SOCKET_MODE_GDR) { + _rdma_ep = new(std::nothrow)rdma::RdmaEndpoint(socket, use_gdr); if (!_rdma_ep) { const int saved_errno = errno; PLOG(ERROR) << "Fail to create RdmaEndpoint"; @@ -54,6 +57,10 @@ void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { _tcp_transport->Init(socket, options); } +void RdmaTransport::Init(Socket *socket, const SocketOptions &options) { + DoInit(socket, options, false); +} + void RdmaTransport::Release() { if (_rdma_ep) { delete _rdma_ep; diff --git a/src/brpc/rdma_transport.h b/src/brpc/rdma_transport.h index 65ae88f7a6..bb579c6ac5 100644 --- a/src/brpc/rdma_transport.h +++ b/src/brpc/rdma_transport.h @@ -29,6 +29,7 @@ class RdmaTransport : public Transport { friend class rdma::RdmaEndpoint; friend class rdma::RdmaConnect; public: + void DoInit(Socket* socket, const SocketOptions& options, bool use_gdr); void Init(Socket* socket, const SocketOptions& options) override; void Release() override; int Reset(int32_t expected_nref) override; @@ -62,4 +63,4 @@ class RdmaTransport : public Transport { }; } // namespace brpc #endif // BRPC_WITH_RDMA -#endif //BRPC_RDMA_TRANSPORT_H \ No newline at end of file +#endif //BRPC_RDMA_TRANSPORT_H diff --git a/src/brpc/socket_mode.h b/src/brpc/socket_mode.h index b5d42be4aa..9fb0276efa 100644 --- a/src/brpc/socket_mode.h +++ b/src/brpc/socket_mode.h @@ -20,7 +20,8 @@ namespace brpc { enum SocketMode { SOCKET_MODE_TCP = 0, - SOCKET_MODE_RDMA = 1 + SOCKET_MODE_RDMA = 1, + SOCKET_MODE_GDR = 2 }; } // namespace brpc -#endif //BRPC_SOCKET_MODE_H \ No newline at end of file +#endif //BRPC_SOCKET_MODE_H diff --git a/src/brpc/transport_factory.cpp b/src/brpc/transport_factory.cpp index b689e2edd2..76623f505c 100644 --- a/src/brpc/transport_factory.cpp +++ b/src/brpc/transport_factory.cpp @@ -18,6 +18,7 @@ #include "brpc/transport_factory.h" #include "brpc/tcp_transport.h" #include "brpc/rdma_transport.h" +#include "brpc/gdr_transport.h" namespace brpc { int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const void* _options) { @@ -28,6 +29,15 @@ int TransportFactory::ContextInitOrDie(SocketMode mode, bool serverOrNot, const else if (mode == SOCKET_MODE_RDMA) { return RdmaTransport::ContextInitOrDie(serverOrNot, _options); } +#endif +#if BRPC_WITH_GDR + else if (mode == SOCKET_MODE_GDR) { + // gdr is a special case of rdma, so we should init rdma first; + if (RdmaTransport::ContextInitOrDie(serverOrNot, _options) < 0) { + return -1; + } + return GdrTransport::GdrContextInitOrDie(); + } #endif else { LOG(ERROR) << "unknown transport type " << mode; @@ -43,10 +53,15 @@ std::unique_ptr TransportFactory::CreateTransport(SocketMode mode) { else if (mode == SOCKET_MODE_RDMA) { return std::unique_ptr(new RdmaTransport()); } +#endif +#if BRPC_WITH_GDR + else if (mode == SOCKET_MODE_GDR) { + return std::unique_ptr(new GdrTransport()); + } #endif else { LOG(ERROR) << "socket_mode set error"; return nullptr; } } -} // namespace brpc \ No newline at end of file +} // namespace brpc diff --git a/src/butil/gpu/gpu_block_pool.cpp b/src/butil/gpu/gpu_block_pool.cpp new file mode 100644 index 0000000000..86a307ba06 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.cpp @@ -0,0 +1,475 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#if BRPC_WITH_GDR + +#include +#include +#include +#include +#include "butil/fast_rand.h" +#include "gpu_block_pool.h" +namespace butil { +namespace gdr { +DEFINE_int32(gdr_block_size_kb, 512, "gdr block size in KB"); +DEFINE_int32(max_gdr_regions, 32, "max num of gdr regions"); + +#define CHECK_CUDA(call) \ +do { \ + auto _sts = (call); \ + if (_sts != cudaSuccess) { \ + LOG(FATAL) << " cuda error:" \ + << (cudaGetErrorString(_sts)) << std::string(" at ") \ + << __FILE__ << ": " << __LINE__; \ + } \ +} while (0); + +size_t GetGdrBlockSize() { + return FLAGS_gdr_block_size_kb * 1024; +} + +bool verify_same_context() { + static int original_device = -1; + static bool first_call = true; + + int current_device; + cudaGetDevice(¤t_device); + + if (first_call) { + original_device = current_device; + first_call = false; + return true; + } + + return (current_device == original_device); +} + +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + void *d_data; + + LOG(INFO) << "try to alloc " << gpu_mem_size << " bytes from gpu " << gpu_id; + + CHECK_CUDA(cudaMalloc(&d_data, gpu_mem_size)); + cudaDeviceSynchronize(); + return (void *)d_data; +} + +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + + LOG(INFO) << "try to alloc " << cpu_mem_size << " bytes from gpu " << gpu_id << "on host"; + + void* mem = NULL; + + CHECK_CUDA(cudaMallocHost(&mem, cpu_mem_size)); + + cudaDeviceSynchronize(); + + return mem; +} + + +BlockPoolAllocators* BlockPoolAllocators::instance_ = nullptr; + +BlockPoolAllocators* BlockPoolAllocators::singleton() { + static std::mutex mutex; + if (instance_ == nullptr) { + std::lock_guard l(mutex); + if (instance_ == nullptr) { + instance_ = new BlockPoolAllocators(); + std::atomic_thread_fence(std::memory_order_release); + } + } + std::atomic_thread_fence(std::memory_order_acquire); + return instance_; +} + +void BlockPoolAllocators::init(int gpu_id, ibv_pd* pd) { + LOG(INFO) << "set GPU BlockPoolAllocator for " << gpu_id; + size_t region_size = 1024LL * 1024 * 1024; + size_t block_size = FLAGS_gdr_block_size_kb * 1024; + gpu_mem_alloc = new BlockPoolAllocator(gpu_id, true, pd, block_size, region_size); + + region_size = 32LL * 1024 * 1024; + block_size = 512; + cpu_mem_alloc = new BlockPoolAllocator(gpu_id, false, pd, block_size, region_size); + + gpu_stream_pool = new GPUStreamPool(gpu_id); +} + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd) { + BlockPoolAllocators::singleton()->init(gpu_id, pd); + return true; +} + +class BlockHeaderList { + public: + BlockHeaderList() { + objects_.reserve(kMaxObjects); + } + virtual ~BlockHeaderList() { + for (size_t i = 0; i < objects_.size(); i++) { + delete objects_[i]; + } + } + + BlockHeader* New() { + { + std::lock_guard lock(mu_); + if (!objects_.empty()) { + BlockHeader* result = objects_.back(); + objects_.pop_back(); + return result; + } + } + return new BlockHeader; + } + void Release(BlockHeader* obj) { + obj->Reset(); + { + std::lock_guard lock(mu_); + if (objects_.size() < kMaxObjects) { + objects_.push_back(obj); + return; + } + } + delete obj; + } + + private: + static const int kMaxObjects = 100000; + + std::mutex mu_; + std::vector objects_; +}; + +static BlockHeaderList* get_bh_list() { + static BlockHeaderList* bh_list = new BlockHeaderList(); + return bh_list; +} + +BlockPoolAllocator::BlockPoolAllocator(int gpuId, bool onGpu, ibv_pd* ibvPd, + size_t blockSize, size_t regionSize) : + gpu_id(gpuId) + , on_gpu(onGpu) + , pd(ibvPd) + , BLOCK_SIZE(std::max(blockSize, sizeof(BlockHeader))) + , REGION_SIZE((regionSize / blockSize) * blockSize) // 对齐到块大小的倍数 + , freeList(nullptr) + , g_region_num(0) + , totalAllocated(0) + , totalDeallocated(0) + , peakUsage(0) { + g_regions.resize(FLAGS_max_gdr_regions); + LOG(INFO) << "Memory Pool initialized: block_size=" << BLOCK_SIZE + << ", region_size=" << REGION_SIZE << ", max_gdr_regions=" << FLAGS_max_gdr_regions + << ", gpu_id=" << gpu_id << ", on_gpu=" << on_gpu << ", pd=" << pd; + + extendRegion(); +} + +BlockPoolAllocator::~BlockPoolAllocator() { +#ifdef DEBUG + printStatistics(); +#endif + + for (int i = 0; i < FLAGS_max_gdr_regions; i++) { + Region* r = &g_regions[i]; + if (!r->mr) { + return; + } + + LOG(INFO) << "try to free " << r->size << " bytes from gpu " << gpu_id << ", on_gpu " << on_gpu; + ibv_dereg_mr(r->mr); + if (on_gpu) { + CHECK_CUDA(cudaFree(reinterpret_cast(r->start))); + } else { + CHECK_CUDA(cudaFreeHost(reinterpret_cast(r->start))); + } + } +} + +Region* BlockPoolAllocator::GetRegion(const void* buf) { + if (!buf) { + errno = EINVAL; + return NULL; + } + Region* r = NULL; + uintptr_t addr = (uintptr_t)buf; + for (int i = 0; i < FLAGS_max_gdr_regions; ++i) { + if (g_regions[i].aligned_start == 0) { + break; + } + if (addr >= g_regions[i].aligned_start && + addr < g_regions[i].aligned_start + g_regions[i].aligned_size) { + r = &g_regions[i]; + break; + } + } + return r; +} + +uint32_t BlockPoolAllocator::get_lkey(const void* buf) { + Region* r = GetRegion(buf); + if (!r) { + LOG(ERROR) << "can not get a region for buf " << buf; + return 0; + } + + if (!r->mr) { + LOG(FATAL) << "region has not been registered into rdma yet, addr:" << r->start; + return 0; + } + + return r->mr->lkey; +} + +void* BlockPoolAllocator::AllocateRaw(size_t num_bytes) { + if (num_bytes == 0) { + return nullptr; + } + if (num_bytes > BLOCK_SIZE) { + LOG(FATAL) << "try to alloc " << num_bytes << " bytes, its bigger than block_size " << BLOCK_SIZE; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + + std::lock_guard lock(poolMutex); + + if (!freeList) { + extendRegion(); + } + + BlockHeader* block = freeList; + freeList = freeList->next; + + void* addr = block->addr; + get_bh_list()->Release(block); + + totalAllocated++; + peakUsage = std::max(peakUsage, totalAllocated - totalDeallocated); + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + +#ifdef DEBUG + if (duration.count() > 1000) { + LOG(INFO) << "Slow allocation: " << duration.count() << " ns"; + } +#endif + + return addr; +} + +void BlockPoolAllocator::DeallocateRaw(void* ptr) { + if (!ptr) return; + + std::lock_guard lock(poolMutex); + + BlockHeader* block = get_bh_list()->New(); + block->addr = ptr; + block->next = freeList; + freeList = block; + + totalDeallocated++; +} + +void BlockPoolAllocator::printStatistics() const { + LOG(INFO) << "=== Memory Pool Statistics ==="; + LOG(INFO) << "Total regions: " << g_region_num + << ", Total blocks allocated: " << totalAllocated + << ", Total blocks deallocated: " << totalDeallocated + << ", Current usage: " << (totalAllocated - totalDeallocated) << " blocks" + << ", Peak usage: " << peakUsage << " blocks" + << ", Memory efficiency: " + << (static_cast(totalAllocated - totalDeallocated) / + (g_region_num * (REGION_SIZE / BLOCK_SIZE)) * 100) + << "%"; +} + +void BlockPoolAllocator::extendRegion() { + if (g_region_num == FLAGS_max_gdr_regions) { + LOG(FATAL) << "Gdr Memory pool reaches max regions"; + return ; + } + + auto startTime = std::chrono::high_resolution_clock::now(); + void* ptr = nullptr; + void* aligned_ptr = nullptr; + int alignment = 4096; + + if (on_gpu) { + ptr = get_gpu_mem(gpu_id, REGION_SIZE); + } else { + ptr = get_cpu_mem(gpu_id, REGION_SIZE); + } + + aligned_ptr = (void*)(((uintptr_t)ptr + alignment - 1) & ~(alignment - 1)); + + int64_t aligned_bytes = REGION_SIZE; + if (ptr != aligned_ptr) { + uintptr_t region_end = uintptr_t(ptr) + REGION_SIZE; + uintptr_t aligned_end_ptr = region_end & ~(alignment - 1); + aligned_bytes = uintptr_t(aligned_end_ptr) - uintptr_t(aligned_ptr); + LOG(WARNING) << "addr is not aligned with 4096: " << ptr << ", aligned_bytes: " << aligned_bytes + << ", region_size: " << REGION_SIZE; + } + + LOG(INFO) << "reg_mr for ptr: " << aligned_ptr << ", size:" << aligned_bytes; + auto mr = ibv_reg_mr(pd, aligned_ptr, aligned_bytes, + IBV_ACCESS_LOCAL_WRITE | + IBV_ACCESS_REMOTE_READ | + IBV_ACCESS_REMOTE_WRITE | + IBV_ACCESS_RELAXED_ORDERING); + + if (!mr) { + LOG(FATAL) << "Failed to register MR: " << strerror(errno) + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } else { + LOG(INFO) << "Success to register MR: " + << ", pd " << pd << ", aligned_ptr:" << aligned_ptr; + } + + LOG(INFO) << "try to init region, g_region_num:" << g_region_num; + size_t blockCount = aligned_bytes / BLOCK_SIZE; + Region* region = &g_regions[g_region_num++]; + region->start = (uintptr_t)ptr; + region->aligned_start = (uintptr_t)aligned_ptr; + region->mr = mr; + region->size = REGION_SIZE; + region->aligned_size = aligned_bytes; + region->blockCount = blockCount; + + + LOG(INFO) << "try to insert list, freeList:" << freeList << ", blockCount:" << blockCount; + BlockHeader* lastBlock = nullptr; + for (size_t i = 0; i < blockCount; ++i) { + BlockHeader* block = get_bh_list()->New(); + block->addr = reinterpret_cast(static_cast(aligned_ptr) + i * BLOCK_SIZE); + if (lastBlock != nullptr) { + lastBlock->next = block; + } else { + freeList = block; + } + lastBlock = block; + } + + if (lastBlock) { + lastBlock->next = nullptr; + } + + auto endTime = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast(endTime - startTime); + + LOG(INFO) << "Extended region #" << g_region_num << ": " << blockCount + << " blocks (" << (REGION_SIZE / (1024 * 1024)) << " MB)" << ", on_gpu " << on_gpu + << ", cost " << duration.count() << " ns"; +} + +GPUStreamPool::GPUStreamPool(int gpu_id) : + gpu_id_(gpu_id) { + CHECK_CUDA(cudaSetDevice(gpu_id)); + d2d_streams_.resize(kMaxConcurrent); + d2h_streams_.resize(kMaxConcurrent); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamCreate(&d2d_streams_[i])); + CHECK_CUDA(cudaStreamCreate(&d2h_streams_[i])); + } + CHECK_CUDA(cudaDeviceSynchronize()); +} + +GPUStreamPool::~GPUStreamPool() { + CHECK_CUDA(cudaDeviceSynchronize()); + for (int i = 0; i < kMaxConcurrent; i++) { + CHECK_CUDA(cudaStreamDestroy(d2d_streams_[i])); + CHECK_CUDA(cudaStreamDestroy(d2h_streams_[i])); + } +} + +void GPUStreamPool::fast_d2d(std::vector& src_list, + std::vector& length_list, + void* dst) { +#ifdef DEBUG + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } +#endif + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2d_lb_lock_); + d2d_cnt_.fetch_add(1); + stream_idx = d2d_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2d_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToDevice, d2d_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2d_streams_[stream_idx])); +} + +void GPUStreamPool::fast_d2h(std::vector& src_list, + std::vector& length_list, + void* dst) { + if (!verify_same_context()) { + LOG(FATAL) << "Context mismatch!"; + return; + } + int64_t offset = 0; + int segs = src_list.size(); + if (segs == 0) return; + if (segs != length_list.size()) { + LOG(FATAL) << "src list size is not equal with length list size!!!"; + } + + int stream_idx = 0; + { + std::lock_guard stream_lb_lock(d2h_lb_lock_); + d2h_cnt_.fetch_add(1); + stream_idx = d2h_cnt_ % kMaxConcurrent; + } + std::lock_guard stream_lock(d2h_locks_[stream_idx]); + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); + for (int i = 0; i < segs; i++) { + if (length_list[i] == 0) { + continue; + } + CHECK_CUDA(cudaMemcpyAsync(static_cast(dst) + offset, src_list[i], length_list[i], + cudaMemcpyDeviceToHost, d2h_streams_[stream_idx])); + offset += length_list[i]; + } + CHECK_CUDA(cudaStreamSynchronize(d2h_streams_[stream_idx])); +} + +} +} + +#endif // BRPC_WITH_GDR diff --git a/src/butil/gpu/gpu_block_pool.h b/src/butil/gpu/gpu_block_pool.h new file mode 100644 index 0000000000..655c487a92 --- /dev/null +++ b/src/butil/gpu/gpu_block_pool.h @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +#ifndef BUTIL_GPU_GPU_BLOCK_POOL_H +#define BUTIL_GPU_GPU_BLOCK_POOL_H + +#if BRPC_WITH_GDR + +#include +#include +#include +#include +#include +#include +#include +#include "butil/containers/hash_tables.h" +#include "butil/logging.h" +#include +#include "cuda.h" + +namespace butil { +namespace gdr { + +size_t GetGdrBlockSize(); +void* get_gpu_mem(int gpu_id, int64_t gpu_mem_size); +void* get_cpu_mem(int gpu_id, int64_t cpu_mem_size); + +bool InitGPUBlockPool(int gpu_id, ibv_pd* pd); + +struct Region { + Region() { start = 0; aligned_start = 0;} + uintptr_t start; + uintptr_t aligned_start; + + size_t size; + size_t aligned_size; + size_t blockCount; + struct ibv_mr *mr {nullptr}; +}; + +struct BlockHeader { + BlockHeader() { addr = nullptr; next = nullptr;} + void Reset() { addr = nullptr; next = nullptr; } + void* addr; + BlockHeader* next; +}; + +class BlockPoolAllocator { + private: + int gpu_id; + bool on_gpu; + ibv_pd* pd {nullptr}; + + const size_t BLOCK_SIZE; + const size_t REGION_SIZE; + + BlockHeader* freeList; + int g_region_num {0}; + std::vector g_regions; + std::mutex poolMutex; + + // stat + size_t totalAllocated; + size_t totalDeallocated; + size_t peakUsage; + + public: + explicit BlockPoolAllocator(int gpuId, + bool onGpu, ibv_pd* ibvPd, + size_t blockSize, size_t regionSize); + + ~BlockPoolAllocator(); + + void* AllocateRaw(size_t num_bytes); + + void DeallocateRaw(void* ptr); + + void printStatistics() const; + + int64_t getCurrentUsage() const { + return totalAllocated - totalDeallocated; + } + + int64_t getTotalMemory() const { + return g_region_num * REGION_SIZE; + } + + int64_t get_block_size() const { + return BLOCK_SIZE; + } + + Region* GetRegion(const void* buf); + + uint32_t get_lkey(const void* buf); + + private: + void extendRegion(); +}; + +class GPUStreamPool { +public: + explicit GPUStreamPool(int gpu_id); + + ~GPUStreamPool(); + + GPUStreamPool(const GPUStreamPool&) = delete; + GPUStreamPool& operator=(const GPUStreamPool&) = delete; + + void fast_d2h(std::vector& src_list, std::vector& length_list, void* dst); + + void fast_d2d(std::vector& src_list, std::vector& length_list, void* dst); + + static constexpr int kMaxConcurrent = 32; +private: + int gpu_id_ {-1}; + std::atomic d2h_cnt_ {0}; + std::atomic d2d_cnt_ {0}; + std::mutex d2h_locks_[kMaxConcurrent]; + std::mutex d2d_locks_[kMaxConcurrent]; + std::mutex d2h_lb_lock_; + std::mutex d2d_lb_lock_; + std::vector d2h_streams_; + std::vector d2d_streams_; +}; + +class BlockPoolAllocators { +public: + static BlockPoolAllocators* singleton(); + BlockPoolAllocators() {} + virtual ~BlockPoolAllocators() { + CHECK_EQ(this, instance_); + instance_ = nullptr; + } + + void init(int gpu_id, ibv_pd* pd); + + BlockPoolAllocator* get_gpu_allocator() { + return gpu_mem_alloc; + } + + BlockPoolAllocator* get_cpu_allocator() { + return cpu_mem_alloc; + } + + GPUStreamPool* get_gpu_stream_pool() { + return gpu_stream_pool; + } + +public: + static BlockPoolAllocators* instance_; + +private: + BlockPoolAllocator* gpu_mem_alloc {nullptr}; + BlockPoolAllocator* cpu_mem_alloc {nullptr}; + GPUStreamPool* gpu_stream_pool {nullptr}; +}; +} +} + +#endif // BRPC_WITH_GDR + +#endif diff --git a/src/butil/iobuf.cpp b/src/butil/iobuf.cpp index af77d968cf..77bf5c9fcc 100644 --- a/src/butil/iobuf.cpp +++ b/src/butil/iobuf.cpp @@ -40,6 +40,9 @@ #include "butil/fd_guard.h" // butil::fd_guard #include "butil/iobuf.h" #include "butil/iobuf_profiler.h" +#ifdef BRPC_WITH_GDR +#include "butil/gpu/gpu_block_pool.h" +#endif namespace butil { static size_t default_block_size = 8192; @@ -737,6 +740,46 @@ size_t IOBuf::cutn(IOBuf* out, size_t n) { return saved_n; } +#if BRPC_WITH_GDR +size_t IOBuf::cutn_from_gpu(IOBuf* out, size_t n) { + if (n == 0) { + return 0; + } + + butil::gdr::BlockPoolAllocator* host_allocator = butil::gdr::BlockPoolAllocators::singleton()->get_cpu_allocator(); + bool alloc_from_host_alloc = (n <= host_allocator->get_block_size()); + void* mem = NULL; + if (alloc_from_host_alloc) { + mem = host_allocator->AllocateRaw(n); + } else { + mem = malloc(n); + } + + if (mem == NULL) { + return 0; + } + size_t saved_n = copy_from_gpu(mem, n, 0, false); + if (saved_n > 0) { + if (alloc_from_host_alloc) { + auto deleter = [host_allocator](void* data) { host_allocator->DeallocateRaw(data); }; + out->append_user_data(mem, saved_n, deleter); + } else { + auto deleter = [](void* data) { free(data); }; + out->append_user_data(mem, saved_n, deleter); + } + pop_front(saved_n); + } else { + if (alloc_from_host_alloc) { + host_allocator->DeallocateRaw(mem); + } else { + free(mem); + } + } + + return saved_n; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::cutn(void* out, size_t n) { const size_t len = length(); if (n > len) { @@ -1170,6 +1213,15 @@ uint64_t IOBuf::get_first_data_meta() { return r.block->u.data_meta; } +void* IOBuf::get_first_data_ptr() { + if (_ref_num() == 0) { + return 0; + } + IOBuf::BlockRef const& r = _ref_at(0); + return r.block->data; +} + + int IOBuf::resize(size_t n, char c) { const size_t saved_len = length(); if (n < saved_len) { @@ -1332,6 +1384,46 @@ size_t IOBuf::copy_to(void* d, size_t n, size_t pos) const { return n - m; } +#if BRPC_WITH_GDR +size_t IOBuf::copy_from_gpu(void* d, size_t n, size_t pos, bool to_gpu) const { + if (n == 0) { + return 0; + } + const size_t nref = _ref_num(); + // Skip `pos' bytes. `offset' is the starting position in starting BlockRef. + size_t offset = pos; + size_t i = 0; + for (; offset != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + if (offset < (size_t)r.length) { + break; + } + offset -= r.length; + } + + butil::gdr::GPUStreamPool* gpu_stream_pool = butil::gdr::BlockPoolAllocators::singleton()->get_gpu_stream_pool(); + size_t m = n; + std::vector src_list; + std::vector length_list; + for (; m != 0 && i < nref; ++i) { + IOBuf::BlockRef const& r = _ref_at(i); + const size_t nc = std::min(m, (size_t)r.length - offset); + void* gpu_src = r.block->data + r.offset + offset; + src_list.push_back(gpu_src); + length_list.push_back(nc); + offset = 0; + m -= nc; + } + if (to_gpu) { + gpu_stream_pool->fast_d2d(src_list, length_list, d); + } else { + gpu_stream_pool->fast_d2h(src_list, length_list, d); + } + // If nref == 0, here returns 0 correctly + return n - m; +} +#endif // BRPC_WITH_GDR + size_t IOBuf::copy_to(std::string* s, size_t n, size_t pos) const { const size_t len = length(); if (len <= pos) { @@ -1478,6 +1570,16 @@ bool IOBuf::equals(const butil::IOBuf& other) const { return true; } +#if BRPC_WITH_GDR +// when IOBuf is used for send, data_meta is set by user; +// when IOBf is used for recv and gdr is open, data_meta is set by brpc +// and it is lkey. +bool IOBuf::is_gpu_memory() { + uint64_t data_meta = get_first_data_meta(); + return (data_meta > 0 && data_meta <= UINT_MAX); +} +#endif + ////////////////////////////// IOPortal ////////////////// IOPortal::~IOPortal() { return_cached_blocks(); } diff --git a/src/butil/iobuf.h b/src/butil/iobuf.h index 978aa34fe3..487279c2a9 100644 --- a/src/butil/iobuf.h +++ b/src/butil/iobuf.h @@ -144,6 +144,13 @@ friend class SingleIOBuf; size_t cutn(IOBuf* out, size_t n); size_t cutn(void* out, size_t n); size_t cutn(std::string* out, size_t n); + +#if BRPC_WITH_GDR + size_t cutn_from_gpu(IOBuf* out, size_t n); + size_t copy_from_gpu(void* d, size_t n, size_t pos = 0, bool to_gpu = false) const; + bool is_gpu_memory(); +#endif // BRPC_WITH_GDR + // Cut off 1 byte from the front side and set to *c // Return true on cut, false otherwise. bool cut1(void* c); @@ -263,6 +270,9 @@ friend class SingleIOBuf; // 0 means the meta is invalid. uint64_t get_first_data_meta(); + // Get the data addr of the first byte in this IOBuf. + void* get_first_data_ptr(); + // Resizes the buf to a length of n characters. // If n is smaller than the current length, all bytes after n will be // truncated.