客户端 Client客户端启动入口一般从Bootstrap.connect()开始。和服务端ServerBootstrap不同客户端只有一个EventLoopGroup这个 group 同时负责客户端 Channel 的注册、连接、读写等 I/O 事件服务端才有 parent/boss 和 child/worker 的分工。...// 初始化线程组EventLoopGroupgroupnewMultiThreadIoEventLoopGroup(NioIoHandler.newFactory());try{BootstrapbnewBootstrap();// 设置处理线程组b.group(group)// 设置 Channel 类型.channel(NioSocketChannel.class)// 设置 ChannelOption.option(ChannelOption.TCP_NODELAY,true)// 初始化客户端 Channel 的 Pipeline.handler(newChannelInitializerSocketChannel(){OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepch.pipeline();...p.addLast(newEchoClientHandler());}});// 连接服务端ChannelFuturefb.connect(HOST,PORT).sync();// 等待直到关闭f.channel().closeFuture().sync();}finally{// 优雅关闭...}整体流程客户端连接主线Bootstrap.connect - doResolveAndConnect - initAndRegister - 创建 Channel - Bootstrap.init(channel) - EventLoopGroup.register(channel) - doResolveAndConnect0 - 地址解析 - doConnect - channel.connect - pipeline.connect - HeadContext.connect - Unsafe.connect - doConnect - 立即成功或等待 OP_CONNECTBootstrap.connect()Bootstrap.connect()最终会进入doResolveAndConnect()。privateChannelFuturedoResolveAndConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress){// 创建 Channel、初始化 Pipeline并注册到 EventLoopfinalChannelFutureregFutureinitAndRegister();finalChannelchannelregFuture.channel();// 这里判断的是注册 Future 是否已经完成if(regFuture.isDone()){if(!regFuture.isSuccess()){returnregFuture;}returndoResolveAndConnect0(channel,remoteAddress,localAddress,channel.newPromise());}else{// 注册尚未完成时添加监听器等注册成功后再继续连接...}}这里要注意initAndRegister()做了初始化和注册但regFuture表示的是注册结果。只有 Channel 注册到 EventLoop 后后续的连接动作才会提交到对应的 EventLoop 中执行。initAndRegister()initAndRegister()的职责可以拆成三步创建客户端 Channel例如NioSocketChannel。调用Bootstrap.init(channel)初始化客户端 Channel。调用config().group().register(channel)把 Channel 注册到 EventLoop。客户端的Bootstrap.init(channel)和服务端的ServerBootstrap.init(channel)不完全一样。客户端没有childGroup、childHandler、ServerBootstrapAcceptor这些逻辑主要做下面几件事voidinit(Channelchannel){// 设置 ChannelOptionsetChannelOptions(channel,newOptionsArray(),logger);// 设置 AttributesetAttributes(channel,newAttributesArray());ChannelPipelinepchannel.pipeline();// 加入用户配置的 handler通常就是 ChannelInitializerp.addLast(config.handler());// 执行客户端初始化扩展点...}以 NIO 为例注册阶段最终会把 Channel 绑定到某个 EventLoop并注册到底层 Selector。doResolveAndConnect0()注册完成后连接流程进入doResolveAndConnect0()。privateChannelFuturedoResolveAndConnect0(finalChannelchannel,SocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){try{// disableResolver 为 true 表示禁用地址解析不会再走 AddressResolverif(disableResolver){doConnect(remoteAddress,localAddress,promise);returnpromise;}// 如果地址已经解析或者 resolver 不支持该地址类型就直接连接// 否则先异步解析地址解析成功后再 doConnect。...}catch(Throwablecause){promise.tryFailure(cause);}returnpromise;}disableResolver true表示禁用解析不是启用解析。禁用后最好传入已经解析好的地址否则底层连接可能失败。doConnect()privatestaticvoiddoConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromiseconnectPromise){finalChannelchannelconnectPromise.channel();channel.eventLoop().execute(()-{if(localAddressnull){channel.connect(remoteAddress,connectPromise);}else{channel.connect(remoteAddress,localAddress,connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);});}这里没有直接在当前线程执行channel.connect()而是提交到 Channel 绑定的 EventLoop 中执行。这样可以保证连接动作和后续 I/O 操作都在同一个 EventLoop 线程内完成同时也给channelRegistered()这类事件先触发的机会。Pipeline 中的 connectchannel.connect()会进入ChannelPipeline.connect - AbstractChannelHandlerContext.connect - HeadContext.connect - Unsafe.connect补充一下 Pipeline 事件传播方向出站事件例如connect、bind、write从Tail - Head方向传播也就是沿prev指针移动。入站事件例如channelRead、channelActive从Head - Tail方向传播也就是沿next指针移动。publicChannelFutureconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,ChannelPromisepromise){// 检查参数和 Promise 状态...// 从当前节点向前查找下一个能处理 connect 的出站处理器finalAbstractChannelHandlerContextnextfindContextOutbound(MASK_CONNECT);EventExecutorexecutornext.executor();if(executor.inEventLoop()){if(next.invokeHandler()){promiseensurePromiseUseCorrectExecutor(promise);try{finalChannelHandlerhandlernext.handler();finalDefaultChannelPipeline.HeadContextheadContextpipeline.head;if(handlerheadContext){headContext.connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelDuplexHandler){((ChannelDuplexHandler)handler).connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelOutboundHandlerAdapter){((ChannelOutboundHandlerAdapter)handler).connect(next,remoteAddress,localAddress,promise);}else{((ChannelOutboundHandler)handler).connect(next,remoteAddress,localAddress,promise);}}catch(Throwablet){notifyOutboundHandlerException(t,promise);}}else{next.connect(remoteAddress,localAddress,promise);}}else{// 如果当前线程不是该 executor 的线程则提交任务到对应 executor...}returnpromise;}AbstractNioUnsafe.connect()真正执行 NIO 连接的是AbstractNioChannel内部的AbstractNioUnsafe.connect()不是外层AbstractNioChannel.connect()。publicfinalvoidconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){// 检查 Promise 是否可设置、Channel 是否 open...try{// 防止重复连接...booleanwasActiveisActive();// 调用具体 Channel 的底层连接逻辑例如 NioSocketChannel.doConnect()if(doConnect(remoteAddress,localAddress)){// 连接立即成功fulfillConnectPromise(promise,wasActive);}else{// 连接尚未完成等待 OP_CONNECT 事件connectPromisepromise;requestedRemoteAddressremoteAddress;finalintconnectTimeoutMillisconfig().getConnectTimeoutMillis();if(connectTimeoutMillis0){connectTimeoutFutureeventLoop().schedule(newRunnable(){Overridepublicvoidrun(){// 超时后如果连接还没完成就设置失败并关闭 Channel...}},connectTimeoutMillis,TimeUnit.MILLISECONDS);}promise.addListener(newChannelFutureListener(){OverridepublicvoidoperationComplete(ChannelFuturefuture){// 如果 connect future 被取消则取消超时定时器并关闭底层 socket...}});}}catch(Throwablet){// 异常处理...}}这里要注意connectTimeoutFuture不是“延时后判断成功并继续往下走”。它只负责超时失败处理。异步连接真正成功的路径是Selector 监听到 OP_CONNECT - NioHandler 处理 connect-ready 事件 - unsafe.finishConnect() - doFinishConnect() - fulfillConnectPromise() - 如果 Channel 从 inactive 变为 active则触发 pipeline.fireChannelActive()所以客户端连接有两种结果路径doConnect()立即返回true说明连接同步完成直接fulfillConnectPromise()。doConnect()返回false说明连接异步进行中注册OP_CONNECT等 Selector 通知连接完成后再finishConnect()。
2.启动客户端client
发布时间:2026/6/23 10:57:04
客户端 Client客户端启动入口一般从Bootstrap.connect()开始。和服务端ServerBootstrap不同客户端只有一个EventLoopGroup这个 group 同时负责客户端 Channel 的注册、连接、读写等 I/O 事件服务端才有 parent/boss 和 child/worker 的分工。...// 初始化线程组EventLoopGroupgroupnewMultiThreadIoEventLoopGroup(NioIoHandler.newFactory());try{BootstrapbnewBootstrap();// 设置处理线程组b.group(group)// 设置 Channel 类型.channel(NioSocketChannel.class)// 设置 ChannelOption.option(ChannelOption.TCP_NODELAY,true)// 初始化客户端 Channel 的 Pipeline.handler(newChannelInitializerSocketChannel(){OverridepublicvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepch.pipeline();...p.addLast(newEchoClientHandler());}});// 连接服务端ChannelFuturefb.connect(HOST,PORT).sync();// 等待直到关闭f.channel().closeFuture().sync();}finally{// 优雅关闭...}整体流程客户端连接主线Bootstrap.connect - doResolveAndConnect - initAndRegister - 创建 Channel - Bootstrap.init(channel) - EventLoopGroup.register(channel) - doResolveAndConnect0 - 地址解析 - doConnect - channel.connect - pipeline.connect - HeadContext.connect - Unsafe.connect - doConnect - 立即成功或等待 OP_CONNECTBootstrap.connect()Bootstrap.connect()最终会进入doResolveAndConnect()。privateChannelFuturedoResolveAndConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress){// 创建 Channel、初始化 Pipeline并注册到 EventLoopfinalChannelFutureregFutureinitAndRegister();finalChannelchannelregFuture.channel();// 这里判断的是注册 Future 是否已经完成if(regFuture.isDone()){if(!regFuture.isSuccess()){returnregFuture;}returndoResolveAndConnect0(channel,remoteAddress,localAddress,channel.newPromise());}else{// 注册尚未完成时添加监听器等注册成功后再继续连接...}}这里要注意initAndRegister()做了初始化和注册但regFuture表示的是注册结果。只有 Channel 注册到 EventLoop 后后续的连接动作才会提交到对应的 EventLoop 中执行。initAndRegister()initAndRegister()的职责可以拆成三步创建客户端 Channel例如NioSocketChannel。调用Bootstrap.init(channel)初始化客户端 Channel。调用config().group().register(channel)把 Channel 注册到 EventLoop。客户端的Bootstrap.init(channel)和服务端的ServerBootstrap.init(channel)不完全一样。客户端没有childGroup、childHandler、ServerBootstrapAcceptor这些逻辑主要做下面几件事voidinit(Channelchannel){// 设置 ChannelOptionsetChannelOptions(channel,newOptionsArray(),logger);// 设置 AttributesetAttributes(channel,newAttributesArray());ChannelPipelinepchannel.pipeline();// 加入用户配置的 handler通常就是 ChannelInitializerp.addLast(config.handler());// 执行客户端初始化扩展点...}以 NIO 为例注册阶段最终会把 Channel 绑定到某个 EventLoop并注册到底层 Selector。doResolveAndConnect0()注册完成后连接流程进入doResolveAndConnect0()。privateChannelFuturedoResolveAndConnect0(finalChannelchannel,SocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){try{// disableResolver 为 true 表示禁用地址解析不会再走 AddressResolverif(disableResolver){doConnect(remoteAddress,localAddress,promise);returnpromise;}// 如果地址已经解析或者 resolver 不支持该地址类型就直接连接// 否则先异步解析地址解析成功后再 doConnect。...}catch(Throwablecause){promise.tryFailure(cause);}returnpromise;}disableResolver true表示禁用解析不是启用解析。禁用后最好传入已经解析好的地址否则底层连接可能失败。doConnect()privatestaticvoiddoConnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromiseconnectPromise){finalChannelchannelconnectPromise.channel();channel.eventLoop().execute(()-{if(localAddressnull){channel.connect(remoteAddress,connectPromise);}else{channel.connect(remoteAddress,localAddress,connectPromise);}connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);});}这里没有直接在当前线程执行channel.connect()而是提交到 Channel 绑定的 EventLoop 中执行。这样可以保证连接动作和后续 I/O 操作都在同一个 EventLoop 线程内完成同时也给channelRegistered()这类事件先触发的机会。Pipeline 中的 connectchannel.connect()会进入ChannelPipeline.connect - AbstractChannelHandlerContext.connect - HeadContext.connect - Unsafe.connect补充一下 Pipeline 事件传播方向出站事件例如connect、bind、write从Tail - Head方向传播也就是沿prev指针移动。入站事件例如channelRead、channelActive从Head - Tail方向传播也就是沿next指针移动。publicChannelFutureconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,ChannelPromisepromise){// 检查参数和 Promise 状态...// 从当前节点向前查找下一个能处理 connect 的出站处理器finalAbstractChannelHandlerContextnextfindContextOutbound(MASK_CONNECT);EventExecutorexecutornext.executor();if(executor.inEventLoop()){if(next.invokeHandler()){promiseensurePromiseUseCorrectExecutor(promise);try{finalChannelHandlerhandlernext.handler();finalDefaultChannelPipeline.HeadContextheadContextpipeline.head;if(handlerheadContext){headContext.connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelDuplexHandler){((ChannelDuplexHandler)handler).connect(next,remoteAddress,localAddress,promise);}elseif(handlerinstanceofChannelOutboundHandlerAdapter){((ChannelOutboundHandlerAdapter)handler).connect(next,remoteAddress,localAddress,promise);}else{((ChannelOutboundHandler)handler).connect(next,remoteAddress,localAddress,promise);}}catch(Throwablet){notifyOutboundHandlerException(t,promise);}}else{next.connect(remoteAddress,localAddress,promise);}}else{// 如果当前线程不是该 executor 的线程则提交任务到对应 executor...}returnpromise;}AbstractNioUnsafe.connect()真正执行 NIO 连接的是AbstractNioChannel内部的AbstractNioUnsafe.connect()不是外层AbstractNioChannel.connect()。publicfinalvoidconnect(finalSocketAddressremoteAddress,finalSocketAddresslocalAddress,finalChannelPromisepromise){// 检查 Promise 是否可设置、Channel 是否 open...try{// 防止重复连接...booleanwasActiveisActive();// 调用具体 Channel 的底层连接逻辑例如 NioSocketChannel.doConnect()if(doConnect(remoteAddress,localAddress)){// 连接立即成功fulfillConnectPromise(promise,wasActive);}else{// 连接尚未完成等待 OP_CONNECT 事件connectPromisepromise;requestedRemoteAddressremoteAddress;finalintconnectTimeoutMillisconfig().getConnectTimeoutMillis();if(connectTimeoutMillis0){connectTimeoutFutureeventLoop().schedule(newRunnable(){Overridepublicvoidrun(){// 超时后如果连接还没完成就设置失败并关闭 Channel...}},connectTimeoutMillis,TimeUnit.MILLISECONDS);}promise.addListener(newChannelFutureListener(){OverridepublicvoidoperationComplete(ChannelFuturefuture){// 如果 connect future 被取消则取消超时定时器并关闭底层 socket...}});}}catch(Throwablet){// 异常处理...}}这里要注意connectTimeoutFuture不是“延时后判断成功并继续往下走”。它只负责超时失败处理。异步连接真正成功的路径是Selector 监听到 OP_CONNECT - NioHandler 处理 connect-ready 事件 - unsafe.finishConnect() - doFinishConnect() - fulfillConnectPromise() - 如果 Channel 从 inactive 变为 active则触发 pipeline.fireChannelActive()所以客户端连接有两种结果路径doConnect()立即返回true说明连接同步完成直接fulfillConnectPromise()。doConnect()返回false说明连接异步进行中注册OP_CONNECT等 Selector 通知连接完成后再finishConnect()。