Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 44 additions & 5 deletions src/brpc/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ void Controller::ResetPods() {
_request_streams.clear();
_response_streams.clear();
_remote_stream_settings = NULL;
set_bind_sock_action(BIND_SOCK_NONE);
_bind_sock.reset();
_session_data = NULL;
_auth_flags = 0;
_rpc_received_us = 0;
}
Expand All @@ -308,6 +311,12 @@ Controller::Call::Call(Controller::Call* rhs)
, peer_id(rhs->peer_id)
, begin_time_us(rhs->begin_time_us)
, sending_sock(rhs->sending_sock.release())
// A backup/retry call must behave normally w.r.t. socket disposal; it never
// inherits the originating call's reserve/use affinity. Leaving this
// uninitialized lets OnComplete read indeterminate bits and (when they
// happen to match RESERVE/USE) hijack the socket away from the pool-return
// path, hanging the RPC. Initialize explicitly, matching Reset().
, bind_sock_action(BIND_SOCK_NONE)
, stream_user_data(rhs->stream_user_data) {
// NOTE: fields in rhs should be reset because RPC could fail before
// setting all the fields to next call and _current_call.OnComplete
Expand All @@ -328,6 +337,7 @@ void Controller::Call::Reset() {
peer_id = INVALID_SOCKET_ID;
begin_time_us = 0;
sending_sock.reset(NULL);
bind_sock_action = BIND_SOCK_NONE;
stream_user_data = NULL;
}

Expand Down Expand Up @@ -824,7 +834,13 @@ void Controller::Call::OnComplete(
// assumption that one pooled connection cannot have more than one
// message at the same time.
if (sending_sock != NULL && (error_code == 0 || responded)) {
if (!sending_sock->is_read_progressive()) {
if (bind_sock_action == BIND_SOCK_RESERVE) {
// Reserve this socket on the controller for a following RPC
// (used by mysql transactions for connection affinity).
c->_bind_sock.reset(sending_sock.release());
} else if (bind_sock_action == BIND_SOCK_USE) {
// Socket is owned by the binder; do not return it to the pool.
} else if (!sending_sock->is_read_progressive()) {
// Normally-read socket which will not be used after RPC ends,
// safe to return. Notice that Socket::is_read_progressive may
// differ from Controller::is_response_read_progressively()
Expand All @@ -841,7 +857,11 @@ void Controller::Call::OnComplete(
case CONNECTION_TYPE_SHORT:
if (sending_sock != NULL) {
// Check the comment in CONNECTION_TYPE_POOLED branch.
if (!sending_sock->is_read_progressive()) {
if (bind_sock_action == BIND_SOCK_RESERVE) {
c->_bind_sock.reset(sending_sock.release());
} else if (bind_sock_action == BIND_SOCK_USE) {
// Socket is owned by the binder; do not fail it.
} else if (!sending_sock->is_read_progressive()) {
if (c->_stream_creator == NULL) {
sending_sock->SetFailed();
}
Expand Down Expand Up @@ -908,6 +928,9 @@ void Controller::EndRPC(const CompletionInfo& info) {
}
// TODO: Replace this with stream_creator.
HandleStreamConnection(_current_call.sending_sock.get());
// Propagate the controller's bind action to the call. NOTE: only the POOLED
// branch of OnComplete guards the reservation with (error_code == 0 ||
// responded); the SHORT branch reserves whenever sending_sock is non-NULL.
_current_call.bind_sock_action = bind_sock_action();
_current_call.OnComplete(this, _error_code, info.responded, true);
} else {
// Even if _unfinished_call succeeded, we don't use EBACKUPREQUEST
Expand Down Expand Up @@ -1092,7 +1115,19 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.need_feedback = false;
_current_call.enable_circuit_breaker = has_enabled_circuit_breaker();
SocketUniquePtr tmp_sock;
if (SingleServer()) {
if ((_connection_type & CONNECTION_TYPE_POOLED_AND_SHORT) &&
bind_sock_action() == BIND_SOCK_USE) {
// Reuse the socket reserved by a previous RPC (mysql transaction affinity).
tmp_sock.reset(_bind_sock.release());
if (!tmp_sock || (!is_health_check_call() && !tmp_sock->IsAvailable())) {
// NOTE: tmp_sock may be NULL here, so guard the id() deref.
SetFailed(EHOSTDOWN, "Not connected to bind socket yet, server_id=%" PRIu64,
tmp_sock ? tmp_sock->id() : (SocketId)0);
tmp_sock.reset(); // Release ref ASAP
return HandleSendFailed();
}
_current_call.peer_id = tmp_sock->id();
} else if (SingleServer()) {
// Don't use _current_call.peer_id which is set to -1 after construction
// of the backup call.
const int rc = Socket::Address(_single_server_id, &tmp_sock);
Expand Down Expand Up @@ -1157,7 +1192,10 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.sending_sock->set_preferred_index(_preferred_index);
} else {
int rc = 0;
if (_connection_type == CONNECTION_TYPE_POOLED) {
if (bind_sock_action() == BIND_SOCK_USE) {
// Already holding the reserved socket; use it directly.
_current_call.sending_sock.reset(tmp_sock.release());
} else if (_connection_type == CONNECTION_TYPE_POOLED) {
rc = tmp_sock->GetPooledSocket(&_current_call.sending_sock);
} else if (_connection_type == CONNECTION_TYPE_SHORT) {
rc = tmp_sock->GetShortSocket(&_current_call.sending_sock);
Expand All @@ -1179,7 +1217,8 @@ void Controller::IssueRPC(int64_t start_realtime_us) {
_current_call.sending_sock->set_preferred_index(_preferred_index);
// Set preferred_index of main_socket as well to make it easier to
// debug and observe from /connections.
if (tmp_sock->preferred_index() < 0) {
// NOTE: tmp_sock is NULL on the BIND_SOCK_USE path (released above).
if (tmp_sock && tmp_sock->preferred_index() < 0) {
tmp_sock->set_preferred_index(_preferred_index);
}
tmp_sock.reset();
Expand Down
45 changes: 45 additions & 0 deletions src/brpc/controller.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,15 @@ enum StopStyle {

const int32_t UNSET_MAGIC_NUM = -123456789;

// Tells a Controller what to do with the sending socket of an RPC so that
// consecutive RPCs (mysql transactions) can stay on the same connection.
enum BindSockAction {
    // Keep the sending socket on the controller when the RPC completes.
    BIND_SOCK_RESERVE = 0,
    // Issue the RPC through the socket reserved by a previous RPC.
    BIND_SOCK_USE = 1,
    // No binding: the sending socket is handled as usual.
    BIND_SOCK_NONE = 2,
};

typedef butil::FlatMap<std::string, std::string> UserFieldsMap;

// A Controller mediates a single method call. The primary purpose of
Expand Down Expand Up @@ -145,6 +154,11 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
static const uint32_t FLAGS_PB_BYTES_TO_BASE64 = (1 << 11);
static const uint32_t FLAGS_ALLOW_DONE_TO_RUN_IN_PLACE = (1 << 12);
static const uint32_t FLAGS_USED_BY_RPC = (1 << 13);
// Reserve/reuse the sending socket after the RPC (mysql transactions).
// The two bits encode BindSockAction: neither=BIND_SOCK_NONE,
// RESERVE bit=BIND_SOCK_RESERVE, USE bit=BIND_SOCK_USE.
static const uint32_t FLAGS_BIND_SOCK_RESERVE = (1 << 14);
static const uint32_t FLAGS_BIND_SOCK_USE = (1 << 15);
static const uint32_t FLAGS_PB_JSONIFY_EMPTY_ARRAY = (1 << 16);
static const uint32_t FLAGS_ENABLED_CIRCUIT_BREAKER = (1 << 17);
static const uint32_t FLAGS_ALWAYS_PRINT_PRIMITIVE_FIELDS = (1 << 18);
Expand Down Expand Up @@ -762,6 +776,7 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// CONNECTION_TYPE_SINGLE. Otherwise, it may be a temporary
// socket fetched from socket pool
SocketUniquePtr sending_sock;
BindSockAction bind_sock_action;
StreamUserData* stream_user_data;
};

Expand Down Expand Up @@ -789,6 +804,26 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
{ return t ? add_flag(f) : clear_flag(f); }
// Returns true when at least one bit of `f' is set in _flags.
inline bool has_flag(uint32_t f) const { return _flags & f; }

// The bind action has no dedicated member; it is encoded in the
// FLAGS_BIND_SOCK_* bits. Both bits are cleared first, then the bit matching
// `action' is added. BIND_SOCK_NONE (or any other value) leaves both cleared.
void set_bind_sock_action(BindSockAction action) {
    clear_flag(FLAGS_BIND_SOCK_RESERVE | FLAGS_BIND_SOCK_USE);
    switch (action) {
    case BIND_SOCK_RESERVE:
        add_flag(FLAGS_BIND_SOCK_RESERVE);
        break;
    case BIND_SOCK_USE:
        add_flag(FLAGS_BIND_SOCK_USE);
        break;
    default:
        // Nothing to set.
        break;
    }
}
// Decodes the bind action from the FLAGS_BIND_SOCK_* bits. The RESERVE bit is
// tested before the USE bit; when neither is set the result is BIND_SOCK_NONE.
BindSockAction bind_sock_action() const {
    return has_flag(FLAGS_BIND_SOCK_RESERVE)
               ? BIND_SOCK_RESERVE
               : (has_flag(FLAGS_BIND_SOCK_USE) ? BIND_SOCK_USE
                                                : BIND_SOCK_NONE);
}

// Adds FLAGS_USED_BY_RPC to the controller's flags.
void set_used_by_rpc() { add_flag(FLAGS_USED_BY_RPC); }
// Returns whether FLAGS_USED_BY_RPC is currently set.
bool is_used_by_rpc() const { return has_flag(FLAGS_USED_BY_RPC); }

Expand Down Expand Up @@ -915,6 +950,16 @@ friend void policy::ProcessThriftRequest(InputMessageBase*);
// Defined at both sides
StreamSettings *_remote_stream_settings;

// Whether/how to reserve the sending socket after the RPC (mysql
// transactions) is stored in the FLAGS_BIND_SOCK_* bits of _flags; see
// set_bind_sock_action()/bind_sock_action(). The socket reserved by a
// previous RPC and reused when the action is BIND_SOCK_USE:
SocketUniquePtr _bind_sock;
// Opaque per-RPC slot a protocol codec may use to carry typed state from
// serialize_request to pack_request/parse (e.g. the mysql prepared-statement
// stub). Not owned by Controller.
void* _session_data;

// Thrift method name, only used when thrift protocol enabled
std::string _thrift_method_name;

Expand Down
9 changes: 8 additions & 1 deletion src/brpc/socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,7 @@ Socket::Socket(Forbidden f)
, _fd(-1)
, _tos(0)
, _reset_fd_real_us(-1)
, _fd_version(0)
, _on_edge_triggered_events(NULL)
, _need_on_edge_trigger(false)
, _user(NULL)
Expand Down Expand Up @@ -578,6 +579,9 @@ int Socket::ResetFileDescriptor(int fd) {
_avg_msg_size = 0;
// MUST store `_fd' before adding itself into epoll device to avoid
// race conditions with the callback function inside epoll
static butil::atomic<uint64_t> BAIDU_CACHELINE_ALIGNMENT fd_version(0);
_fd_version.store(fd_version.fetch_add(1, butil::memory_order_relaxed),
butil::memory_order_relaxed);
_fd.store(fd, butil::memory_order_release);
_reset_fd_real_us = butil::cpuwide_time_us();
if (!ValidFileDescriptor(fd)) {
Expand Down Expand Up @@ -1618,7 +1622,10 @@ int Socket::Write(butil::IOBuf* data, const WriteOptions* options_in) {
if (options_in) {
opt = *options_in;
}
if (data->empty()) {
// An auth write (opt.auth_flags != 0) may carry an empty data buffer: some
// protocols (e.g. mysql) read the server greeting first and send their real
// bytes from the connection-phase handler, not from `data` here.
if (data->empty() && !opt.auth_flags) {
return SetError(opt.id_wait, EINVAL);
}
if (opt.pipelined_count > MAX_PIPELINED_COUNT) {
Expand Down
6 changes: 6 additions & 0 deletions src/brpc/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,9 @@ friend class TransportFactory;
// The file descriptor (-1 when not connected), read with a relaxed load.
int fd() const { return _fd.load(butil::memory_order_relaxed); }

// The file descriptor version, used to avoid ABA problem. The value is
// assigned in ResetFileDescriptor() from a process-wide counter and is read
// here with a relaxed load.
uint64_t fd_version() const { return _fd_version.load(butil::memory_order_relaxed); }

// ip/port of the local end of the connection, returned by value.
butil::EndPoint local_side() const { return _local_side; }

Expand Down Expand Up @@ -840,6 +843,9 @@ friend class TransportFactory;
butil::atomic<int> _fd; // -1 when not connected.
int _tos; // Type of service which is actually only 8bits.
int64_t _reset_fd_real_us; // When _fd was reset, in microseconds.
// ABA/version counter; written on fd reset and read via fd_version() from
// other threads, so use relaxed atomics to avoid a data race.
butil::atomic<uint64_t> _fd_version; // _fd_version, used only for mysql now.

// Address of peer. Initialized by SocketOptions.remote_side.
butil::EndPoint _remote_side;
Expand Down
Loading