以下是修复后的完整代码。该代码实现了一个基于 Java NIO 的主从 Reactor 多线程模型解决了原代码中 Boss 线程与 Worker 线程之间注册逻辑的时序问题并增加了完整的读写处理和异常管理机制。packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.*;importjava.util.Iterator;importjava.util.Set;importjava.util.concurrent.ConcurrentLinkedQueue;Slf4jpublicclassMultiThreadServerTest{// 工作线程数组模拟 Netty 的 workerGroupprivatestaticfinalWorker[]workersnewWorker[Runtime.getRuntime().availableProcessors()];privatestaticvolatileintindex0;publicstaticvoidmain(String[]args)throwsIOException{Thread.currentThread().setName(boss);// 初始化 Worker 线程池for(inti0;iworkers.length;i){workers[i]newWorker(worker-i);workers[i].start();}ServerSocketChannelsscServerSocketChannel.open();ssc.configureBlocking(false);SelectorbossSelectorSelector.open();// Boss 只关注 ACCEPT 事件ssc.register(bossSelector,SelectionKey.OP_ACCEPT);ssc.bind(newInetSocketAddress(8081));log.info(Server started on port 8081, Boss thread: {},Thread.currentThread().getName());while(true){try{bossSelector.select();SetSelectionKeyselectedKeysbossSelector.selectedKeys();IteratorSelectionKeyiteratorselectedKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();iterator.remove();if(key.isAcceptable()){handleAccept(key,ssc);}}}catch(IOExceptione){log.error(Boss selector error,e);}}}/** * 处理新连接接入 */privatestaticvoidhandleAccept(SelectionKeykey,ServerSocketChannelssc)throwsIOException{SocketChannelscssc.accept();if(sc!null){sc.configureBlocking(false);log.info(New connection accepted from: {},sc.getRemoteAddress());// 轮询选择一个 Worker 线程进行注册WorkerworkernextWorker();worker.register(sc);}}/** * 轮询获取下一个 Worker */privatestaticWorkernextWorker(){returnworkers[(index)%workers.length];}/** * Worker 线程类负责处理 IO 读写 */staticclassWorkerimplementsRunnable{privatefinalStringname;privateThreadthread;privateSelectorselector;// 用于串行化注册操作的队列privatefinalConcurrentLinkedQueueRunnableregisterQueuenewConcurrentLinkedQueue();privatevolatilebooleanstartedfalse;publicWorker(Stringname){this.namename;}publicvoidstart(){if(!started){synchronized(this){if(!started){try{this.selectorSelector.open();this.threadnewThread(this,name);this.thread.start();this.startedtrue;log.info(Worker {} started,name);}catch(IOExceptione){log.error(Failed to start worker {},name,e);}}}}}/** * 将 Channel 注册到当前 Worker 的 Selector 上 * 这是一个异步操作Boss 线程调用此方法后立即返回 */publicvoidregister(SocketChannelchannel){// 将注册任务放入队列registerQueue.add(()-{try{// 注册读事件并附加一个 ByteBuffer 作为 attachment 用于存储读取的数据// 注意实际生产中应使用对象池管理 ByteBufferchannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));log.debug(Channel registered to worker: {},channel.getRemoteAddress());}catch(ClosedChannelExceptione){log.warn(Channel closed before registration,e);}catch(Exceptione){log.error(Registration failed,e);}});// 唤醒 Selector使其从 select() 阻塞中返回以便尽快处理队列中的注册任务selector.wakeup();}Overridepublicvoidrun(){while(true){try{// 1. 处理注册队列中的任务// 在 select 之前处理确保新注册的 Channel 能参与下一轮的事件监听// 虽然本次 select 不会立即返回新 Channel 的事件但保证了注册动作的完成Runnabletask;while((taskregisterQueue.poll())!null){task.run();}// 2. 阻塞等待 IO 事件// 如果队列为空且无事件select() 会阻塞// 如果有 wakeup() 调用通常是因为有新注册任务select() 会立即返回intselectedCountselector.select();// 如果 select 返回 0可能是被 wakeup 唤醒但无实际 IO 事件继续循环检查队列if(selectedCount0){continue;}// 3. 处理 IO 事件SetSelectionKeyselectionKeysselector.selectedKeys();IteratorSelectionKeyiteratorselectionKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();// 必须移除防止重复处理iterator.remove();if(!key.isValid()){continue;}if(key.isReadable()){handleRead(key);}}}catch(IOExceptione){log.error(Worker {} selector error,name,e);// 生产环境中可能需要更复杂的错误恢复机制break;}}}/** * 处理读事件 */privatevoidhandleRead(SelectionKeykey)throwsIOException{SocketChannelchannel(SocketChannel)key.channel();// 获取 attachment 中的缓冲区ByteBufferbuffer(ByteBuffer)key.attachment();try{intbytesReadchannel.read(buffer);if(bytesRead0){buffer.flip();byte[]datanewbyte[buffer.remaining()];buffer.get(data);StringmessagenewString(data);log.info(Worker {} received from {}: {},name,channel.getRemoteAddress(),message.trim());// 模拟业务处理...// 清空缓冲区准备下一次读取buffer.clear();// 如果需要回复可以切换为 OP_WRITE 或直接写回// 这里简单演示直接写回StringresponseEcho: message.trim()\n;ByteBufferwriteBufferByteBuffer.wrap(response.getBytes());channel.write(writeBuffer);}elseif(bytesRead-1){// 对端关闭连接log.info(Client disconnected: {},channel.getRemoteAddress());key.cancel();channel.close();}}catch(IOExceptione){log.error(IO error on channel: {},channel.getRemoteAddress(),e);key.cancel();channel.close();}}}}代码实现功能与特点说明主从 Reactor 架构Boss 线程专门负责监听 OP_ACCEPT 事件处理新连接的接入。一旦连接建立立即将 SocketChannel 交给 Worker 线程处理自身不处理任何读写业务保证高并发下的接入效率。Worker 线程池根据 CPU 核心数创建多个 Worker 线程。每个 Worker 拥有独立的 Selector负责处理分配给它的连接的 OP_READ 和 OP_WRITE 事件。安全的跨线程注册机制使用了 ConcurrentLinkedQueue 来串行化注册操作。Boss 线程调用 worker.register(sc) 时只是将注册逻辑封装为任务放入队列并调用 selector.wakeup()。Worker 线程在 select() 之前先执行队列中的任务。这种设计避免了多线程直接操作同一个 Selector 可能导致的并发问题同时也确保了 wakeup() 能够及时唤醒 Worker 处理新连接。完善的 IO 事件处理读处理使用 attachment 机制为每个 Channel 绑定独立的 ByteBuffer避免了内存分配的频繁开销虽然示例中是每次新建但结构上支持优化。正确处理了粘包/拆包的基本逻辑flip/get/clear。连接关闭处理当 read 返回 -1 或发生 IOException 时正确取消 SelectionKey 并关闭 Channel防止资源泄漏。异常隔离单个连接的异常不会导致整个 Worker 线程崩溃保证了服务的稳定性。负载均衡Boss 线程通过轮询算法Round-Robin将新连接均匀分配给各个 Worker 线程避免单个 Worker 负载过高。日志与可观测性集成了 SLF4J 日志框架记录了服务启动、连接接入、数据接收、连接断开等关键生命周期事件便于调试和监控。
第一章Netty,多Worker,完整代码修正
发布时间:2026/7/3 2:17:04
以下是修复后的完整代码。该代码实现了一个基于 Java NIO 的主从 Reactor 多线程模型解决了原代码中 Boss 线程与 Worker 线程之间注册逻辑的时序问题并增加了完整的读写处理和异常管理机制。packagecom.example.demo;importlombok.extern.slf4j.Slf4j;importjava.io.IOException;importjava.net.InetSocketAddress;importjava.nio.ByteBuffer;importjava.nio.channels.*;importjava.util.Iterator;importjava.util.Set;importjava.util.concurrent.ConcurrentLinkedQueue;Slf4jpublicclassMultiThreadServerTest{// 工作线程数组模拟 Netty 的 workerGroupprivatestaticfinalWorker[]workersnewWorker[Runtime.getRuntime().availableProcessors()];privatestaticvolatileintindex0;publicstaticvoidmain(String[]args)throwsIOException{Thread.currentThread().setName(boss);// 初始化 Worker 线程池for(inti0;iworkers.length;i){workers[i]newWorker(worker-i);workers[i].start();}ServerSocketChannelsscServerSocketChannel.open();ssc.configureBlocking(false);SelectorbossSelectorSelector.open();// Boss 只关注 ACCEPT 事件ssc.register(bossSelector,SelectionKey.OP_ACCEPT);ssc.bind(newInetSocketAddress(8081));log.info(Server started on port 8081, Boss thread: {},Thread.currentThread().getName());while(true){try{bossSelector.select();SetSelectionKeyselectedKeysbossSelector.selectedKeys();IteratorSelectionKeyiteratorselectedKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();iterator.remove();if(key.isAcceptable()){handleAccept(key,ssc);}}}catch(IOExceptione){log.error(Boss selector error,e);}}}/** * 处理新连接接入 */privatestaticvoidhandleAccept(SelectionKeykey,ServerSocketChannelssc)throwsIOException{SocketChannelscssc.accept();if(sc!null){sc.configureBlocking(false);log.info(New connection accepted from: {},sc.getRemoteAddress());// 轮询选择一个 Worker 线程进行注册WorkerworkernextWorker();worker.register(sc);}}/** * 轮询获取下一个 Worker */privatestaticWorkernextWorker(){returnworkers[(index)%workers.length];}/** * Worker 线程类负责处理 IO 读写 */staticclassWorkerimplementsRunnable{privatefinalStringname;privateThreadthread;privateSelectorselector;// 用于串行化注册操作的队列privatefinalConcurrentLinkedQueueRunnableregisterQueuenewConcurrentLinkedQueue();privatevolatilebooleanstartedfalse;publicWorker(Stringname){this.namename;}publicvoidstart(){if(!started){synchronized(this){if(!started){try{this.selectorSelector.open();this.threadnewThread(this,name);this.thread.start();this.startedtrue;log.info(Worker {} started,name);}catch(IOExceptione){log.error(Failed to start worker {},name,e);}}}}}/** * 将 Channel 注册到当前 Worker 的 Selector 上 * 这是一个异步操作Boss 线程调用此方法后立即返回 */publicvoidregister(SocketChannelchannel){// 将注册任务放入队列registerQueue.add(()-{try{// 注册读事件并附加一个 ByteBuffer 作为 attachment 用于存储读取的数据// 注意实际生产中应使用对象池管理 ByteBufferchannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024));log.debug(Channel registered to worker: {},channel.getRemoteAddress());}catch(ClosedChannelExceptione){log.warn(Channel closed before registration,e);}catch(Exceptione){log.error(Registration failed,e);}});// 唤醒 Selector使其从 select() 阻塞中返回以便尽快处理队列中的注册任务selector.wakeup();}Overridepublicvoidrun(){while(true){try{// 1. 处理注册队列中的任务// 在 select 之前处理确保新注册的 Channel 能参与下一轮的事件监听// 虽然本次 select 不会立即返回新 Channel 的事件但保证了注册动作的完成Runnabletask;while((taskregisterQueue.poll())!null){task.run();}// 2. 阻塞等待 IO 事件// 如果队列为空且无事件select() 会阻塞// 如果有 wakeup() 调用通常是因为有新注册任务select() 会立即返回intselectedCountselector.select();// 如果 select 返回 0可能是被 wakeup 唤醒但无实际 IO 事件继续循环检查队列if(selectedCount0){continue;}// 3. 处理 IO 事件SetSelectionKeyselectionKeysselector.selectedKeys();IteratorSelectionKeyiteratorselectionKeys.iterator();while(iterator.hasNext()){SelectionKeykeyiterator.next();// 必须移除防止重复处理iterator.remove();if(!key.isValid()){continue;}if(key.isReadable()){handleRead(key);}}}catch(IOExceptione){log.error(Worker {} selector error,name,e);// 生产环境中可能需要更复杂的错误恢复机制break;}}}/** * 处理读事件 */privatevoidhandleRead(SelectionKeykey)throwsIOException{SocketChannelchannel(SocketChannel)key.channel();// 获取 attachment 中的缓冲区ByteBufferbuffer(ByteBuffer)key.attachment();try{intbytesReadchannel.read(buffer);if(bytesRead0){buffer.flip();byte[]datanewbyte[buffer.remaining()];buffer.get(data);StringmessagenewString(data);log.info(Worker {} received from {}: {},name,channel.getRemoteAddress(),message.trim());// 模拟业务处理...// 清空缓冲区准备下一次读取buffer.clear();// 如果需要回复可以切换为 OP_WRITE 或直接写回// 这里简单演示直接写回StringresponseEcho: message.trim()\n;ByteBufferwriteBufferByteBuffer.wrap(response.getBytes());channel.write(writeBuffer);}elseif(bytesRead-1){// 对端关闭连接log.info(Client disconnected: {},channel.getRemoteAddress());key.cancel();channel.close();}}catch(IOExceptione){log.error(IO error on channel: {},channel.getRemoteAddress(),e);key.cancel();channel.close();}}}}代码实现功能与特点说明主从 Reactor 架构Boss 线程专门负责监听 OP_ACCEPT 事件处理新连接的接入。一旦连接建立立即将 SocketChannel 交给 Worker 线程处理自身不处理任何读写业务保证高并发下的接入效率。Worker 线程池根据 CPU 核心数创建多个 Worker 线程。每个 Worker 拥有独立的 Selector负责处理分配给它的连接的 OP_READ 和 OP_WRITE 事件。安全的跨线程注册机制使用了 ConcurrentLinkedQueue 来串行化注册操作。Boss 线程调用 worker.register(sc) 时只是将注册逻辑封装为任务放入队列并调用 selector.wakeup()。Worker 线程在 select() 之前先执行队列中的任务。这种设计避免了多线程直接操作同一个 Selector 可能导致的并发问题同时也确保了 wakeup() 能够及时唤醒 Worker 处理新连接。完善的 IO 事件处理读处理使用 attachment 机制为每个 Channel 绑定独立的 ByteBuffer避免了内存分配的频繁开销虽然示例中是每次新建但结构上支持优化。正确处理了粘包/拆包的基本逻辑flip/get/clear。连接关闭处理当 read 返回 -1 或发生 IOException 时正确取消 SelectionKey 并关闭 Channel防止资源泄漏。异常隔离单个连接的异常不会导致整个 Worker 线程崩溃保证了服务的稳定性。负载均衡Boss 线程通过轮询算法Round-Robin将新连接均匀分配给各个 Worker 线程避免单个 Worker 负载过高。日志与可观测性集成了 SLF4J 日志框架记录了服务启动、连接接入、数据接收、连接断开等关键生命周期事件便于调试和监控。