illx10000

青春不是年华,而是心境

haproxy tcp代理实现原理简析

2018年9月3日 星期一, 发表于 深圳

如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)

haproxy tcp代理实现原理简析

1. 代理初始化

下面是一个典型的四层代理配置;

功能是 使得haproxy监听本机3307端口,并将该端口上的所有tcp数据包转发到本机的10000端口

global
        maxconn         500

defaults
        contimeout      1000
        clitimeout      5000
        srvtimeout      5000

listen  mysql_1
        bind :3307
        mode tcp
        server  srv1 127.0.0.1:10000

haproxy配置文件的解析步骤为

standard.c: struct sockaddr_storage * str2sa_range()
   cfgparse.c x: int str2listener(...)   
      cfgparse.c: int cfg_parse_listen(...)      
         cfgparse.c: int readcfgfile(const char *file)         
            haproxy.c: void init(int argc, char **argv)           
               haproxy.c: int main(int argc, char **argv)

上面的转发配置解析成一个listener结构体:

struct listener {
	enum obj_type obj_type;     /* object type = OBJ_TYPE_LISTENER */
	enum li_state state;        /* state: NEW, INIT, ASSIGNED, LISTEN, READY, FULL */
	int fd;				/* the listen socket */
	int luid;			/* listener universally unique ID, used for SNMP */
	struct protocol *proto;		/* protocol this listener belongs to */
   
	int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */
	/* warning: this struct is huge, keep it at the bottom */
	struct sockaddr_storage addr;	/* the address we listen to */
	struct {
		struct eb32_node id;	/* place in the tree of used IDs */
	} conf;				/* config information */
    ...
};

其中比较重要的是字段是protocol结构体(协议的处理),accept回调函数(处理客户端连接),tcp代理主要是proto_tcpv4结构体

void tcpv4_add_listener(struct listener *listener)
{
	if (listener->state != LI_INIT)
		return;
	listener->state = LI_ASSIGNED;
	listener->proto = &proto_tcpv4; //tcp协议的转发处理在此处
	LIST_ADDQ(&proto_tcpv4.listeners, &listener->proto_list);
	proto_tcpv4.nb_listeners++;
}

上面设置了listener的协议和状态,在初始化的时候,同时设置了listener的accept回调和handler回调

cfgparse.c:check_config_validity()
			listener->accept = session_accept_fd;
			listener->handler = process_stream;

##2. HAProxy事件处理

HAProxy在四层代理的情况下,只是在客户端和服务端转发了双向流量; HAProxy默认是单进程,事件驱动的,主程序通过事件循环驱动,将各个协议抽象成protocol类来处理,其中对于tcp协议转发的代码主要在proto_tcp.c中的proto_tcpv4结构体,上面在配置解析的时候已经将配置此结构体绑定;

static struct protocol proto_tcpv4 = {
	.name = "tcpv4",
	.sock_domain = AF_INET,
	.sock_type = SOCK_STREAM,
	.sock_prot = IPPROTO_TCP,
	.sock_family = AF_INET,
	.sock_addrlen = sizeof(struct sockaddr_in),
	.l3_addrlen = 32/8,
	.accept = &listener_accept, //接受客户端的请求
	.connect = tcp_connect_server, //连接远程服务器
	.bind = tcp_bind_listener, //绑定本地端口
	.bind_all = tcp_bind_listeners,
	.unbind_all = unbind_all_listeners,
	.enable_all = enable_all_listeners,
	.get_src = tcp_get_src,
	.get_dst = tcp_get_dst,
	.drain = tcp_drain,
	.pause = tcp_pause_listener,
	.listeners = LIST_HEAD_INIT(proto_tcpv4.listeners),
	.nb_listeners = 0,
};

其中比较重要的是几个回调函数,bind/accept/connect

2.1 HAProxy 消息封装

HAProxy默认单进程,事件驱动,所有的网络,信号事件的处理都在事件循环中执行的,下面看一下事件循环如何 处理网络事件,并且和协议中的回调函数做适配; HAProxy的事件循环在haproxy.c: run_poll_loop函数中

void run_poll_loop()
{
	int next;
	tv_update_date(0,1);
	while (1) {
		process_runnable_tasks(); /* Process a few tasks */
		signal_process_queue(); /* check if we caught some signals and process them */
		next = wake_expired_tasks(); /* Check if we can expire some tasks */
		if (jobs == 0) /* stop when there's nothing left to do */
			break;
			
		if (fd_cache_num || run_queue || signal_queue_len || !LIST_ISEMPTY(&applet_active_queue))
			next = now_ms; /* expire immediately if events are pending */

		/* The poller will ensure it returns around <next> */
		cur_poller.poll(&cur_poller, next);
		fd_process_cached_events();
		applet_run_active();
	}
}

HAProxy将IO多路复用的监听函数(select/epoll/poll等函数)通过poller 结构体对外屏蔽(有点类似于libevent),根据当前系统支持自动选出符合要求的复用函数,在介绍事件循环之前,继续深入了解一下HAProxy的poller

2.1.1 HAProxy poller封装

struct poller {
	void   *private;                                     /* any private data for the poller */
	void REGPRM1   (*clo)(const int fd);                 /* mark <fd> as closed */
    void REGPRM2   (*poll)(struct poller *p, int exp);   /* the poller itself */
	int  REGPRM1   (*init)(struct poller *p);            /* poller initialization */
	void REGPRM1   (*term)(struct poller *p);            /* termination of this poller */
	int  REGPRM1   (*test)(struct poller *p);            /* pre-init check of the poller */
	int  REGPRM1   (*fork)(struct poller *p);            /* post-fork re-opening */
	const char   *name;                                  /* poller name */
	int    pref;                                         /* try pollers with higher preference first */
};

HAProxy在ev_select.c/ev_kqueue.c/ev_poll.c/ev_select.c中封装了四种poller,以epoll为例

__attribute__((constructor)) 
static void _do_register(void)
{
	struct poller *p;

	if (nbpollers >= MAX_POLLERS)
		return;

	epoll_fd = -1;
	p = &pollers[nbpollers++];

	p->name = "epoll";
	p->pref = 300; //优先级
	p->private = NULL;

	p->clo  = __fd_clo;
	p->test = _do_test;
	p->init = _do_init;
	p->term = _do_term;
	p->poll = _do_poll;
	p->fork = _do_fork;
}

其中__attribute__((constructor)) 说明注册函数在main函数启动之前注册到全局变量pollers中, 那些多路复用函数可以使用,是在makefile中定义的(下面是节选的一段makefile)

ifeq ($(TARGET),linux26)
  # This is for standard Linux 2.6 with netfilter and standard epoll()
  USE_GETSOCKNAME = implicit
  USE_NETFILTER   = implicit
  USE_POLL        = implicit
  USE_EPOLL       = implicit //可以使用epoll
  USE_TPROXY      = implicit
  USE_LIBCRYPT    = implicit
  USE_FUTEX       = implicit
  EXTRA          += haproxy-systemd-wrapper
  USE_DL          = implicit
else

有一个小细节,epoll的优先级是300(pref 字段),select优先级是150,说明在支持epoll的系统中,epoll优先被使用,如果系统不支持某种poller,则将优先级设置为0(可搜索disable_poller函数);

以上是对poller的抽象,在ev_epoll.c的_do_poll中实际上也是调用epll_wait监听各个socket上的事件(到这个地方有没有觉得很熟悉了哈); 除了对poll进行统一封装之外,HAProxy也对文件描述符fd进行了统一的封装,抽象成fdtab结构体;

2.1.2 HAProxy fd封装

struct fdtab {
	void (*iocb)(int fd);                /* I/O handler */
	void *owner;                         /* the connection or listener associated with this fd, NULL if closed */
	unsigned int  cache;                 /* position+1 in the FD cache. 0=not in cache. */
	unsigned char state;                 /* FD state for read and write directions (2*3 bits) */
	unsigned char ev;                    /* event seen in return of poll() : FD_POLL_* */
	unsigned char new:1;                 /* 1 if this fd has just been created */
	unsigned char updated:1;             /* 1 if this fd is already in the update list */
	unsigned char linger_risk:1;         /* 1 if we must kill lingering before closing */
	unsigned char cloned:1;              /* 1 if a cloned socket, requires EPOLL_CTL_DEL on close */
};

fdtab不仅是一个结构体,也是一个全局数组

fd.c: struct fdtab *fdtab = NULL;     /* array of all the file descriptors */
haproxy.cfdtab = calloc(1, sizeof(struct fdtab) * (global.maxsock));

OK,有了以上基本了解,我们来看看对socket消息的处理;

2.2 HAProxy 消息处理

本文主要看看HAProxy对消息的处理,包括新连接的处理,可读可写消息的处理;

2.2.1 HAProxy 接受新连接

能够接收新连接,毫无疑问,首先需要bind本地端口,看看bind的流程,

haproxy.c: int main(int argc,char** argv)
   proxy.c: int start_proxies(int verbose)
     proxy.c: lerr = listener->proto->bind(listener, msg, sizeof(msg));
        proto_tcp.c: int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen)

在最开始已经介绍了,haporxy将配置文件解析成struct proxy 的结构体,在start_proxies中监听所有配置中的端口, 在tcp协议中,bind函数实际上设置为proto_tcp中的tcp_bind_listenner函数; 在tcp_bind_listenner函数中,将被动连接的fd放入fd全局列表中,并且设置accept的回调

	//tcp_bind_listenner.c
	/* the socket is ready */
	listener->fd = fd;
	listener->state = LI_LISTEN;
	fdtab[fd].owner = listener; /* reference the listener instead of a task */
	fdtab[fd].iocb = listener->proto->accept; //设置回调
	fd_insert(fd);

在fd_insert中,fdtab[fd]被初始化,

static inline void fd_insert(int fd)
{
	fdtab[fd].ev = 0;
	fdtab[fd].new = 1;
	fdtab[fd].linger_risk = 0;
	fdtab[fd].cloned = 0;
	if (fd + 1 > maxfd)
		maxfd = fd + 1;
}

上面都设置了监听套接字和一些回调函数;

#0  updt_fd_polling (fd=8) at include/proto/fd.h:93
#1  0x00000000004194c9 in fd_update_cache (fd=8) at include/proto/fd.h:163
#2  0x000000000041973a in fd_want_recv (fd=8) at include/proto/fd.h:319
#3  0x00000000004199ee in enable_listener (listener=0x756aa0) at src/listener.c:63
#4  0x0000000000419e97 in enable_all_listeners (proto=0x73c820) at src/listener.c:211
#5  0x0000000000407c2f in protocol_enable_all () at src/protocol.c:94
#6  0x0000000000407585 in main (argc=3, argv=0x7fffffffe0e8) at src/haproxy.c:2093

真正将监听套接字加入epoll事件监听的操作是,

  1. 在protocol_enable_all 中,将fd保存到fd_updt结构体中
  2. 在事件循环_do_poll中,会将fd接入epoll中;
    for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) {
     fd = fd_updt[updt_idx];
     eo = fdtab[fd].state;
     en = fd_compute_new_polled_status(eo);
     ...
     ev.data.fd = fd;
     epoll_ctl(epoll_fd, opcode, fd, &ev); //opcode=EPOLL_CTL_ADD
     ...
    }
    
  3. 当新的客户端连接到来的时候,触发epoll_wait,通过fd_may_recv和fd_update_cache函数更新fdtab[fd].cache ,在事件循环中统一处理cached事件
    void fd_process_cached_events()
    {
     int fd, entry, e;
     for (entry = 0; entry < fd_cache_num; ) {
         fd = fd_cache[entry];
         e = fdtab[fd].state;
    
         if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev)
             fdtab[fd].iocb(fd);
     }
    }
    

    这个iocb实际上在proto_tcp.c中的tcp_bind_listener中被设置为listner.c中的listener_accept,这里调用真正的accept就是listen中的accept; 我们在上文中已经提到了,在check_config_validity中,实际上被设置为了session_accept_fd; 在session_accept_fd会创建session,并且分配一个任务(task); 在stream_new中设置process回调为process_stream;

再梳理一下:

  1. 新请求到来的时候,触发proto_tcp.c中的proto_tcpv4.accept(构造时设置为tcp_bind_listener);
  2. tcp_bind_listener调用listen.c中的accept(构造时设置为session_accept_fd);
  3. 在session_accept_fd中创建session,并且和一个task绑定,在stream_new中唤醒task(task_wakeup),下一次事件循环中调用task的process_stream函数;
  4. 因为初始化的时候si_b->state设置为 SI_ST_REQ,更新为待连接状态
    2121            if (si_b->state == SI_ST_INI) {
    (gdb) 
    2122                    if (!(req->flags & CF_SHUTW)) {
    (gdb) 
    2123                            if ((req->flags & CF_AUTO_CONNECT) || !channel_is_empty(req)) {
    (gdb) 
    2128                                    si_b->state = SI_ST_REQ; /* new connection requested */
    

5 . 准备连接,状态更新为SI_ST_ASS;

sess_prepare_conn_req (s=0x77c030) at src/stream.c:998
998             struct stream_interface *si = &s->si[1];
(gdb) n
1008            if (si->state != SI_ST_REQ)
(gdb) 
1011            if (unlikely(obj_type(s->target) == OBJ_TYPE_APPLET)) {
(gdb) 
1044            if (srv_redispatch_connect(s) != 0) {
(gdb) 
1064            s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
(gdb) 
1065            si->state = SI_ST_ASS;

6.进入sess_update_stream_int中;

sess_update_stream_int:(stream.c:2156)
    connect_server:(stream.c:825)
        si_connect:(backend.c:1188)
            conn->ctrl->connect:(stream_interface.h:355);
             #其中conn->ctrl设置为proto_tcpv4(connnection.h:conn_prepare函数中)
             #所以此处也就是调用我们的proto_tcpv4.connect函
               tcp_connect_server:(proto_tcp.c:299);

连接成功后调用conn_ctrl_init(fd_insert)加入到epoll中,监听读写事件;

2.2.1 HAProxy 读写事件

读写事件不详细分析了,有兴趣的朋友可以看一下si_attach_conn,si_conn_cb,raw_sock几个回调函数 再结合事件循环来分析,加上gdb一步步,也能分析出来

总体来说,haproxy的代码并不十分好读,但是慢慢去看,也能令人感到启发