管理端Server初始化MultiThreadIoEventLoopGroup1.根据传入的线程数初始化事件执行器MultithreadEventExecutorGroup构造器// 初始化时间线程数childrennewEventExecutor[nThreads];// 遍历生成for(inti0;inThreads;i){booleansuccessfalse;try{// 根据传入的类型来确认 执行器的类型children[i]newChild(executor,args);successtrue;}catch(Exceptione){...}finally{// 不成功优雅关闭if(!success){...}}}// 所有 EventLoop 创建完成后再创建选择器。// 当有新任务/新连接到来时chooser 决定把它分配给哪一个 EventLoop。chooserchooserFactory.newChooser(children);...readonlyChildrenCollections.unmodifiableSet(childrenSet);MultiThreadIoEventLoopGroup的newChild()方法protectedIoEventLoopnewChild(Executorexecutor,IoHandlerFactoryioHandlerFactory,SuppressWarnings(unused)Object...args){returnnewSingleThreadIoEventLoop(this,executor,ioHandlerFactory);}publicSingleThreadIoEventLoop(IoEventLoopGroupparent,Executorexecutor,IoHandlerFactoryioHandlerFactory){super(parent,executor,false,ObjectUtil.checkNotNull(ioHandlerFactory,ioHandlerFactory).isChangingThreadSupported());this.maxTaskProcessingQuantumNsDEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;// io 处理层EventLoop 怎么去等待 I/O 事件以及事件来了之后发给谁。this.ioHandlerioHandlerFactory.newHandler(this);}看一下SingleThreadIoEventLoop的构造器/** * parent 线程组归属 * executor 线程的启动方式 * addTaskWakesUp 入队唤醒策略 * supportSuspension 状态机是否含暂停状态 * maxPendingTasks 队列容量上限背压 * rejectedHandler 队列满时的兜底策略 */protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor,booleanaddTaskWakesUp,booleansupportSuspension,intmaxPendingTasks,RejectedExecutionHandlerrejectedHandler){super(parent);this.addTaskWakesUpaddTaskWakesUp;this.supportSuspensionsupportSuspension;this.maxPendingTasksMath.max(16,maxPendingTasks);this.executorThreadExecutorMap.apply(executor,this);taskQueuenewTaskQueue(this.maxPendingTasks);rejectedExecutionHandlerObjectUtil.checkNotNull(rejectedHandler,rejectedHandler);lastActivityTimeNanosticker().nanoTime();}初始化ServerBootstrapServerBootstrapbnewServerBootstrap();// 简化写法同一个 EventLoopGroup 同时承担 boss 和 worker 的职责b.group(group).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializerSocketChannel(){OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepch.pipeline();if(sslCtx!null){p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler);}});// group 初始化处理线程组。b.group(group) 会把同一个 group 同时赋给 boss 和 worker。// 这是简化用法一个线程池既接受连接也处理子连接上的 I/O。// 生产环境中更常见的是显式拆分 bossGroup 和 workerGroup。// EventLoopGroup bossGroup new NioEventLoopGroup(1); // ① boss 线程池只接受连接通常 1~2 个线程// EventLoopGroup workerGroup new NioEventLoopGroup(); // ② worker 线程池处理每个连接上的 I/O默认 CPU 核数*2// ServerBootstrap b new ServerBootstrap();// 显式指定两个线程池第一个是 boss第二个是 worker// b.group(bossGroup, workerGroup) // ← parentGroupbossGroup, childGroupworkerGroup// channel 初始化创建channel的类型使用工厂设计模式// option 作用于 ServerChannelNioServerSocketChannel也就是监听端口、接受连接的那个 Channel。// 整个服务通常只有 1 个 ServerChannel。// handler 作用于 ServerChannel 的 Pipeline只创建 1 次贯穿整个服务器生命周期。// childHandler 作用于每个接入的客户端 SocketChannel。// 每个新连接都有自己的 SocketChannel 和 Pipeline都会执行一次 childHandler 初始化。启动服务// 同步启动ChannelFuturefb.bind(PORT).sync();// bind 绑定本地端口号启动相关任务主要逻辑我们需要看AbstractBootstrap.doBindprivateChannelFuturedoBind(finalSocketAddresslocalAddress){// 初始化服务端 ServerChannel并将它注册到 parent/boss EventLoopGroupfinalChannelFutureregFutureinitAndRegister();...if(regFuture.isDone()){// 此时我们知道注册已经完成并且成功了。ChannelPromisepromisechannel.newPromise();doBind0(regFuture,channel,localAddress,promise);returnpromise;}else{...// 异常处理returnpromise;}}我们看一下initAndRegister方法finalChannelFutureinitAndRegister(){Channelchannelnull;try{// 1.创建 Channel 对象此时还没注册到 EventLoopchannelchannelFactory.newChannel();// 2.调用 init() 方法init(channel);}catch(Throwablet){...// 异常处理}// 3.注册到 EventLoopGroup —— 对 ServerBootstrap 来说这里注册的是 ServerChannel 到 parent/boss groupfinalChannelFutureregFutureconfig().group().register(channel);...returnregFuture;}ServerBootstrap.init方法voidinit(Channelchannel)throwsThrowable{// 配置 ChannelOption影响底层 Channel 行为例如 SO_BACKLOG、SO_REUSEADDR 等setChannelOptions(channel,newOptionsArray(),logger);// 存储用户自定义属性可以通过 AttributeKey 在 Channel 上保存上下文数据setAttributes(channel,newAttributesArray());// 得到pipelinepipeline的初始化是在创建 Channel 时完成ChannelPipelinepchannel.pipeline();// 保存子通道配置的引用避免后续被修改finalEventLoopGroupcurrentChildGroupchildGroup;finalChannelHandlercurrentChildHandlerchildHandler;finalEntryChannelOption?,Object[]currentChildOptionsnewOptionsArray(childOptions);finalEntryAttributeKey?,Object[]currentChildAttrsnewAttributesArray(childAttrs);// 加载拓展插件finalCollectionChannelInitializerExtensionextensionsgetInitializerExtensions();// 给 ServerChannel 添加初始化器。这里初始化的是 ServerChannel 的 Pipeline。p.addLast(newChannelInitializerChannel(){OverridepublicvoidinitChannel(finalChannelch){finalChannelPipelinepipelinech.pipeline();// 添加用户配置的处理器如果存在ChannelHandlerhandlerconfig.handler();if(handler!null){pipeline.addLast(handler);}// 这里的 ch 是 ServerChannel所以任务提交到 boss/parent EventLoop 执行。// 这样可以保证 ServerBootstrapAcceptor 在 handlerAdded 之后再加入 Pipeline。ch.eventLoop().execute(newRunnable(){Overridepublicvoidrun(){// ServerBootstrapAcceptor 是 ServerChannel Pipeline 中的入站处理器。// 它在接收到 accepted SocketChannel 后// 1. 给子 Channel 添加 childHandler// 2. 设置 childOptions 和 childAttrs// 3. 执行扩展点// 4. 将子 Channel 注册到 childGroup也就是 worker EventLoopGroup。pipeline.addLast(newServerBootstrapAcceptor(ch,currentChildGroup,currentChildHandler,currentChildOptions,currentChildAttrs,extensions));}});}});...}关于AbstractChannel的注册流程publicfinalvoidregister(EventLoopeventLoop,finalChannelPromisepromise){ObjectUtil.checkNotNull(eventLoop,eventLoop);// 检查...// 设置关联的 EventLoopAbstractChannel.this.eventLoopeventLoop;// 将通道里面的执行器设置为空AbstractChannelHandlerContextcontextpipeline.tail;do{context.contextExecutornull;contextcontext.prev;}while(context!null);// 如果当前线程就是 EventLoop 线程直接注册否则提交任务到 EventLoop 执行...register0(promise);...}privatevoidregister0(ChannelPromisepromise){...// 创建内部注册 Promise用于监听底层注册结果ChannelPromiseregisterPromisenewPromise();booleanfirstRegistrationneverRegistered;// 添加监听器处理注册成功后的回调registerPromise.addListener(future-{if(future.isSuccess()){// 更新注册状态neverRegisteredfalse;registeredtrue;// 确保在通知 promise 之前调用 handlerAdded防止用户在监听器中触发事件时 Handler 还未添加pipeline.invokeHandlerAddedIfNeeded();// 通知 register() 的调用者成功了safeSetSuccess(promise);// 触发注册事件Pipeline 传播pipeline.fireChannelRegistered();// 仅在首次注册且 Channel 活跃时触发 channelActiveif(isActive()){if(firstRegistration){// 通知我已经准备好了可以进行通信了pipeline.fireChannelActive();}elseif(config().isAutoRead()){// 重新注册且 autoRead 开启时需要重新开始读取数据beginRead();}}}else{// 注册失败关闭 Channel 避免资源泄漏...}});// 底层 I/O 注册。以 NIO 为例这里会把 Channel 注册到 Selector// 并完成 Channel 与当前 EventLoop 的绑定。doRegister(registerPromise);}
1.netty源码阅读-管理端Server启动
发布时间:2026/6/19 18:07:17
管理端Server初始化MultiThreadIoEventLoopGroup1.根据传入的线程数初始化事件执行器MultithreadEventExecutorGroup构造器// 初始化时间线程数childrennewEventExecutor[nThreads];// 遍历生成for(inti0;inThreads;i){booleansuccessfalse;try{// 根据传入的类型来确认 执行器的类型children[i]newChild(executor,args);successtrue;}catch(Exceptione){...}finally{// 不成功优雅关闭if(!success){...}}}// 所有 EventLoop 创建完成后再创建选择器。// 当有新任务/新连接到来时chooser 决定把它分配给哪一个 EventLoop。chooserchooserFactory.newChooser(children);...readonlyChildrenCollections.unmodifiableSet(childrenSet);MultiThreadIoEventLoopGroup的newChild()方法protectedIoEventLoopnewChild(Executorexecutor,IoHandlerFactoryioHandlerFactory,SuppressWarnings(unused)Object...args){returnnewSingleThreadIoEventLoop(this,executor,ioHandlerFactory);}publicSingleThreadIoEventLoop(IoEventLoopGroupparent,Executorexecutor,IoHandlerFactoryioHandlerFactory){super(parent,executor,false,ObjectUtil.checkNotNull(ioHandlerFactory,ioHandlerFactory).isChangingThreadSupported());this.maxTaskProcessingQuantumNsDEFAULT_MAX_TASK_PROCESSING_QUANTUM_NS;// io 处理层EventLoop 怎么去等待 I/O 事件以及事件来了之后发给谁。this.ioHandlerioHandlerFactory.newHandler(this);}看一下SingleThreadIoEventLoop的构造器/** * parent 线程组归属 * executor 线程的启动方式 * addTaskWakesUp 入队唤醒策略 * supportSuspension 状态机是否含暂停状态 * maxPendingTasks 队列容量上限背压 * rejectedHandler 队列满时的兜底策略 */protectedSingleThreadEventExecutor(EventExecutorGroupparent,Executorexecutor,booleanaddTaskWakesUp,booleansupportSuspension,intmaxPendingTasks,RejectedExecutionHandlerrejectedHandler){super(parent);this.addTaskWakesUpaddTaskWakesUp;this.supportSuspensionsupportSuspension;this.maxPendingTasksMath.max(16,maxPendingTasks);this.executorThreadExecutorMap.apply(executor,this);taskQueuenewTaskQueue(this.maxPendingTasks);rejectedExecutionHandlerObjectUtil.checkNotNull(rejectedHandler,rejectedHandler);lastActivityTimeNanosticker().nanoTime();}初始化ServerBootstrapServerBootstrapbnewServerBootstrap();// 简化写法同一个 EventLoopGroup 同时承担 boss 和 worker 的职责b.group(group).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG,100).handler(newLoggingHandler(LogLevel.INFO)).childHandler(newChannelInitializerSocketChannel(){OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepch.pipeline();if(sslCtx!null){p.addLast(sslCtx.newHandler(ch.alloc()));}//p.addLast(new LoggingHandler(LogLevel.INFO));p.addLast(serverHandler);}});// group 初始化处理线程组。b.group(group) 会把同一个 group 同时赋给 boss 和 worker。// 这是简化用法一个线程池既接受连接也处理子连接上的 I/O。// 生产环境中更常见的是显式拆分 bossGroup 和 workerGroup。// EventLoopGroup bossGroup new NioEventLoopGroup(1); // ① boss 线程池只接受连接通常 1~2 个线程// EventLoopGroup workerGroup new NioEventLoopGroup(); // ② worker 线程池处理每个连接上的 I/O默认 CPU 核数*2// ServerBootstrap b new ServerBootstrap();// 显式指定两个线程池第一个是 boss第二个是 worker// b.group(bossGroup, workerGroup) // ← parentGroupbossGroup, childGroupworkerGroup// channel 初始化创建channel的类型使用工厂设计模式// option 作用于 ServerChannelNioServerSocketChannel也就是监听端口、接受连接的那个 Channel。// 整个服务通常只有 1 个 ServerChannel。// handler 作用于 ServerChannel 的 Pipeline只创建 1 次贯穿整个服务器生命周期。// childHandler 作用于每个接入的客户端 SocketChannel。// 每个新连接都有自己的 SocketChannel 和 Pipeline都会执行一次 childHandler 初始化。启动服务// 同步启动ChannelFuturefb.bind(PORT).sync();// bind 绑定本地端口号启动相关任务主要逻辑我们需要看AbstractBootstrap.doBindprivateChannelFuturedoBind(finalSocketAddresslocalAddress){// 初始化服务端 ServerChannel并将它注册到 parent/boss EventLoopGroupfinalChannelFutureregFutureinitAndRegister();...if(regFuture.isDone()){// 此时我们知道注册已经完成并且成功了。ChannelPromisepromisechannel.newPromise();doBind0(regFuture,channel,localAddress,promise);returnpromise;}else{...// 异常处理returnpromise;}}我们看一下initAndRegister方法finalChannelFutureinitAndRegister(){Channelchannelnull;try{// 1.创建 Channel 对象此时还没注册到 EventLoopchannelchannelFactory.newChannel();// 2.调用 init() 方法init(channel);}catch(Throwablet){...// 异常处理}// 3.注册到 EventLoopGroup —— 对 ServerBootstrap 来说这里注册的是 ServerChannel 到 parent/boss groupfinalChannelFutureregFutureconfig().group().register(channel);...returnregFuture;}ServerBootstrap.init方法voidinit(Channelchannel)throwsThrowable{// 配置 ChannelOption影响底层 Channel 行为例如 SO_BACKLOG、SO_REUSEADDR 等setChannelOptions(channel,newOptionsArray(),logger);// 存储用户自定义属性可以通过 AttributeKey 在 Channel 上保存上下文数据setAttributes(channel,newAttributesArray());// 得到pipelinepipeline的初始化是在创建 Channel 时完成ChannelPipelinepchannel.pipeline();// 保存子通道配置的引用避免后续被修改finalEventLoopGroupcurrentChildGroupchildGroup;finalChannelHandlercurrentChildHandlerchildHandler;finalEntryChannelOption?,Object[]currentChildOptionsnewOptionsArray(childOptions);finalEntryAttributeKey?,Object[]currentChildAttrsnewAttributesArray(childAttrs);// 加载拓展插件finalCollectionChannelInitializerExtensionextensionsgetInitializerExtensions();// 给 ServerChannel 添加初始化器。这里初始化的是 ServerChannel 的 Pipeline。p.addLast(newChannelInitializerChannel(){OverridepublicvoidinitChannel(finalChannelch){finalChannelPipelinepipelinech.pipeline();// 添加用户配置的处理器如果存在ChannelHandlerhandlerconfig.handler();if(handler!null){pipeline.addLast(handler);}// 这里的 ch 是 ServerChannel所以任务提交到 boss/parent EventLoop 执行。// 这样可以保证 ServerBootstrapAcceptor 在 handlerAdded 之后再加入 Pipeline。ch.eventLoop().execute(newRunnable(){Overridepublicvoidrun(){// ServerBootstrapAcceptor 是 ServerChannel Pipeline 中的入站处理器。// 它在接收到 accepted SocketChannel 后// 1. 给子 Channel 添加 childHandler// 2. 设置 childOptions 和 childAttrs// 3. 执行扩展点// 4. 将子 Channel 注册到 childGroup也就是 worker EventLoopGroup。pipeline.addLast(newServerBootstrapAcceptor(ch,currentChildGroup,currentChildHandler,currentChildOptions,currentChildAttrs,extensions));}});}});...}关于AbstractChannel的注册流程publicfinalvoidregister(EventLoopeventLoop,finalChannelPromisepromise){ObjectUtil.checkNotNull(eventLoop,eventLoop);// 检查...// 设置关联的 EventLoopAbstractChannel.this.eventLoopeventLoop;// 将通道里面的执行器设置为空AbstractChannelHandlerContextcontextpipeline.tail;do{context.contextExecutornull;contextcontext.prev;}while(context!null);// 如果当前线程就是 EventLoop 线程直接注册否则提交任务到 EventLoop 执行...register0(promise);...}privatevoidregister0(ChannelPromisepromise){...// 创建内部注册 Promise用于监听底层注册结果ChannelPromiseregisterPromisenewPromise();booleanfirstRegistrationneverRegistered;// 添加监听器处理注册成功后的回调registerPromise.addListener(future-{if(future.isSuccess()){// 更新注册状态neverRegisteredfalse;registeredtrue;// 确保在通知 promise 之前调用 handlerAdded防止用户在监听器中触发事件时 Handler 还未添加pipeline.invokeHandlerAddedIfNeeded();// 通知 register() 的调用者成功了safeSetSuccess(promise);// 触发注册事件Pipeline 传播pipeline.fireChannelRegistered();// 仅在首次注册且 Channel 活跃时触发 channelActiveif(isActive()){if(firstRegistration){// 通知我已经准备好了可以进行通信了pipeline.fireChannelActive();}elseif(config().isAutoRead()){// 重新注册且 autoRead 开启时需要重新开始读取数据beginRead();}}}else{// 注册失败关闭 Channel 避免资源泄漏...}});// 底层 I/O 注册。以 NIO 为例这里会把 Channel 注册到 Selector// 并完成 Channel 与当前 EventLoop 的绑定。doRegister(registerPromise);}