protobuf service学习
2019年2月15日 星期五, 发表于 深圳
如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)
brpc中实现了google protobuf作为rpc通讯方式,学习一下了解其内部原理;
如文章有任何冒犯之处,例如侵权或者未标明引用,请联系删除。
本人水平有限,如有错误之处,请不吝赐教。
1. protobuf rpc实现
protobuf主要作为序列化和反序列化的工具,并没有提供完整的rpc实现,但是可以基于protobuf实现一些rpc,例如brpc;
protobuf(后续简称pb) 本身没有rpc实现,但是预留了rpc接口,只需要实现其网络交互的部分即可;
实现相关原理可以参考下面文章: https://izualzhy.cn/demo-protobuf-rpc
本文着重看一下brpc是如何实现的;
2. brpc rpc简析
brpc实现了pb rpc,其中有一个echo 的例子,代码在 example/echo_c++中,分别实现了其中客户端和服务端,以这两个例子来学习和追踪rpc的流程;
2.1 client跟踪
强烈建议先阅读一下brpc的官方文档:https://github.com/brpc/brpc/blob/master/docs/cn/client.md
client的核心代码如下:
//step1.
brpc::Channel channel;
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options);
//step2.
example::EchoService_Stub stub(&channel);
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;
request.set_message("hello world");
cntl.set_log_id(log_id ++); // set by user
cntl.request_attachment().append(FLAGS_attachment);
//step3.
stub.Echo(&cntl, &request, &response, NULL);
if (!cntl.Failed()){ }
客户端通过Stub类发送请求;
其中channel作为参数传入,是用来和服务端建立网络连接的;
controller作为辅助类,用来记录一些网络错误;
其实官方文档将client的流程讲的非常清楚(文档最后):
主要步骤:(来自于官方文档)
- 创建一个bthread_id作为本次RPC的correlation_id。
- 根据Channel的创建方式,从进程级的SocketMap中或从LoadBalancer中选择一台下游server作为本次RPC发送的目的地。
- 根据连接方式(单连接、连接池、短连接),选择一个Socket。
- 如果开启验证且当前Socket没有被验证过时,第一个请求进入验证分支,其余请求会阻塞直到第一个包含认证信息的请求写入Socket。server端只对第一个请求进行验证。
- 根据Channel的协议,选择对应的序列化函数把request序列化至IOBuf。
- 如果配置了超时,设置定时器。从这个点开始要避免使用Controller对象,因为在设定定时器后随时可能触发超时->调用到用户的超时回调->用户在回调中析构Controller。
- 发送准备阶段结束,若上述任何步骤出错,会调用Channel::HandleSendFailed。
- 将之前序列化好的IOBuf写出到Socket上,同时传入回调Channel::HandleSocketFailed,当连接断开、写失败等错误发生时会调用此回调。
- 如果是同步发送,Join correlation_id;否则至此CallMethod结束。
- 网络上发消息+收消息。
- 收到response后,提取出其中的correlation_id,在O(1)时间内找到对应的Controller。这个过程中不需要查找全局哈希表,有良好的多核扩展性。
- 根据协议格式反序列化response。
- 调用Controller::OnRPCReturned,可能会根据错误码判断是否需要重试,或让RPC结束。如果是异步发送,调用用户回调。最后摧毁correlation_id唤醒Join着的线程。
2.1.1 Channel
在https://izualzhy.cn/demo-protobuf-rpc这篇文章里面,我们了解到,实现基于pb的rpc,需要继承pb的Channel类,实现CallMethod方法,下面brpc中channel的实现;
2.1.1.1 Channel 初始化
可以通过Init函数来初始化Channel,可以初始化Channel,连接到一台服务器或者一个集群;
//options为NULL时取默认值
//初始化到特定机器
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);
//初始化到集群
int Init(const char* naming_service_url,const char* load_balancer_name,
const ChannelOptions* options);
channel有两种大类的初始化,一种是初始化到特定机器,一种是初始化到集群。
初始化到集群可以传入下面的控制变量:具体可参考https://github.com/brpc/brpc/blob/master/docs/cn/load_balancing.md
-
名词服务:channel可以传入一个名词服务来查询下游机器,类似于DNS,一个名词可以代表服务集群所拥有的机器,例如www.qq.com可以通过DNS协议解析到具体的IP;
-
负载均衡:当后端的服务器多余一个的时候,也需要一定的算法分割流量;
初始化到集群还是单台,在获取IP有一些区别,可以关注一下有SingleServer()判断的分支,官方例子给的是初始化到单台的例子,本文后续用初始化到单台来分析整个流程,两者在整体的业务流程区别不太大;
使用单台服务器初始的Init函数,最终都会调用到 InitSingle 函数
//channel.cpp : InitSingle函数
SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
&_server_id, ssl_ctx) != 0)
会在此地方调用SocketMap::Insert创建socket信息,并且将server信息和创建socket信息关联起来后面使用;
2.1.1.2 Channel rpc流程
Channel继承RpcChannel,实现CallMethod方法,通过Stub类调用,实际上就是调用到此方法; 具体代码不详细贴,看一下其中的一些关键代码;
// 获取本地调用的correlation_id
const CallId correlation_id = cntl->call_id();
const int rc = bthread_id_lock_and_reset_range(correlation_id, NULL, 2 + cntl->max_retry());
//选取对应的socket(单服务器,使用Init选中的服务器,否则使用lb以获得服务器)
if (SingleServer()) {
cntl->_single_server_id = _server_id;
cntl->_remote_side = _server_address;
}
cntl->_lb = _lb;
//将请求序列化到_request_buf这个IOBuf的结构体中
_serialize_request(&cntl->_request_buf, cntl, request);
//通过socket发送真正的请求
cntl->IssueRPC(start_send_real_us);
//如果是done为空,同步调用(等待返回结果)
if (done == NULL) {
// MUST wait for response when sending synchronous RPC. It will
// be woken up by callback when RPC finishes (succeeds or still
// fails after retry)
Join(correlation_id);
if (cntl->_span) {
cntl->SubmitSpan();
}
cntl->OnRPCEnd(butil::gettimeofday_us());
}
2.1.2 Controller
依然是先看官方文档:https://github.com/illx10000/brpc/blob/master/docs/cn/client.md
Controller包含了request中没有的数据和选项。server端和client端的Controller结构体是一样的,但使用的字段可能是不同的,你需要仔细阅读Controller中的注释,明确哪些字段可以在server端使用,哪些可以在client端使用。
一个Controller对应一次RPC。一个Controller可以在Reset()后被另一个RPC复用,但一个Controller不能被多个RPC同时使用(不论是否在同一个线程发起)。
Controller的特点:
一个Controller只能有一个使用者,没有特殊说明的话,Controller中的方法默认线程不安全。 因为不能被共享,所以一般不会用共享指针管理Controller,如果你用共享指针了,很可能意味着出错了。 Controller创建于开始RPC前,析构于RPC结束后,常见几种模式: 同步RPC前Controller放栈上,出作用域后自行析构。注意异步RPC的Controller绝对不能放栈上,否则其析构时异步调用很可能还在进行中,从而引发未定义行为。 异步RPC前new Controller,done中删除。
可以理解为crontroller储存了一次rpc调用的上下文信息,将各种rpc过程中的信息存放到crontroller结构体中,例如调用起始时间戳,序列化协议等; ***
在RPC的调用过程中,CallMethod通过 Controller::IssueRPC 将rpc流程委托给crontroller,