Curve is a Cloud Native Computing Foundation (CNCF) Sandbox project: a high-performance, easy-to-operate, cloud-native distributed storage system developed and open-sourced under the leadership of NetEase. It consists of two parts, the block storage system CurveBS and the file system CurveFS.
Introduction
Curve implemented RDMA communication for BRPC on its own. The RDMA implementation is based on the open-source project UCX; for details about UCX, refer to the UCX project.
The changes to BRPC are mainly at the level of BRPC's Socket, introducing UCP connections from UCX. UCP communication supports Active Message, Tag, and Stream; our implementation uses Active Message and allows busy polling and out-of-order message delivery to improve performance. These changes are transparent to the upper-layer API.
Concepts
Communication in BRPC centers on the following basic concepts:
1. EndPoint
A UCP address has the same form as a TCP address: an IP address and a port. They simply run different protocols underneath. EndPoint is a C++ class, a data structure representing a communication address. Since different kinds of communication endpoints must now be supported, we added an enum field kind, whose values are TCP, UCP, and UNIX, together with constructors that accept a kind value. Helper functions such as set_ucp() and set_tcp() change the kind directly. Before the unix socket kind was introduced, this change was binary-compatible with the original BRPC: port was originally a 32-bit int, and we split that slot into a short port plus a short kind, so the size of the original int was preserved. With the introduction of the unix socket path, however, the binary format is no longer compatible with the original BRPC.
struct EndPoint {
    enum { TCP, UCP, UNIX };
    EndPoint() : ip(IP_ANY), port(0), kind(TCP), socket_file("") {}
    EndPoint(ip_t ip2, int port2) : ip(ip2), port(port2), kind(TCP), socket_file("") {}
    EndPoint(ip_t ip2, int port2, int k) : ip(ip2), port(port2), kind(k), socket_file("") {}
    explicit EndPoint(const sockaddr_in& in)
        : ip(in.sin_addr), port(ntohs(in.sin_port)), kind(TCP), socket_file("") {}
    explicit EndPoint(const sockaddr_in& in, int k)
        : ip(in.sin_addr), port(ntohs(in.sin_port)), kind(k), socket_file("") {}
    explicit EndPoint(const char* file) : ip(IP_ANY), port(0), kind(UNIX), socket_file(file) {}
    explicit EndPoint(const std::string& file) : ip(IP_ANY), port(0), kind(UNIX), socket_file(file) {}
    bool is_tcp() const { return TCP == kind; }
    void set_tcp() { kind = TCP; }
    bool is_ucp() const { return UCP == kind; }
    void set_ucp() { kind = UCP; }
    bool is_unix() const { return UNIX == kind; }
    void set_unix() { kind = UNIX; }
    ip_t ip;
#if ARCH_CPU_LITTLE_ENDIAN
    unsigned short port;
    unsigned short kind;
#else
    unsigned short kind;
    unsigned short port;
#endif
    std::string socket_file;
};
2. ChannelOptions
struct ChannelOptions {
    // Constructed with default options.
    ChannelOptions();

    // Issue error when a connection is not established after so many
    // milliseconds. -1 means wait indefinitely.
    // Default: 200 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t connect_timeout_ms;

    // Max duration of RPC over this Channel. -1 means wait indefinitely.
    // Overridable by Controller.set_timeout_ms().
    // Default: 500 (milliseconds)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t timeout_ms;

    // Send another request if RPC does not finish after so many milliseconds.
    // Overridable by Controller.set_backup_request_ms().
    // The request will be sent to a different server by best effort.
    // If timeout_ms is set and backup_request_ms >= timeout_ms, backup request
    // will never be sent.
    // backup request does NOT imply server-side cancelation.
    // Default: -1 (disabled)
    // Maximum: 0x7fffffff (roughly 30 days)
    int32_t backup_request_ms;

    // Retry limit for RPC over this Channel. <=0 means no retry.
    // Overridable by Controller.set_max_retry().
    // Default: 3
    // Maximum: INT_MAX
    int max_retry;

    ......

    // Use ucp transport
    bool use_ucp;
};
As shown above, we added only a single field, use_ucp.
class Channel : public ChannelBase {
friend class Controller;
friend class SelectiveChannel;
public:
    Channel(ProfilerLinker = ProfilerLinker());
    ~Channel();

    // Connect this channel to a single server whose address is given by the
    // first parameter. Use default options if `options' is NULL.
    int Init(butil::EndPoint server_addr_and_port, const ChannelOptions* options);
    int Init(const char* server_addr_and_port, const ChannelOptions* options);
    int Init(const char* server_addr, int port, const ChannelOptions* options);
    int InitWithSockFile(const char* socket_file, const ChannelOptions* options);

    ......
};
We added a check in the first Init function above: if the kind field of server_addr_and_port is UCP, it must be consistent with use_ucp in the options.
BRPC servers use the Server class. We did not change this class itself; instead we added fields to brpc::ServerOptions to support UCP RDMA connections.
struct ServerOptions {
    ServerOptions(); // Constructed with default options

    ......

    // Enable ucp listener
    bool enable_ucp;

    // Ucp listener ip address
    std::string ucp_address;

    // Ucp listener port
    uint16_t ucp_port;
};
If enable_ucp is true, ucp_address and ucp_port are used to open the UCP listening port.
The Server class has Start functions that always accept a TCP address; these parameters open the TCP service. In our changes the TCP connection is always enabled, which keeps compatibility with old clients rather than forcing UCP RDMA as the only transport. Enabling UCP RDMA is always specified through ServerOptions.
class Server {
public:
    int Start(const char* ip_port_str, const ServerOptions* opt);
    int Start(const butil::EndPoint& ip_port, const ServerOptions* opt);
    // Start on IP_ANY:port.
    int Start(int port, const ServerOptions* opt);
    // Start on `ip_str' + any useable port in `range'
    int Start(const char* ip_str, PortRange range, const ServerOptions* opt);
};
1. Building UCX

cd ~/
git clone git@github.com:openucx/ucx.git
cd ucx
./autogen.sh
./configure --prefix=/usr/local/ucx
make
sudo make install
2. Building BRPC
cd ~/
git clone git@github.com:opencurve/incubator-brpc.git
cd incubator-brpc
git checkout ucx_am
mkdir bu
cd bu
cmake ..
make
cd ../example/multi_threaded_echo_c++
mkdir bu
cd bu
cmake ..
make
cp ../*.pem .
Modifications

We modified BRPC's multi_threaded_echo_c++ example so that it supports UCP RDMA networking; see Reference [1]. The client-side change mainly sets use_ucp in the options from a command-line flag:
options.use_ucp = FLAGS_use_ucp;
if (channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options) != 0) {
    LOG(ERROR) << "Fail to initialize channel";
    return -1;
}
The command-line flag --use_ucp=true enables UCP. The server-side changes are mainly:
options.enable_ucp = FLAGS_enable_ucp;
options.ucp_address = FLAGS_ucp_address;
options.ucp_port = FLAGS_ucp_port;
if (server.Start(FLAGS_port, &options) != 0) {
    LOG(ERROR) << "Fail to start EchoServer";
    return -1;
}
The following command-line flags enable UCP: --enable_ucp=true --ucp_address=127.0.0.1 --ucp_port=13339
[global]
UCX_TCP_CM_REUSEADDR=y
UCX_ASYNC_MAX_EVENTS=1000000
UCX_RDMA_CM_REUSEADDR=y
UCX_RNDV_THRESH=128K
UCX_TCP_TX_SEG_SIZE=32K
UCX_TCP_MAX_IOV=128
UCX_TLS=^tcp
UCX_USE_MT_MUTEX=y
Server side
./multi_threaded_echo_server --brpc_ucp_worker_busy_poll=1
I0811 11:29:09.381323 551185 /home/incubator-brpc/src/brpc/ucp_ctx.cpp:35] Running with ucp library version: 1.14.0
I0811 11:29:09.410902 551185 /home/incubator-brpc/src/brpc/ucp_acceptor.cpp:321] Ucp server is listening on IP 0.0.0.0 port 13339
I0811 11:29:09.410927 551185 /home/incubator-brpc/src/brpc/server.cpp:1133] Server[example::EchoServiceImpl] is serving on port=8002.
I0811 11:29:09.411169 551185 /home/incubator-brpc/src/brpc/server.cpp:1136] Check out xxx in web browser.
I0811 11:30:17.837671 551233 /home/incubator-brpc/src/brpc/ucp_acceptor.cpp:399] UCP server received a connection request from client at address 10.187.0.6:53216
E0811 11:30:22.988961 551248 /home/incubator-brpc/src/brpc/ucp_worker.cpp:732] Error occurred on remote side 10.187.0.6:53216 (Connection reset by remote peer)
I0811 11:30:39.837903 551244 /home/incubator-brpc/src/brpc/ucp_acceptor.cpp:399] UCP server received a connection request from client at address 10.187.0.6:55760
E0811 11:30:53.501705 551248 /home/incubator-brpc/src/brpc/ucp_worker.cpp:732] Error occurred on remote side 10.187.0.6:55760 (Connection reset by remote peer)
Client side
UCX_TLS=^tcp ./multi_threaded_echo_client --server=10.187.0.91:13339 --use_ucp=true --thread_num=1 --brpc_ucp_worker_busy_poll=1 --attachment_size=4096
I0811 11:30:39.776002 963962 /home/incubator-brpc/src/brpc/ucp_ctx.cpp:35] Running with ucp library version: 1.14.0
I0811 11:30:40.775541 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=19186 latency=50
I0811 11:30:41.775643 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20026 latency=47
I0811 11:30:42.775714 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=18512 latency=50
I0811 11:30:43.775791 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=18512 latency=50
I0811 11:30:44.775864 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=18406 latency=51
I0811 11:30:45.775959 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20643 latency=46
I0811 11:30:46.776042 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20873 latency=46
I0811 11:30:47.776130 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20907 latency=46
I0811 11:30:48.776197 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20901 latency=46
I0811 11:30:49.776281 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20894 latency=46
I0811 11:30:50.776350 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20967 latency=46
I0811 11:30:51.776426 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20923 latency=46
I0811 11:30:52.776510 963950 /home/incubator-brpc/example/multi_threaded_echo_c++/client.cpp:144] Sending EchoRequest at qps=20923 latency=46
Curve will land this RDMA capability in an upcoming release. In high-performance hardware scenarios Curve will deliver even better performance, with greatly improved I/O latency. Stay tuned.
Reference [1]:
https://github.com/opencurve/incubator-brpc/tree/ucx_am/example/multi_threaded_echo_c++


About Curve
Curve can also serve as cloud storage middleware, using S3-compatible object storage as its data storage engine to provide public cloud users with cost-effective shared file storage.
GitHub: https://github.com/opencurve/curve
Website: https://opencurve.io/
User forum: https://ask.opencurve.io/
WeChat group: search for the group assistant WeChat ID OpenCurve_bot




