illx10000

青春不是年华,而是心境

brpc之iobuf

2018年12月29日 星期六, 发表于 深圳

如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)


brpc iobuf类学习,官方文档:https://github.com/brpc/brpc/blob/master/docs/cn/iobuf.md

如文章有任何冒犯之处,例如侵权或者未标明引用,请邮件联系删除。
本人水平有限,如有错误之处,请不吝赐教。

iobuf是一种非连续零拷贝缓冲数据结构,具体用法可以参考上面的官方链接,本文主要介绍一下其中的组成类和一些实现方法

1. iobuf以及类构成

从数据结构上来看,iobuf包含了一个BigView和SmallView的联合体;匿名联合体

class IOBuf {
    struct SmallView { BlockRef refs[2]; };

    struct BigView {
        int32_t magic;
        uint32_t start;
        BlockRef* refs;
        uint32_t nref;
        uint32_t cap_mask;
        size_t nbytes;

        const BlockRef& ref_at(uint32_t i) const
        { return refs[(start + i) & cap_mask]; }
        
        BlockRef& ref_at(uint32_t i)
        { return refs[(start + i) & cap_mask]; }

        uint32_t capacity() const { return cap_mask + 1; }
    };

private:    
    union { //联合体
        BigView _bv;
        SmallView _sv;
    };
};

默认构造的时候,IOBuf的 _small()范围true,当SmallView中的两个BlockRef都存储了不同的block之后,会自动转换为 BigView;

c++ 的memory order可以参考这里 http://senlinzhan.github.io/2017/12/04/cpp-memory-order/

下面看一下各个函数的实现:

1.2 IOBuf构造

IOBuf构造函数的实现在 iobuf_inl.h 文件中,比较简单,大家自己看一下,分析一下IOBuf的移动构造:

struct Movable {
    explicit Movable(IOBuf& v) : _v(&v) { }
    IOBuf& value() const { return *_v; }
private:
    IOBuf *_v;
};

//移动构造
inline IOBuf::IOBuf(const Movable& rhs) {
    _sv = rhs.value()._sv;
    new (&rhs.value()) IOBuf; //placement new
}

实现的比较精巧,Movable结构体本身只保存原来IOBuf的指针,使用placement new的方式,将原来的IOBuf的内存拷贝过来;并且获取原来IOBuf的内存指针,这样就可以不用分配新的内存,老的IOBuf就可以不用析构;

1.3 IOBuf赋值

注意到代码里面有这样一段话:
Following push_back()/append() are just implemented for convenienceand occasional usages, they’re relatively slow because of the overhead of frequent BlockRef-management and reference-countings. If you get a lot of push_back/append to do, you should use IOBufAppender or IOBufBuilder instead, which reduce overhead by owning IOBuf::Block.

push back和append操作只是为了方便,不应该经常使用。

本文为了学习,还是继续看一下 iobuf.cpp 中赋值函数的实现:

int IOBuf::push_back(char c) {
    IOBuf::Block* b = iobuf::share_tls_block(); //从tls中获取一个满的block
    if (BAIDU_UNLIKELY(!b)) {
        return -1;
    }
    b->data[b->size] = c; //将数据放入获取到的block中
    const IOBuf::BlockRef r = { b->size, 1, b }; //生成到block的引用
    ++b->size;
    _push_back_ref(r); 
    return 0;
}

push_back char的实现:

  1. 先从tls 中获取一个没有满的block;
  2. 将数据放入该Block中;
  3. 将相应的Block增加引用计数(默认是IOBuf是small view,只有在两个refs种都存储了不同的的Block之后,才自动转为BigView,可以参考iobuf.cpp:601行);
int IOBuf::append(void const* data, size_t count) {
    if (BAIDU_UNLIKELY(!data)) {
        return -1;
    }
    if (count == 1) {
        return push_back(*((char const*)data));
    }
    size_t total_nc = 0;
    while (total_nc < count) {  // excluded count == 0
        IOBuf::Block* b = iobuf::share_tls_block();
        if (BAIDU_UNLIKELY(!b)) {
            return -1;
        }
        const size_t nc = std::min(count - total_nc, b->left_space());
        iobuf::cp(b->data + b->size, (char*)data + total_nc, nc);
        
        const IOBuf::BlockRef r = { (uint32_t)b->size, (uint32_t)nc, b };
        _push_back_ref(r);
        b->size += nc;
        total_nc += nc;
    }
    return 0;
}

append是分批存储,每次存储一个block可以存储的最大空间,存储完成之后,存储下一个,大致愿意和push_bach char相差不多;

1.3 IOBuf 弹出

IObuf的弹出分为 pop_frontpop_back,以pop_front来分析:

size_t IOBuf::pop_front(size_t n) {
    const size_t len = length();
    if (n >= len) {
        clear();
        return len;
    }
    const size_t saved_n = n;
    while (n) {  // length() == 0 does not enter
        IOBuf::BlockRef &r = _front_ref(); //获取当前的block_ref
        if (r.length > n) { //判断是否存储多余需要pop的内容
            r.offset += n;
            r.length -= n;
            if (!_small()) {
                _bv.nbytes -= n;
            }
            return saved_n;
        }
        n -= r.length;
        _pop_front_ref();
    }
    return saved_n;
}
  1. 判断整个IOBuf的存储内容是否比需要pop的多,如果小于需要pop的内容,则直接清空;
  2. 否则,先获取front_ref,获取front的blockRef,_bv.start在smallView转为BigView之后,默认为0;
  3. 判断当前的BlockRef中存储的内容是否比需要的多,第一个BlockRef内容大于需要获取的内容,直接处理第一个Block;
  4. 循环处理一下个BlockRef,直到pop够足够的字符;

1.4 IOBuf 剪切(cut)

size_t cutn(IOBuf* out, size_t n);
size_t cutn(void* out, size_t n);
size_t cutn(std::string* out, size_t n);

cutn操作,有点类似于pop操作,不同的地方在于,大家看一下代码就好了;

int cut_until(IOBuf* out, char const* delim);
// std::string version, `delim' could be binary
int cut_until(IOBuf* out, const std::string& delim);

cut_until,从前到后开始cut,直到遇到字符匹配到delim,如果没有匹配到返回-1; 需要注意的地方时,这个地方delim字符串的长度不能超过unsiged long的长度,因为brpc中将字符串hash到一个unsigned long型的整数,然后遍历匹配的时候,也将IOBuf中的字符串hash到一个unsigned long类型的整数,与delim进行比较;这个地方比较精巧的是,hash的时候使用位移操作,向左移,后面的字符串会把前面的字符串冲掉,因此复杂度从 O(N*M) 降到O(N),其中N为block中字符串的长度,M为delim的长度;

因为限制了字符串的长度,所以才可以这样进行hash,如果没有限制长度,可以考虑使用AC自动机的方式来匹配;

IOBuf的cut还有一些高级用法:

ssize_t cut_into_writer(IWriter* writer, size_t size_hint = 1024*1024);
ssize_t cut_into_file_descriptor(int fd, size_t size_hint = 1024*1024);
ssize_t cut_into_SSL_channel(struct ssl_st* ssl, int* ssl_error);
ssize_t cut_into_file_descriptor(int fd, size_t size_hint = 1024*1024);

写入IWriter,文件描述符fd,写入ssl管道等,大家可以看一下代码,原理和cut差不太多,获取到一个buf之后,调用对应的write函数进行写入;

2. IOPortal 分析

IOPortal是IOBuf的一个子集,用来从文件描述符中获取数据,通常是socket的一个读Buff;

IOPortal中提供了多个接口,

// Read at most `max_count' bytes from the reader and append to self.
ssize_t append_from_reader(IReader* reader, size_t max_count);

// Read at most `max_count' bytes from file descriptor `fd' and
// append to self.
ssize_t append_from_file_descriptor(int fd, size_t max_count);

// Read at most `max_count' bytes from file descriptor `fd' at a given
// offset and append to self. The file offset is not changed.
// If `offset' is negative, does exactly what append_from_file_descriptor does.
ssize_t pappend_from_file_descriptor(int fd, off_t offset, size_t max_count);

// Read as many bytes as possible from SSL channel `ssl', and stop until `max_count'.
// Returns total bytes read and the ssl error code will be filled into `ssl_error'
ssize_t append_from_SSL_channel(struct ssl_st* ssl, int* ssl_error,
                                size_t max_count = 1024*1024);

从reader,fd,ssl中读取数据,本文选取其中一个借口来分析学习;

2.1 append_from_file_descriptor 读fd

看一下实现代码,主要做了三件事:

  1. 循环block,准备好readv函数的参数地址(将block的可写首地址和可写长度依次获取,放入到iovec的数组中)
  2. 调用pread函数将读取到的内容写入到上面准备的地址,获取读取的总长度nr;
  3. 整理现有的block,因为现有的Block可能很多写满了。

3. 高阶用法(与pb,snappy等组合)

高阶用法,等用到再回来看具体实现;

IOBufAsZeroCopyInputStream
IOBufAsZeroCopyOutputStream
IOBufAsSnappySource
IOBufAsSnappySink
IOBufBuilder
IOBufAppender