illx10000

青春不是年华,而是心境

protobuf service学习

2019年2月15日 星期五, 发表于 深圳

如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)

brpc中实现了google protobuf作为rpc通讯方式,学习一下了解其内部原理;

如文章有任何冒犯之处,例如侵权或者未标明引用,请联系删除。
本人水平有限,如有错误之处,请不吝赐教。

1. protobuf rpc实现

protobuf主要作为序列化和反序列化的工具,并没有提供完整的rpc实现,但是可以基于protobuf实现一些rpc,例如brpc;

protobuf(后续简称pb) 本身没有rpc实现,但是预留了rpc接口,只需要实现其网络交互的部分即可;

实现相关原理可以参考下面文章: https://izualzhy.cn/demo-protobuf-rpc

本文着重看一下brpc是如何实现的;

2. brpc rpc简析

brpc实现了pb rpc,其中有一个echo 的例子,代码在 example/echo_c++中,分别实现了其中客户端和服务端,以这两个例子来学习和追踪rpc的流程;

2.1 client跟踪

强烈建议先阅读一下brpc的官方文档:https://github.com/brpc/brpc/blob/master/docs/cn/client.md

client的核心代码如下:

//step1.
brpc::Channel channel;
    
// Initialize the channel, NULL means using default options.
brpc::ChannelOptions options;
options.protocol = FLAGS_protocol;
channel.Init(FLAGS_server.c_str(), FLAGS_load_balancer.c_str(), &options);

//step2.
example::EchoService_Stub stub(&channel);
example::EchoRequest request;
example::EchoResponse response;
brpc::Controller cntl;
request.set_message("hello world");

cntl.set_log_id(log_id ++);  // set by user
cntl.request_attachment().append(FLAGS_attachment);
//step3.
stub.Echo(&cntl, &request, &response, NULL);
if (!cntl.Failed()){   }

客户端通过Stub类发送请求;
其中channel作为参数传入,是用来和服务端建立网络连接的;
controller作为辅助类,用来记录一些网络错误;

其实官方文档将client的流程讲的非常清楚(文档最后):

client基本流程

主要步骤:(来自于官方文档)

  1. 创建一个bthread_id作为本次RPC的correlation_id。
  2. 根据Channel的创建方式,从进程级的SocketMap中或从LoadBalancer中选择一台下游server作为本次RPC发送的目的地。
  3. 根据连接方式(单连接、连接池、短连接),选择一个Socket
  4. 如果开启验证且当前Socket没有被验证过时,第一个请求进入验证分支,其余请求会阻塞直到第一个包含认证信息的请求写入Socket。server端只对第一个请求进行验证。
  5. 根据Channel的协议,选择对应的序列化函数把request序列化至IOBuf
  6. 如果配置了超时,设置定时器。从这个点开始要避免使用Controller对象,因为在设定定时器后随时可能触发超时->调用到用户的超时回调->用户在回调中析构Controller。
  7. 发送准备阶段结束,若上述任何步骤出错,会调用Channel::HandleSendFailed。
  8. 将之前序列化好的IOBuf写出到Socket上,同时传入回调Channel::HandleSocketFailed,当连接断开、写失败等错误发生时会调用此回调。
  9. 如果是同步发送,Join correlation_id;否则至此CallMethod结束。
  10. 网络上发消息+收消息。
  11. 收到response后,提取出其中的correlation_id,在O(1)时间内找到对应的Controller。这个过程中不需要查找全局哈希表,有良好的多核扩展性。
  12. 根据协议格式反序列化response。
  13. 调用Controller::OnRPCReturned,可能会根据错误码判断是否需要重试,或让RPC结束。如果是异步发送,调用用户回调。最后摧毁correlation_id唤醒Join着的线程。

2.1.1 Channel

https://izualzhy.cn/demo-protobuf-rpc这篇文章里面,我们了解到,实现基于pb的rpc,需要继承pb的Channel类,实现CallMethod方法,下面brpc中channel的实现;

2.1.1.1 Channel 初始化

可以通过Init函数来初始化Channel,可以初始化Channel,连接到一台服务器或者一个集群;

//options为NULL时取默认值
//初始化到特定机器 
int Init(EndPoint server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr_and_port, const ChannelOptions* options);
int Init(const char* server_addr, int port, const ChannelOptions* options);

//初始化到集群
int Init(const char* naming_service_url,const char* load_balancer_name,
         const ChannelOptions* options);

channel有两种大类的初始化,一种是初始化到特定机器,一种是初始化到集群。

初始化到集群可以传入下面的控制变量:具体可参考https://github.com/brpc/brpc/blob/master/docs/cn/load_balancing.md

  1. 名词服务:channel可以传入一个名词服务来查询下游机器,类似于DNS,一个名词可以代表服务集群所拥有的机器,例如www.qq.com可以通过DNS协议解析到具体的IP;

  2. 负载均衡:当后端的服务器多余一个的时候,也需要一定的算法分割流量;

初始化到集群还是单台,在获取IP有一些区别,可以关注一下有SingleServer()判断的分支,官方例子给的是初始化到单台的例子,本文后续用初始化到单台来分析整个流程,两者在整体的业务流程区别不太大;

使用单台服务器初始的Init函数,最终都会调用到 InitSingle 函数

//channel.cpp : InitSingle函数
SocketMapInsert(SocketMapKey(server_addr_and_port, sig),
                        &_server_id, ssl_ctx) != 0)

会在此地方调用SocketMap::Insert创建socket信息,并且将server信息和创建socket信息关联起来后面使用;

2.1.1.2 Channel rpc流程

Channel继承RpcChannel,实现CallMethod方法,通过Stub类调用,实际上就是调用到此方法; 具体代码不详细贴,看一下其中的一些关键代码;

// 获取本地调用的correlation_id
const CallId correlation_id = cntl->call_id(); 
const int rc = bthread_id_lock_and_reset_range(correlation_id, NULL, 2 + cntl->max_retry());

//选取对应的socket(单服务器,使用Init选中的服务器,否则使用lb以获得服务器)
if (SingleServer()) {
    cntl->_single_server_id = _server_id;
    cntl->_remote_side = _server_address;
}
cntl->_lb = _lb; 

//将请求序列化到_request_buf这个IOBuf的结构体中
_serialize_request(&cntl->_request_buf, cntl, request);

//通过socket发送真正的请求
cntl->IssueRPC(start_send_real_us);

//如果是done为空,同步调用(等待返回结果)
if (done == NULL) {
    // MUST wait for response when sending synchronous RPC. It will
    // be woken up by callback when RPC finishes (succeeds or still
    // fails after retry)
    Join(correlation_id);
    if (cntl->_span) {
        cntl->SubmitSpan();
    }
    cntl->OnRPCEnd(butil::gettimeofday_us());
}

2.1.2 Controller

依然是先看官方文档:https://github.com/illx10000/brpc/blob/master/docs/cn/client.md

Controller包含了request中没有的数据和选项。server端和client端的Controller结构体是一样的,但使用的字段可能是不同的,你需要仔细阅读Controller中的注释,明确哪些字段可以在server端使用,哪些可以在client端使用。

一个Controller对应一次RPC。一个Controller可以在Reset()后被另一个RPC复用,但一个Controller不能被多个RPC同时使用(不论是否在同一个线程发起)。

Controller的特点:

一个Controller只能有一个使用者,没有特殊说明的话,Controller中的方法默认线程不安全。 因为不能被共享,所以一般不会用共享指针管理Controller,如果你用共享指针了,很可能意味着出错了。 Controller创建于开始RPC前,析构于RPC结束后,常见几种模式: 同步RPC前Controller放栈上,出作用域后自行析构。注意异步RPC的Controller绝对不能放栈上,否则其析构时异步调用很可能还在进行中,从而引发未定义行为。 异步RPC前new Controller,done中删除。


可以理解为crontroller储存了一次rpc调用的上下文信息,将各种rpc过程中的信息存放到crontroller结构体中,例如调用起始时间戳,序列化协议等; ***

在RPC的调用过程中,CallMethod通过 Controller::IssueRPC 将rpc流程委托给crontroller,