haproxy tcp代理实现原理简析
2018年9月3日 星期一, 发表于 深圳
如果你对本文有任何的建议或者疑问, 可以在 这里给我提 Issues, 谢谢! :)
haproxy tcp代理实现原理简析
1. 代理初始化
下面是一个典型的四层代理配置;
功能是 使得haproxy监听本机3307端口,并将该端口上的所有tcp数据包转发到本机的10000端口
global
maxconn 500
defaults
contimeout 1000
clitimeout 5000
srvtimeout 5000
listen mysql_1
bind :3307
mode tcp
server srv1 127.0.0.1:10000
haproxy配置文件的解析步骤为
standard.c: struct sockaddr_storage * str2sa_range()
cfgparse.c x: int str2listener(...)
cfgparse.c: int cfg_parse_listen(...)
cfgparse.c: int readcfgfile(const char *file)
haproxy.c: void init(int argc, char **argv)
haproxy.c: int main(int argc, char **argv)
上面的转发配置解析成一个listener结构体:
struct listener {
enum obj_type obj_type; /* object type = OBJ_TYPE_LISTENER */
enum li_state state; /* state: NEW, INIT, ASSIGNED, LISTEN, READY, FULL */
int fd; /* the listen socket */
int luid; /* listener universally unique ID, used for SNMP */
struct protocol *proto; /* protocol this listener belongs to */
int (*accept)(struct listener *l, int fd, struct sockaddr_storage *addr); /* upper layer's accept() */
/* warning: this struct is huge, keep it at the bottom */
struct sockaddr_storage addr; /* the address we listen to */
struct {
struct eb32_node id; /* place in the tree of used IDs */
} conf; /* config information */
...
};
其中比较重要的是字段是protocol结构体(协议的处理),accept回调函数(处理客户端连接),tcp代理主要是proto_tcpv4结构体
void tcpv4_add_listener(struct listener *listener)
{
if (listener->state != LI_INIT)
return;
listener->state = LI_ASSIGNED;
listener->proto = &proto_tcpv4; //tcp协议的转发处理在此处
LIST_ADDQ(&proto_tcpv4.listeners, &listener->proto_list);
proto_tcpv4.nb_listeners++;
}
上面设置了listener的协议和状态,在初始化的时候,同时设置了listener的accept回调和handler回调
cfgparse.c:check_config_validity()
listener->accept = session_accept_fd;
listener->handler = process_stream;
##2. HAProxy事件处理
HAProxy在四层代理的情况下,只是在客户端和服务端转发了双向流量; HAProxy默认是单进程,事件驱动的,主程序通过事件循环驱动,将各个协议抽象成protocol类来处理,其中对于tcp协议转发的代码主要在proto_tcp.c中的proto_tcpv4结构体,上面在配置解析的时候已经将配置此结构体绑定;
static struct protocol proto_tcpv4 = {
.name = "tcpv4",
.sock_domain = AF_INET,
.sock_type = SOCK_STREAM,
.sock_prot = IPPROTO_TCP,
.sock_family = AF_INET,
.sock_addrlen = sizeof(struct sockaddr_in),
.l3_addrlen = 32/8,
.accept = &listener_accept, //接受客户端的请求
.connect = tcp_connect_server, //连接远程服务器
.bind = tcp_bind_listener, //绑定本地端口
.bind_all = tcp_bind_listeners,
.unbind_all = unbind_all_listeners,
.enable_all = enable_all_listeners,
.get_src = tcp_get_src,
.get_dst = tcp_get_dst,
.drain = tcp_drain,
.pause = tcp_pause_listener,
.listeners = LIST_HEAD_INIT(proto_tcpv4.listeners),
.nb_listeners = 0,
};
其中比较重要的是几个回调函数,bind/accept/connect
2.1 HAProxy 消息封装
HAProxy默认单进程,事件驱动,所有的网络,信号事件的处理都在事件循环中执行的,下面看一下事件循环如何 处理网络事件,并且和协议中的回调函数做适配; HAProxy的事件循环在haproxy.c: run_poll_loop函数中
void run_poll_loop()
{
int next;
tv_update_date(0,1);
while (1) {
process_runnable_tasks(); /* Process a few tasks */
signal_process_queue(); /* check if we caught some signals and process them */
next = wake_expired_tasks(); /* Check if we can expire some tasks */
if (jobs == 0) /* stop when there's nothing left to do */
break;
if (fd_cache_num || run_queue || signal_queue_len || !LIST_ISEMPTY(&applet_active_queue))
next = now_ms; /* expire immediately if events are pending */
/* The poller will ensure it returns around <next> */
cur_poller.poll(&cur_poller, next);
fd_process_cached_events();
applet_run_active();
}
}
HAProxy将IO多路复用的监听函数(select/epoll/poll等函数)通过poller 结构体对外屏蔽(有点类似于libevent),根据当前系统支持自动选出符合要求的复用函数,在介绍事件循环之前,继续深入了解一下HAProxy的poller
2.1.1 HAProxy poller封装
struct poller {
void *private; /* any private data for the poller */
void REGPRM1 (*clo)(const int fd); /* mark <fd> as closed */
void REGPRM2 (*poll)(struct poller *p, int exp); /* the poller itself */
int REGPRM1 (*init)(struct poller *p); /* poller initialization */
void REGPRM1 (*term)(struct poller *p); /* termination of this poller */
int REGPRM1 (*test)(struct poller *p); /* pre-init check of the poller */
int REGPRM1 (*fork)(struct poller *p); /* post-fork re-opening */
const char *name; /* poller name */
int pref; /* try pollers with higher preference first */
};
HAProxy在ev_select.c/ev_kqueue.c/ev_poll.c/ev_select.c中封装了四种poller,以epoll为例
__attribute__((constructor))
static void _do_register(void)
{
struct poller *p;
if (nbpollers >= MAX_POLLERS)
return;
epoll_fd = -1;
p = &pollers[nbpollers++];
p->name = "epoll";
p->pref = 300; //优先级
p->private = NULL;
p->clo = __fd_clo;
p->test = _do_test;
p->init = _do_init;
p->term = _do_term;
p->poll = _do_poll;
p->fork = _do_fork;
}
其中__attribute__((constructor)) 说明注册函数在main函数启动之前注册到全局变量pollers中, 那些多路复用函数可以使用,是在makefile中定义的(下面是节选的一段makefile)
ifeq ($(TARGET),linux26)
# This is for standard Linux 2.6 with netfilter and standard epoll()
USE_GETSOCKNAME = implicit
USE_NETFILTER = implicit
USE_POLL = implicit
USE_EPOLL = implicit //可以使用epoll
USE_TPROXY = implicit
USE_LIBCRYPT = implicit
USE_FUTEX = implicit
EXTRA += haproxy-systemd-wrapper
USE_DL = implicit
else
有一个小细节,epoll的优先级是300(pref 字段),select优先级是150,说明在支持epoll的系统中,epoll优先被使用,如果系统不支持某种poller,则将优先级设置为0(可搜索disable_poller函数);
以上是对poller的抽象,在ev_epoll.c的_do_poll中实际上也是调用epll_wait监听各个socket上的事件(到这个地方有没有觉得很熟悉了哈); 除了对poll进行统一封装之外,HAProxy也对文件描述符fd进行了统一的封装,抽象成fdtab结构体;
2.1.2 HAProxy fd封装
struct fdtab {
void (*iocb)(int fd); /* I/O handler */
void *owner; /* the connection or listener associated with this fd, NULL if closed */
unsigned int cache; /* position+1 in the FD cache. 0=not in cache. */
unsigned char state; /* FD state for read and write directions (2*3 bits) */
unsigned char ev; /* event seen in return of poll() : FD_POLL_* */
unsigned char new:1; /* 1 if this fd has just been created */
unsigned char updated:1; /* 1 if this fd is already in the update list */
unsigned char linger_risk:1; /* 1 if we must kill lingering before closing */
unsigned char cloned:1; /* 1 if a cloned socket, requires EPOLL_CTL_DEL on close */
};
fdtab不仅是一个结构体,也是一个全局数组
fd.c: struct fdtab *fdtab = NULL; /* array of all the file descriptors */
haproxy.c:fdtab = calloc(1, sizeof(struct fdtab) * (global.maxsock));
OK,有了以上基本了解,我们来看看对socket消息的处理;
2.2 HAProxy 消息处理
本文主要看看HAProxy对消息的处理,包括新连接的处理,可读可写消息的处理;
2.2.1 HAProxy 接受新连接
能够接收新连接,毫无疑问,首先需要bind本地端口,看看bind的流程,
haproxy.c: int main(int argc,char** argv)
proxy.c: int start_proxies(int verbose)
proxy.c: lerr = listener->proto->bind(listener, msg, sizeof(msg));
proto_tcp.c: int tcp_bind_listener(struct listener *listener, char *errmsg, int errlen)
在最开始已经介绍了,haporxy将配置文件解析成struct proxy 的结构体,在start_proxies中监听所有配置中的端口, 在tcp协议中,bind函数实际上设置为proto_tcp中的tcp_bind_listenner函数; 在tcp_bind_listenner函数中,将被动连接的fd放入fd全局列表中,并且设置accept的回调
//tcp_bind_listenner.c
/* the socket is ready */
listener->fd = fd;
listener->state = LI_LISTEN;
fdtab[fd].owner = listener; /* reference the listener instead of a task */
fdtab[fd].iocb = listener->proto->accept; //设置回调
fd_insert(fd);
在fd_insert中,fdtab[fd]被初始化,
static inline void fd_insert(int fd)
{
fdtab[fd].ev = 0;
fdtab[fd].new = 1;
fdtab[fd].linger_risk = 0;
fdtab[fd].cloned = 0;
if (fd + 1 > maxfd)
maxfd = fd + 1;
}
上面都设置了监听套接字和一些回调函数;
#0 updt_fd_polling (fd=8) at include/proto/fd.h:93
#1 0x00000000004194c9 in fd_update_cache (fd=8) at include/proto/fd.h:163
#2 0x000000000041973a in fd_want_recv (fd=8) at include/proto/fd.h:319
#3 0x00000000004199ee in enable_listener (listener=0x756aa0) at src/listener.c:63
#4 0x0000000000419e97 in enable_all_listeners (proto=0x73c820) at src/listener.c:211
#5 0x0000000000407c2f in protocol_enable_all () at src/protocol.c:94
#6 0x0000000000407585 in main (argc=3, argv=0x7fffffffe0e8) at src/haproxy.c:2093
真正将监听套接字加入epoll事件监听的操作是,
- 在protocol_enable_all 中,将fd保存到fd_updt结构体中
- 在事件循环_do_poll中,会将fd接入epoll中;
for (updt_idx = 0; updt_idx < fd_nbupdt; updt_idx++) { fd = fd_updt[updt_idx]; eo = fdtab[fd].state; en = fd_compute_new_polled_status(eo); ... ev.data.fd = fd; epoll_ctl(epoll_fd, opcode, fd, &ev); //opcode=EPOLL_CTL_ADD ... }
- 当新的客户端连接到来的时候,触发epoll_wait,通过fd_may_recv和fd_update_cache函数更新fdtab[fd].cache ,在事件循环中统一处理cached事件
void fd_process_cached_events() { int fd, entry, e; for (entry = 0; entry < fd_cache_num; ) { fd = fd_cache[entry]; e = fdtab[fd].state; if (fdtab[fd].iocb && fdtab[fd].owner && fdtab[fd].ev) fdtab[fd].iocb(fd); } }
这个iocb实际上在proto_tcp.c中的tcp_bind_listener中被设置为listner.c中的listener_accept,这里调用真正的accept就是listen中的accept; 我们在上文中已经提到了,在check_config_validity中,实际上被设置为了session_accept_fd; 在session_accept_fd会创建session,并且分配一个任务(task); 在stream_new中设置process回调为process_stream;
再梳理一下:
- 新请求到来的时候,触发proto_tcp.c中的proto_tcpv4.accept(构造时设置为tcp_bind_listener);
- tcp_bind_listener调用listen.c中的accept(构造时设置为session_accept_fd);
- 在session_accept_fd中创建session,并且和一个task绑定,在stream_new中唤醒task(task_wakeup),下一次事件循环中调用task的process_stream函数;
- 因为初始化的时候si_b->state设置为 SI_ST_REQ,更新为待连接状态
2121 if (si_b->state == SI_ST_INI) { (gdb) 2122 if (!(req->flags & CF_SHUTW)) { (gdb) 2123 if ((req->flags & CF_AUTO_CONNECT) || !channel_is_empty(req)) { (gdb) 2128 si_b->state = SI_ST_REQ; /* new connection requested */
5 . 准备连接,状态更新为SI_ST_ASS;
sess_prepare_conn_req (s=0x77c030) at src/stream.c:998
998 struct stream_interface *si = &s->si[1];
(gdb) n
1008 if (si->state != SI_ST_REQ)
(gdb)
1011 if (unlikely(obj_type(s->target) == OBJ_TYPE_APPLET)) {
(gdb)
1044 if (srv_redispatch_connect(s) != 0) {
(gdb)
1064 s->logs.t_queue = tv_ms_elapsed(&s->logs.tv_accept, &now);
(gdb)
1065 si->state = SI_ST_ASS;
6.进入sess_update_stream_int中;
sess_update_stream_int:(stream.c:2156)
connect_server:(stream.c:825)
si_connect:(backend.c:1188)
conn->ctrl->connect:(stream_interface.h:355);
#其中conn->ctrl设置为proto_tcpv4(connnection.h:conn_prepare函数中)
#所以此处也就是调用我们的proto_tcpv4.connect函
tcp_connect_server:(proto_tcp.c:299);
连接成功后调用conn_ctrl_init(fd_insert)加入到epoll中,监听读写事件;
2.2.1 HAProxy 读写事件
读写事件不详细分析了,有兴趣的朋友可以看一下si_attach_conn,si_conn_cb,raw_sock几个回调函数 再结合事件循环来分析,加上gdb一步步,也能分析出来
总体来说,haproxy的代码并不十分好读,但是慢慢去看,也能令人感到启发