Reactor 是一种事件驱动的 I/O 多路复用架构如 epoll 非阻塞 socket 回调分发一般用于高并发服务端。只在 I/O 事件就绪时才执行对应回调无事可做时阻塞在 epoll_waitCPU 不会空转相比轮询占用极低。同时 Reactor 解耦了事件检测和业务处理主循环只负责分发事件具体逻辑全部交给回调在操作系统 I/O 层和业务层之间起到事件分发中间层的作用。从代码实现角度来看整体分为三部分结构体主循环和回调函数。本文均为个人观点如有错误请纠正。1.结构体typedefint(*RCALLBACK)(intfd);structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]{0};内部分别是socket的fd读写缓冲区读写的长度和回调函数。2.主循环intinit_server(unsignedshortport){//初始化部分intsockfdsocket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtons(INADDR_ANY);servaddr.sin_porthtons(port);if(-1bind(sockfd,(structsockaddr*)servaddr,sizeof(structsockaddr_in))){printf(bind error\n);return-1;}listen(sockfd,10);returnsockfd;}intmain(intargc,charconst*argv[]){unsignedshortport2000;intsockfdinit_server(port);epfdepoll_create(1);conn_list[sockfd].fdsockfd;conn_list[sockfd].r_action.accept_callbackaccept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]{0};intnreadyepoll_wait(epfd,events,1024,-1);inti0;for(i0;inready;i){intconnfdevents[i].data.fd;if(events[i].eventsEPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].eventsEPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}初始化函数的内容是socket连接部分——创建socket绑定监听这三步并返回socket连接的sockfd主循环中采用epoll来循环接收判断在while循环中死等事件事件来了就找对应的回调去处理处理完继续等。3.回调函数//设置事件intset_event(intfd,intevent,intflag){if(flag){structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,ev);}else{structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,ev);}}//注册事件//这部分代码可以直接放在accept_cb里。我为了代码简洁就把这部分单独拿出来了intevent_register(intfd,intevent){if(fd0)return-1;conn_list[fd].fdfd;conn_list[fd].r_action.recv_callbackrecv_cb;conn_list[fd].send_callbacksend_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength0;set_event(fd,event,1);}//accept回调函数intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlensizeof(client_addr);intclientfdaccept(fd,(structsockaddr*)client_addr,len);printf(accept finishd: %d\n,clientfd);if(clientfd0)return-1;event_register(clientfd,EPOLLIN);return0;}//recv回调函数intrecv_cb(intfd){intcountrecv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count0){printf(client disconnection: %d\n,fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlengthcount;printf(Recv: %s\n,conn_list[fd].rbuffer);conn_list[fd].wlengthconn_list[fd].rlengthcount;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}//semd回调函数intsend_cb(intfd){intcountsend(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}(1).accept_cb内部调用accept完成socket连接并注册事件。(2).event_register作用是把一个 fd注册到conn_list里设置它的回调函数清空读写缓冲器重置读写长度设置为可读。这里我为了代码简洁性单独写了这个函数。它内部的代码也可以直接放在acceept_cb里没什么区别。(3).set_event作用是设置事件类型。根据传进来的flag判断。非零则将传进来的fd添加进epoll监听队列中类型是传进来的event。零则是对传进来的fd进行修改。(4).recv_cb内部调用recv来接收数据并把数据及其长度写入写缓冲区和长度便于后续调用send并设置事件为可写使下一次循环进入send_cb。(5).send_cb内部调用send发送数据并将事件置成可读便于后续接收消息4.总结Reactor 本质是通过主循环不断等待就绪的 fd然后根据事件类型可读/可写/新连接执行对应的回调函数。下面是整体代码#includesys/socket.h#includenetinet/in.h#includestdio.h#includeerror.h#includepthread.h#includestring.h#includesys/select.h#includepoll.h#includesys/epoll.h#defineBUFFER_LENGTH1024#defineCONNECTION_SIZE1048576intsend_cb(intfd);intaccept_cb(intfd,intevent);intrecv_cb(intfd);typedefint(*RCALLBACK)(intfd);intepfd0;structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]{0};intset_event(intfd,intevent,intflag){if(flag){// no 0structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,ev);}else{structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,ev);}}intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlensizeof(client_addr);intclientfdaccept(fd,(structsockaddr*)client_addr,len);printf(accept finishd: %d\n,clientfd);if(clientfd0)return-1;event_register(clientfd,EPOLLIN);return0;}intrecv_cb(intfd){intcountrecv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count0){printf(client disconnection: %d\n,fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlengthcount;printf(Recv: %s\n,conn_list[fd].rbuffer);conn_list[fd].wlengthconn_list[fd].rlengthcount;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}intsend_cb(intfd){intcountsend(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}intinit_server(unsignedshortport){//chu shi hua serverintsockfdsocket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtons(INADDR_ANY);servaddr.sin_porthtons(port);if(-1bind(sockfd,(structsockaddr*)servaddr,sizeof(structsockaddr_in))){printf(bind error\n);return-1;}listen(sockfd,10);returnsockfd;}intevent_register(intfd,intevent){if(fd0)return-1;conn_list[fd].fdfd;conn_list[fd].r_action.recv_callbackrecv_cb;conn_list[fd].send_callbacksend_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength0;set_event(fd,event,1);}intmain(intargc,charconst*argv[]){unsignedshortport2000;intsockfdinit_server(port);epfdepoll_create(1);conn_list[sockfd].fdsockfd;conn_list[sockfd].r_action.accept_callbackaccept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]{0};intnreadyepoll_wait(epfd,events,1024,-1);inti0;for(i0;inready;i){intconnfdevents[i].data.fd;if(events[i].eventsEPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].eventsEPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}零声社区资源链接https:github.com/0voice
基于 epoll 的简易 Reactor 网络模型实现
发布时间:2026/6/16 1:46:34
Reactor 是一种事件驱动的 I/O 多路复用架构如 epoll 非阻塞 socket 回调分发一般用于高并发服务端。只在 I/O 事件就绪时才执行对应回调无事可做时阻塞在 epoll_waitCPU 不会空转相比轮询占用极低。同时 Reactor 解耦了事件检测和业务处理主循环只负责分发事件具体逻辑全部交给回调在操作系统 I/O 层和业务层之间起到事件分发中间层的作用。从代码实现角度来看整体分为三部分结构体主循环和回调函数。本文均为个人观点如有错误请纠正。1.结构体typedefint(*RCALLBACK)(intfd);structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]{0};内部分别是socket的fd读写缓冲区读写的长度和回调函数。2.主循环intinit_server(unsignedshortport){//初始化部分intsockfdsocket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtons(INADDR_ANY);servaddr.sin_porthtons(port);if(-1bind(sockfd,(structsockaddr*)servaddr,sizeof(structsockaddr_in))){printf(bind error\n);return-1;}listen(sockfd,10);returnsockfd;}intmain(intargc,charconst*argv[]){unsignedshortport2000;intsockfdinit_server(port);epfdepoll_create(1);conn_list[sockfd].fdsockfd;conn_list[sockfd].r_action.accept_callbackaccept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]{0};intnreadyepoll_wait(epfd,events,1024,-1);inti0;for(i0;inready;i){intconnfdevents[i].data.fd;if(events[i].eventsEPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].eventsEPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}初始化函数的内容是socket连接部分——创建socket绑定监听这三步并返回socket连接的sockfd主循环中采用epoll来循环接收判断在while循环中死等事件事件来了就找对应的回调去处理处理完继续等。3.回调函数//设置事件intset_event(intfd,intevent,intflag){if(flag){structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,ev);}else{structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,ev);}}//注册事件//这部分代码可以直接放在accept_cb里。我为了代码简洁就把这部分单独拿出来了intevent_register(intfd,intevent){if(fd0)return-1;conn_list[fd].fdfd;conn_list[fd].r_action.recv_callbackrecv_cb;conn_list[fd].send_callbacksend_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength0;set_event(fd,event,1);}//accept回调函数intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlensizeof(client_addr);intclientfdaccept(fd,(structsockaddr*)client_addr,len);printf(accept finishd: %d\n,clientfd);if(clientfd0)return-1;event_register(clientfd,EPOLLIN);return0;}//recv回调函数intrecv_cb(intfd){intcountrecv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count0){printf(client disconnection: %d\n,fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlengthcount;printf(Recv: %s\n,conn_list[fd].rbuffer);conn_list[fd].wlengthconn_list[fd].rlengthcount;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}//semd回调函数intsend_cb(intfd){intcountsend(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}(1).accept_cb内部调用accept完成socket连接并注册事件。(2).event_register作用是把一个 fd注册到conn_list里设置它的回调函数清空读写缓冲器重置读写长度设置为可读。这里我为了代码简洁性单独写了这个函数。它内部的代码也可以直接放在acceept_cb里没什么区别。(3).set_event作用是设置事件类型。根据传进来的flag判断。非零则将传进来的fd添加进epoll监听队列中类型是传进来的event。零则是对传进来的fd进行修改。(4).recv_cb内部调用recv来接收数据并把数据及其长度写入写缓冲区和长度便于后续调用send并设置事件为可写使下一次循环进入send_cb。(5).send_cb内部调用send发送数据并将事件置成可读便于后续接收消息4.总结Reactor 本质是通过主循环不断等待就绪的 fd然后根据事件类型可读/可写/新连接执行对应的回调函数。下面是整体代码#includesys/socket.h#includenetinet/in.h#includestdio.h#includeerror.h#includepthread.h#includestring.h#includesys/select.h#includepoll.h#includesys/epoll.h#defineBUFFER_LENGTH1024#defineCONNECTION_SIZE1048576intsend_cb(intfd);intaccept_cb(intfd,intevent);intrecv_cb(intfd);typedefint(*RCALLBACK)(intfd);intepfd0;structconn{intfd;charrbuffer[BUFFER_LENGTH];intrlength;charwbuffer[BUFFER_LENGTH];intwlength;RCALLBACK send_callback;union{RCALLBACK recv_callback;RCALLBACK accept_callback;}r_action;};structconnconn_list[CONNECTION_SIZE]{0};intset_event(intfd,intevent,intflag){if(flag){// no 0structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_ADD,fd,ev);}else{structepoll_eventev;ev.data.fdfd;ev.eventsevent;epoll_ctl(epfd,EPOLL_CTL_MOD,fd,ev);}}intaccept_cb(intfd){structsockaddr_inclient_addr;socklen_tlensizeof(client_addr);intclientfdaccept(fd,(structsockaddr*)client_addr,len);printf(accept finishd: %d\n,clientfd);if(clientfd0)return-1;event_register(clientfd,EPOLLIN);return0;}intrecv_cb(intfd){intcountrecv(fd,conn_list[fd].rbuffer,BUFFER_LENGTH,0);if(count0){printf(client disconnection: %d\n,fd);close(fd);epoll_ctl(epfd,EPOLL_CTL_DEL,fd,NULL);// un finishedreturn0;}conn_list[fd].rlengthcount;printf(Recv: %s\n,conn_list[fd].rbuffer);conn_list[fd].wlengthconn_list[fd].rlengthcount;memcpy(conn_list[fd].wbuffer,conn_list[fd].rbuffer,conn_list[fd].wlength);set_event(fd,EPOLLOUT,0);returncount;}intsend_cb(intfd){intcountsend(fd,conn_list[fd].wbuffer,conn_list[fd].wlength,0);set_event(fd,EPOLLIN,0);returncount;}intinit_server(unsignedshortport){//chu shi hua serverintsockfdsocket(AF_INET,SOCK_STREAM,0);structsockaddr_inservaddr;servaddr.sin_familyAF_INET;servaddr.sin_addr.s_addrhtons(INADDR_ANY);servaddr.sin_porthtons(port);if(-1bind(sockfd,(structsockaddr*)servaddr,sizeof(structsockaddr_in))){printf(bind error\n);return-1;}listen(sockfd,10);returnsockfd;}intevent_register(intfd,intevent){if(fd0)return-1;conn_list[fd].fdfd;conn_list[fd].r_action.recv_callbackrecv_cb;conn_list[fd].send_callbacksend_cb;memset(conn_list[fd].rbuffer,0,BUFFER_LENGTH);conn_list[fd].rlength0;memset(conn_list[fd].wbuffer,0,BUFFER_LENGTH);conn_list[fd].wlength0;set_event(fd,event,1);}intmain(intargc,charconst*argv[]){unsignedshortport2000;intsockfdinit_server(port);epfdepoll_create(1);conn_list[sockfd].fdsockfd;conn_list[sockfd].r_action.accept_callbackaccept_cb;set_event(sockfd,EPOLLIN,1);while(1){structepoll_eventevents[1024]{0};intnreadyepoll_wait(epfd,events,1024,-1);inti0;for(i0;inready;i){intconnfdevents[i].data.fd;if(events[i].eventsEPOLLIN){conn_list[connfd].r_action.recv_callback(connfd);}if(events[i].eventsEPOLLOUT){conn_list[connfd].send_callback(connfd);}}}return0;}零声社区资源链接https:github.com/0voice