在并发编程中除了池和流的方式外还存在一种基本形式就是事件驱动。事件驱动的思想是使用一个线程不断循环处理任务将任务分发给其他复用的线程这样就通过单线程处理大量任务。发展历程io密集任务可以使用多线程一个io使用一个线程。但是如果任务过多比如达到百万级此时线程数量也会达到百万级别此时系统支持不了会崩溃。或者说使用线程池但是线程数量也需要很大频繁的切换线程也会带来大量的开销。这里有一个前提是io任务的处理机制基本一致可以用一个函数处理大量任务。此时可以考虑一种方式使用少量线程完成大量io任务。使用一个死循环一个队列队列中存储io任务死循环中不断处理io,这样就可以通过单线程处理百万io。但是这样会有阻塞问题处理io会使cpu等待所以需要将请求io和处理io分离。这样cpu只处理请求io处理io交给其他线程处理能充分利用cpu资源。同时还有一个问题循环内部会查询io任务队里,但如果io队列里没有任务这个循环就没有意义此时cpu会空转浪费资源。所以需要一种机制当io队列为空时让cpu等待当有任务时再唤醒cpu继续处理任务。背景问题1.使用多线程处理io密集任务当线程数量过大带来系统支持线程数不足线程切换资源消耗大。2.cpu和io处理时间差大带来的阻塞问题。3.非阻塞循环带来cpu空转。解决方案1.循环使用一个死循环不断读取任务和将任务分配给处理器。解决多线程处理使用单线程实现。2.分离发起io请求后不等待io返回继续处理其他任务当io返回后再处理io返回结果。解决阻塞问题。3.监督循环内部监视任务当任务为空时让cpu等待当有任务时再唤醒cpu继续处理任务。解决cpu空转问题。实现将所有任务抽象为不同的数据和对应的处理方式这就带获得了各个组成部分。实现因素1.事件将类型和数据抽象为一个结构体这个结构体就是事件。2.任务队列将要处理的事件放入队列中循环从队列中获取事件处理事件。3.处理器队列将任务类型和处理方式抽象为一个map,key为任务类型value为处理方式,放置在一个队列中处理任务时搜索这个队列找到对应任务类型执行处理方式。4.注册将处理器注册到处理器队列中。5.循环死循环调度任务队列和处理器队列监督任务队列当任务队列为空时让cpu等待当有任务时再唤醒cpu继续处理任务。6.分发有任务时查找任务对应的处理器将数据输入处理器中处理数据。#includestring#includequeue#includemap#includevector#includefunctional#includemutexstructevent{inttype;std::string data;};// 事件驱动类用于处理事件驱动的程序classEventDrive{private:// 事件队列用于存储待处理的事件std::queueeventqueue_events;// 事件处理器映射表键为事件类型值为对应的事件处理函数列表std::mapint,std::vectorstd::functionvoid(constevent)map_event_handlers;// 互斥锁用于保证线程安全std::mutex mut;// 运行状态标志true表示正在运行false表示已停止boolis_running;public:// 注册处理器为特定类型的事件注册一个处理函数voidon(inttype,std::functionvoid(constevent)handler);// 循环处理事件持续从队列中取出事件并处理直到is_running为falsevoidrun();// 添加事件将新事件添加到事件队列中voidadd_event(inttype,std::string data);// 处理事件根据事件类型查找并执行对应的处理函数voidhandle_event(constevente);// 停止事件驱动voidstop();};// 注册事件voidEventDrive::on(inttype,std::functionvoid(constevent)handler){map_event_handlers[type].push_back(handler);}// 循环处理事件voidEventDrive::run(){is_runningtrue;while(is_running){if(queue_events.empty()){continue;}else{std::lock_guardstd::mutexlock(mut);event equeue_events.front();queue_events.pop();handle_event(e);}std::this_thread::sleep_for(std::chrono::milliseconds(1));}}// 添加事件voidEventDrive::add_event(inttype,std::string data){std::lock_guardstd::mutexlock(mut);event e;e.typetype;e.datadata;queue_events.push(e);}// 处理事件voidEventDrive::handle_event(constevente){autoitmap_event_handlers.find(e.type);if(it!map_event_handlers.end()){for(autohandler:it-second){handler(e);}}}// 停止事件驱动voidEventDrive::stop(){is_runningfalse;}使用1.注册将处理器以函数的方式注册到处理器队列中。2.分析线程循环将事件驱动的循环在主线程外部执行这样主线程用于输入任务。3.载入任务将任务以事件的方式载入任务队列中。#includeevent_drive.hpp#includeiostreamintmain(){EventDrive envent_drive;//注册事件envent_drive.on(1,[](constevente){std::couthandler 1,收到事件1e.datastd::endl;});envent_drive.on(2,[](constevente){std::couthandler 2,收到事件2e.datastd::endl;});envent_drive.on(1,[](constevente){std::couthandler 3,也收到事件1e.datastd::endl;});// 启动事件循环std::threadloop_thread([envent_drive](){envent_drive.run();});// 发送事件envent_drive.add_event(1,data 1);envent_drive.add_event(2,data 2);envent_drive.add_event(1,data 3);// 停止事件循环std::this_thread::sleep_for(std::chrono::milliseconds(100));envent_drive.stop();loop_thread.join();return0;}结果handler 1,收到事件1data 1 handler 3,也收到事件1data 1 handler 2,收到事件2data 2 handler 1,收到事件1data 3 handler 3,也收到事件1data 3总结使用事件驱动的方式将任务抽象为事件将处理方式抽象为处理器将任务和处理器注册到事件驱动中事件驱动循环处理任务。
并发编程(c++)——5.事件驱动
发布时间:2026/6/18 13:17:59
在并发编程中除了池和流的方式外还存在一种基本形式就是事件驱动。事件驱动的思想是使用一个线程不断循环处理任务将任务分发给其他复用的线程这样就通过单线程处理大量任务。发展历程io密集任务可以使用多线程一个io使用一个线程。但是如果任务过多比如达到百万级此时线程数量也会达到百万级别此时系统支持不了会崩溃。或者说使用线程池但是线程数量也需要很大频繁的切换线程也会带来大量的开销。这里有一个前提是io任务的处理机制基本一致可以用一个函数处理大量任务。此时可以考虑一种方式使用少量线程完成大量io任务。使用一个死循环一个队列队列中存储io任务死循环中不断处理io,这样就可以通过单线程处理百万io。但是这样会有阻塞问题处理io会使cpu等待所以需要将请求io和处理io分离。这样cpu只处理请求io处理io交给其他线程处理能充分利用cpu资源。同时还有一个问题循环内部会查询io任务队里,但如果io队列里没有任务这个循环就没有意义此时cpu会空转浪费资源。所以需要一种机制当io队列为空时让cpu等待当有任务时再唤醒cpu继续处理任务。背景问题1.使用多线程处理io密集任务当线程数量过大带来系统支持线程数不足线程切换资源消耗大。2.cpu和io处理时间差大带来的阻塞问题。3.非阻塞循环带来cpu空转。解决方案1.循环使用一个死循环不断读取任务和将任务分配给处理器。解决多线程处理使用单线程实现。2.分离发起io请求后不等待io返回继续处理其他任务当io返回后再处理io返回结果。解决阻塞问题。3.监督循环内部监视任务当任务为空时让cpu等待当有任务时再唤醒cpu继续处理任务。解决cpu空转问题。实现将所有任务抽象为不同的数据和对应的处理方式这就带获得了各个组成部分。实现因素1.事件将类型和数据抽象为一个结构体这个结构体就是事件。2.任务队列将要处理的事件放入队列中循环从队列中获取事件处理事件。3.处理器队列将任务类型和处理方式抽象为一个map,key为任务类型value为处理方式,放置在一个队列中处理任务时搜索这个队列找到对应任务类型执行处理方式。4.注册将处理器注册到处理器队列中。5.循环死循环调度任务队列和处理器队列监督任务队列当任务队列为空时让cpu等待当有任务时再唤醒cpu继续处理任务。6.分发有任务时查找任务对应的处理器将数据输入处理器中处理数据。#includestring#includequeue#includemap#includevector#includefunctional#includemutexstructevent{inttype;std::string data;};// 事件驱动类用于处理事件驱动的程序classEventDrive{private:// 事件队列用于存储待处理的事件std::queueeventqueue_events;// 事件处理器映射表键为事件类型值为对应的事件处理函数列表std::mapint,std::vectorstd::functionvoid(constevent)map_event_handlers;// 互斥锁用于保证线程安全std::mutex mut;// 运行状态标志true表示正在运行false表示已停止boolis_running;public:// 注册处理器为特定类型的事件注册一个处理函数voidon(inttype,std::functionvoid(constevent)handler);// 循环处理事件持续从队列中取出事件并处理直到is_running为falsevoidrun();// 添加事件将新事件添加到事件队列中voidadd_event(inttype,std::string data);// 处理事件根据事件类型查找并执行对应的处理函数voidhandle_event(constevente);// 停止事件驱动voidstop();};// 注册事件voidEventDrive::on(inttype,std::functionvoid(constevent)handler){map_event_handlers[type].push_back(handler);}// 循环处理事件voidEventDrive::run(){is_runningtrue;while(is_running){if(queue_events.empty()){continue;}else{std::lock_guardstd::mutexlock(mut);event equeue_events.front();queue_events.pop();handle_event(e);}std::this_thread::sleep_for(std::chrono::milliseconds(1));}}// 添加事件voidEventDrive::add_event(inttype,std::string data){std::lock_guardstd::mutexlock(mut);event e;e.typetype;e.datadata;queue_events.push(e);}// 处理事件voidEventDrive::handle_event(constevente){autoitmap_event_handlers.find(e.type);if(it!map_event_handlers.end()){for(autohandler:it-second){handler(e);}}}// 停止事件驱动voidEventDrive::stop(){is_runningfalse;}使用1.注册将处理器以函数的方式注册到处理器队列中。2.分析线程循环将事件驱动的循环在主线程外部执行这样主线程用于输入任务。3.载入任务将任务以事件的方式载入任务队列中。#includeevent_drive.hpp#includeiostreamintmain(){EventDrive envent_drive;//注册事件envent_drive.on(1,[](constevente){std::couthandler 1,收到事件1e.datastd::endl;});envent_drive.on(2,[](constevente){std::couthandler 2,收到事件2e.datastd::endl;});envent_drive.on(1,[](constevente){std::couthandler 3,也收到事件1e.datastd::endl;});// 启动事件循环std::threadloop_thread([envent_drive](){envent_drive.run();});// 发送事件envent_drive.add_event(1,data 1);envent_drive.add_event(2,data 2);envent_drive.add_event(1,data 3);// 停止事件循环std::this_thread::sleep_for(std::chrono::milliseconds(100));envent_drive.stop();loop_thread.join();return0;}结果handler 1,收到事件1data 1 handler 3,也收到事件1data 1 handler 2,收到事件2data 2 handler 1,收到事件1data 3 handler 3,也收到事件1data 3总结使用事件驱动的方式将任务抽象为事件将处理方式抽象为处理器将任务和处理器注册到事件驱动中事件驱动循环处理任务。