brpc之iobuf
2018年12月29日 星期六, 发表于 深圳
如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)
brpc iobuf类学习,官方文档:https://github.com/brpc/brpc/blob/master/docs/cn/iobuf.md
如文章有任何冒犯之处,例如侵权或者未标明引用,请邮件联系删除。
本人水平有限,如有错误之处,请不吝赐教。
iobuf是一种非连续零拷贝缓冲数据结构,具体用法可以参考上面的官方链接,本文主要介绍一下其中的组成类和一些实现方法
1. iobuf以及类构成
从数据结构上来看,iobuf包含了一个BigView和SmallView的联合体;匿名联合体
class IOBuf {
struct SmallView { BlockRef refs[2]; };
struct BigView {
int32_t magic;
uint32_t start;
BlockRef* refs;
uint32_t nref;
uint32_t cap_mask;
size_t nbytes;
const BlockRef& ref_at(uint32_t i) const
{ return refs[(start + i) & cap_mask]; }
BlockRef& ref_at(uint32_t i)
{ return refs[(start + i) & cap_mask]; }
uint32_t capacity() const { return cap_mask + 1; }
};
private:
union { //联合体
BigView _bv;
SmallView _sv;
};
};
默认构造的时候,IOBuf的 _small()范围true,当SmallView中的两个BlockRef都存储了不同的block之后,会自动转换为 BigView;
c++ 的memory order可以参考这里 http://senlinzhan.github.io/2017/12/04/cpp-memory-order/
下面看一下各个函数的实现:
1.2 IOBuf构造
IOBuf构造函数的实现在 iobuf_inl.h 文件中,比较简单,大家自己看一下,分析一下IOBuf的移动构造:
struct Movable {
explicit Movable(IOBuf& v) : _v(&v) { }
IOBuf& value() const { return *_v; }
private:
IOBuf *_v;
};
//移动构造
inline IOBuf::IOBuf(const Movable& rhs) {
_sv = rhs.value()._sv;
new (&rhs.value()) IOBuf; //placement new
}
实现的比较精巧,Movable结构体本身只保存原来IOBuf的指针,使用placement new的方式,将原来的IOBuf的内存拷贝过来;并且获取原来IOBuf的内存指针,这样就可以不用分配新的内存,老的IOBuf就可以不用析构;
1.3 IOBuf赋值
注意到代码里面有这样一段话:
Following push_back()/append() are just implemented for convenienceand occasional usages, they’re relatively slow because of the overhead
of frequent BlockRef-management and reference-countings. If you get
a lot of push_back/append to do, you should use IOBufAppender or
IOBufBuilder instead, which reduce overhead by owning IOBuf::Block.
push back和append操作只是为了方便,不应该经常使用。
本文为了学习,还是继续看一下 iobuf.cpp 中赋值函数的实现:
int IOBuf::push_back(char c) {
IOBuf::Block* b = iobuf::share_tls_block(); //从tls中获取一个满的block
if (BAIDU_UNLIKELY(!b)) {
return -1;
}
b->data[b->size] = c; //将数据放入获取到的block中
const IOBuf::BlockRef r = { b->size, 1, b }; //生成到block的引用
++b->size;
_push_back_ref(r);
return 0;
}
push_back char的实现:
- 先从tls 中获取一个没有满的block;
- 将数据放入该Block中;
- 将相应的Block增加引用计数(默认是IOBuf是small view,只有在两个refs种都存储了不同的的Block之后,才自动转为BigView,可以参考iobuf.cpp:601行);
int IOBuf::append(void const* data, size_t count) {
if (BAIDU_UNLIKELY(!data)) {
return -1;
}
if (count == 1) {
return push_back(*((char const*)data));
}
size_t total_nc = 0;
while (total_nc < count) { // excluded count == 0
IOBuf::Block* b = iobuf::share_tls_block();
if (BAIDU_UNLIKELY(!b)) {
return -1;
}
const size_t nc = std::min(count - total_nc, b->left_space());
iobuf::cp(b->data + b->size, (char*)data + total_nc, nc);
const IOBuf::BlockRef r = { (uint32_t)b->size, (uint32_t)nc, b };
_push_back_ref(r);
b->size += nc;
total_nc += nc;
}
return 0;
}
append是分批存储,每次存储一个block可以存储的最大空间,存储完成之后,存储下一个,大致愿意和push_bach char相差不多;
1.3 IOBuf 弹出
IObuf的弹出分为 pop_front 和 pop_back,以pop_front来分析:
size_t IOBuf::pop_front(size_t n) {
const size_t len = length();
if (n >= len) {
clear();
return len;
}
const size_t saved_n = n;
while (n) { // length() == 0 does not enter
IOBuf::BlockRef &r = _front_ref(); //获取当前的block_ref
if (r.length > n) { //判断是否存储多余需要pop的内容
r.offset += n;
r.length -= n;
if (!_small()) {
_bv.nbytes -= n;
}
return saved_n;
}
n -= r.length;
_pop_front_ref();
}
return saved_n;
}
- 判断整个IOBuf的存储内容是否比需要pop的多,如果小于需要pop的内容,则直接清空;
- 否则,先获取front_ref,获取front的blockRef,_bv.start在smallView转为BigView之后,默认为0;
- 判断当前的BlockRef中存储的内容是否比需要的多,第一个BlockRef内容大于需要获取的内容,直接处理第一个Block;
- 循环处理一下个BlockRef,直到pop够足够的字符;
1.4 IOBuf 剪切(cut)
size_t cutn(IOBuf* out, size_t n);
size_t cutn(void* out, size_t n);
size_t cutn(std::string* out, size_t n);
cutn操作,有点类似于pop操作,不同的地方在于,大家看一下代码就好了;
int cut_until(IOBuf* out, char const* delim);
// std::string version, `delim' could be binary
int cut_until(IOBuf* out, const std::string& delim);
cut_until,从前到后开始cut,直到遇到字符匹配到delim,如果没有匹配到返回-1; 需要注意的地方时,这个地方delim字符串的长度不能超过unsiged long的长度,因为brpc中将字符串hash到一个unsigned long型的整数,然后遍历匹配的时候,也将IOBuf中的字符串hash到一个unsigned long类型的整数,与delim进行比较;这个地方比较精巧的是,hash的时候使用位移操作,向左移,后面的字符串会把前面的字符串冲掉,因此复杂度从 O(N*M) 降到O(N),其中N为block中字符串的长度,M为delim的长度;
因为限制了字符串的长度,所以才可以这样进行hash,如果没有限制长度,可以考虑使用AC自动机的方式来匹配;
IOBuf的cut还有一些高级用法:
ssize_t cut_into_writer(IWriter* writer, size_t size_hint = 1024*1024);
ssize_t cut_into_file_descriptor(int fd, size_t size_hint = 1024*1024);
ssize_t cut_into_SSL_channel(struct ssl_st* ssl, int* ssl_error);
ssize_t cut_into_file_descriptor(int fd, size_t size_hint = 1024*1024);
写入IWriter,文件描述符fd,写入ssl管道等,大家可以看一下代码,原理和cut差不太多,获取到一个buf之后,调用对应的write函数进行写入;
2. IOPortal 分析
IOPortal是IOBuf的一个子集,用来从文件描述符中获取数据,通常是socket的一个读Buff;
IOPortal中提供了多个接口,
// Read at most `max_count' bytes from the reader and append to self.
ssize_t append_from_reader(IReader* reader, size_t max_count);
// Read at most `max_count' bytes from file descriptor `fd' and
// append to self.
ssize_t append_from_file_descriptor(int fd, size_t max_count);
// Read at most `max_count' bytes from file descriptor `fd' at a given
// offset and append to self. The file offset is not changed.
// If `offset' is negative, does exactly what append_from_file_descriptor does.
ssize_t pappend_from_file_descriptor(int fd, off_t offset, size_t max_count);
// Read as many bytes as possible from SSL channel `ssl', and stop until `max_count'.
// Returns total bytes read and the ssl error code will be filled into `ssl_error'
ssize_t append_from_SSL_channel(struct ssl_st* ssl, int* ssl_error,
size_t max_count = 1024*1024);
从reader,fd,ssl中读取数据,本文选取其中一个借口来分析学习;
2.1 append_from_file_descriptor 读fd
看一下实现代码,主要做了三件事:
- 循环block,准备好readv函数的参数地址(将block的可写首地址和可写长度依次获取,放入到iovec的数组中)
- 调用pread函数将读取到的内容写入到上面准备的地址,获取读取的总长度nr;
- 整理现有的block,因为现有的Block可能很多写满了。
3. 高阶用法(与pb,snappy等组合)
高阶用法,等用到再回来看具体实现;
IOBufAsZeroCopyInputStream
IOBufAsZeroCopyOutputStream
IOBufAsSnappySource
IOBufAsSnappySink
IOBufBuilder
IOBufAppender