kafka源码-@KafkaListener消费端的poll调用逻辑 单独展开项目中 KafkaListener 消费端从启动注册到 poll 拉取再到 listen() 被调用的完整源码调用链。版本对齐Spring Boot 2.7.18 → spring-kafka 2.8.11 → kafka-clients 3.1.x。一、与项目的对应关系消费代码// 监听test_topic主题 KafkaListener(topics test_topic) public void listen(String msg) { System.out.println(SpringBoot消费者收到消息 msg); }关键配置application.ymlgroup-id: test-boot-groupenable-auto-commit: trueauto-offset-reset: earliestStringDeserializer二、完整调用链二、完整调用链三、阶段 1启动时注册监听器还没到 poll3.1 启用KafkaListener处理KafkaAutoConfiguration通过内部类EnableKafka激活注解驱动// KafkaAnnotationDrivenConfiguration.EnableKafkaConfiguration EnableKafka // 导入 KafkaListenerAnnotationBeanPostProcessor 等 Bean同时创建默认工厂Bean ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(...) { ConcurrentKafkaListenerContainerFactory factory new ConcurrentKafkaListenerContainerFactory(); configurer.configure(factory, kafkaConsumerFactory); // 注入 ConsumerFactory、pollTimeout 等 return factory; }3.2 扫描KafkaMsgConsumer.listenKafkaListenerAnnotationBeanPostProcessor在 Bean 初始化后扫描KafkaListener// processKafkaListener MethodKafkaListenerEndpoint endpoint new MethodKafkaListenerEndpoint(); endpoint.setMethod(method); // listen(String msg) endpoint.setBean(bean); // KafkaMsgConsumer 实例 endpoint.setTopics([test_topic]); endpoint.setGroupId(...); // 未指定则用默认 KafkaListenerContainerFactory factory resolveContainerFactory(...); // kafkaListenerContainerFactory this.registrar.registerEndpoint(endpoint, factory);3.3 注册并启动容器所有单例 Bean 就绪后// KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated() this.registrar.afterPropertiesSet(); // → registerAllEndpoints() // KafkaListenerEndpointRegistrar.registerAllEndpoints() this.endpointRegistry.registerListenerContainer(descriptor.endpoint, factory, true); // autoStartuptrue → 立刻 start()四、阶段 2创建容器与消费者poll 前的准备4.1 工厂创建ConcurrentMessageListenerContainer// ConcurrentKafkaListenerContainerFactory.createContainerInstance() ContainerProperties properties new ContainerProperties(test_topic); return new ConcurrentMessageListenerContainer(consumerFactory, properties);默认concurrency 1所以只创建 1 个子容器KafkaMessageListenerContainer。4.2 绑定 MessageListener 适配器// MethodKafkaListenerEndpoint.createMessageListener() RecordMessagingMessageListenerAdapter adapter new RecordMessagingMessageListenerAdapter(bean, method, errorHandler); adapter.setHandlerMethod(new HandlerAdapter(invocableHandlerMethod)); // invocableHandlerMethod 对 KafkaMsgConsumer.listen(String) 的反射封装listen(String msg)参数只有 payload监听器类型为ListenerType.SIMPLE。4.3 启动容器提交消费线程// KafkaMessageListenerContainer.doStart() this.listenerConsumer new ListenerConsumer(listener, listenerType); consumerExecutor.submitListenable(this.listenerConsumer); // 异步线程非 HTTP 线程线程名类似org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1。4.4ListenerConsumer构造函数创建KafkaConsumer并订阅// ListenerConsumer 构造 this.autoCommit determineAutoCommit(consumerProperties); // true你的配置 this.consumer consumerFactory.createConsumer( this.consumerGroupId, // test-boot-group clientId, clientIdSuffix, consumerProperties); subscribeOrAssignTopics(this.consumer); // → consumer.subscribe([test_topic], rebalanceListener)底层// DefaultKafkaConsumerFactory.createRawConsumer() return new KafkaConsumer(configProps, keyDeserializerSupplier.get(), // StringDeserializer valueDeserializerSupplier.get()); // StringDeserializer此时消费者加入test-boot-group触发 ConsumerCoordinator 做 rebalance分配test_topic的分区。五、阶段 3消费主循环 —run()→pollAndInvoke()5.1 无限循环// ListenerConsumer.run() while (isRunning()) { try { pollAndInvoke(); // 每一轮 poll 处理 } catch (Exception e) { handleConsumerException(e); } }5.2 单轮 poll 流程// ListenerConsumer.pollAndInvoke() idleBetweenPollIfNecessary(); // 空闲时可能 sleep pauseConsumerIfNecessary(); // 容器 pause 时跳过 this.lastPoll System.currentTimeMillis(); ConsumerRecordsK, V records doPoll(); // 核心 invokeIfHaveRecords(records); // 有记录则调用监听器 // doPoll() → pollConsumer() private ConsumerRecordsK, V pollConsumer() { beforePoll(); try { return this.consumer.poll(this.pollTimeout); // 默认 5000ms } catch (WakeupException ex) { return ConsumerRecords.empty(); // 停止容器时唤醒 } }5.3 有消息时进入监听调用// invokeIfHaveRecords() if (records ! null records.count() 0) { invokeListener(records); // 非 batch → invokeRecordListener } // invokeRecordListener() → doInvokeWithRecords() IteratorConsumerRecordK, V iterator records.iterator(); while (iterator.hasNext()) { ConsumerRecordK, V record checkEarlyIntercept(iterator.next()); doInvokeRecordListener(record, iterator); }六、阶段 4从ConsumerRecord到你的listen(String msg)6.1 单条记录处理// doInvokeRecordListener() invokeOnMessage(record); // invokeOnMessage() doInvokeOnMessage(record); if (!isManualImmediateAck) { ackCurrent(record); // autoCommittrue 时此处基本不手动提交 }6.2 适配器分发ListenerType.SIMPLE// doInvokeOnMessage() switch (this.listenerType) { case SIMPLE: this.listener.onMessage(record); // RecordMessagingMessageListenerAdapter break; ... }6.3RecordMessagingMessageListenerAdapter→ 反射调用// RecordMessagingMessageListenerAdapter.onMessage(record, ack, consumer) Object result invokeHandler(record, acknowledgment, message, consumer); // MessagingMessageListenerAdapter.invokeHandler() return this.handlerMethod.invoke(message, data, acknowledgment, consumer); // data ConsumerRecordInvocableHandlerMethod 从 record.value() 提取 String // HandlerAdapter.invoke() return invocableHandlerMethod.invoke(message, providedArgs); // 最终调用 KafkaMsgConsumer.listen(hello)项目的listen(String msg)收到的msg就是ConsumerRecord.value()经StringDeserializer反序列化后的结果。七、阶段 5KafkaConsumer.poll()内部kafka-clientsSpring 调用的是// KafkaConsumer.poll(Duration timeout) private ConsumerRecordsK, V poll(Timer timer, boolean includeMetadataInTimeout) { do { // ① 协调器心跳、rebalance、更新分区分配 updateAssignmentMetadataIfNeeded(timer, false); // ② 拉取数据 MapTopicPartition, ListConsumerRecordK, V records pollForFetches(timer); if (!records.isEmpty()) { // ③ 预取下一批pipeline if (fetcher.sendFetches() 0) { client.transmitSends(); } // ④ 拦截器 返回 return interceptors.onConsume(new ConsumerRecords(records)); } } while (timer.notExpired()); return ConsumerRecords.empty(); // 超时无数据 }pollForFetches内部// 本地缓冲区有数据 → 直接返回已反序列化 Map... records fetcher.fetchedRecords(); if (!records.isEmpty()) return records; // 否则发 FetchRequest 到 BrokerNetworkClient.poll() 等待响应 fetcher.sendFetches(); client.poll(pollTimeout, ...); return fetcher.fetchedRecords();数据流Broker 磁盘→ FetchResponse字节→ Fetcher 解析→ StringDeserializer.deserialize() // value 变 hello→ ConsumerRecord(test_topic, partition, offset, hello)→ ConsumerRecords→ Spring ListenerConsumer→ KafkaMsgConsumer.listen(hello)八、offset 提交你的enable-auto-commit: true项目的配置下 不由 Spring 手动 commit而是由KafkaConsumer内部协调器按auto.commit.interval.ms默认 5s自动提交。// ackCurrent() 在 autoCommittrue 时 else if (... !this.autoCommit) { this.acks.add(record); // 不满足跳过 } // → 不进入 Spring 手动提交逻辑含义listen()正常返回后offset 不会立刻提交通常在接下来几秒内由消费者客户端后台提交若listen()抛异常默认错误处理下该条消息可能被重复消费取决于提交时机九、精简版调用栈对照源码用[启动]SpringApplication.run()└─ EnableKafka → KafkaListenerAnnotationBeanPostProcessor└─ processKafkaListener(KafkaMsgConsumer.listen)└─ registrar.registerEndpoint(MethodKafkaListenerEndpoint)└─ KafkaListenerEndpointRegistry.registerListenerContainer(..., autoStartuptrue)[容器启动]ConcurrentKafkaListenerContainerFactory.createListenerContainer()└─ ConcurrentMessageListenerContainer.doStart()└─ KafkaMessageListenerContainer.doStart()└─ SimpleAsyncTaskExecutor.submit(ListenerConsumer)[消费线程初始化]ListenerConsumer.init()├─ DefaultKafkaConsumerFactory.createConsumer(test-boot-group, ...)│ └─ new KafkaConsumer(configs, StringDeserializer, StringDeserializer)└─ consumer.subscribe([test_topic], rebalanceListener)[消费循环 - 独立线程]ListenerConsumer.run()└─ while (isRunning) pollAndInvoke()├─ consumer.poll(Duration.ofMillis(5000)) // KafkaConsumer│ ├─ ConsumerCoordinator.poll() // 心跳/rebalance│ ├─ Fetcher.sendFetches() → NetworkClient // 向 Broker 发 Fetch│ └─ Fetcher.fetchedRecords() // 反序列化 → ConsumerRecords└─ invokeListener(records)└─ doInvokeWithRecords()└─ doInvokeRecordListener(record)└─ invokeOnMessage(record)└─ listener.onMessage(record) // RecordMessagingMessageListenerAdapter└─ HandlerAdapter.invoke()└─ InvocableHandlerMethod.invoke()└─ KafkaMsgConsumer.listen(hello) // 你的业务方法十、与生产侧的对比同一进程维度生产侧 (KafkaTemplate.send)消费侧 (KafkaListener)触发线程Tomcat HTTP 线程ListenerConsumer专用线程是否阻塞等待send异步立即返回poll最多阻塞pollTimeout默认 5s核心 APIKafkaProducer.sendKafkaConsumer.poll你的业务入口KafkaTestController.sendKafkaMsgConsumer.listen与 Broker 交互ProducerRequestFetchRequest 心跳Producer 把消息写入 Broker 后消费线程在下一轮或当前轮poll中拉到listen()才会执行——两者完全异步、不同线程。十一、几个容易忽略的细节1. 首次poll可能为空Rebalance 完成前poll可能返回空集合auto-offset-reset: earliest决定从最早还是最新 offset 开始读。2. 同一分区内顺序消费concurrency1时同一 partition 的消息在doInvokeWithRecords的 while 循环中串行调用listen()。3.poll超时不是错误5s 内无新消息 → 返回ConsumerRecords.empty()→ 进入 idle 检测 → 继续下一轮 poll。4. 停止应用时doStop()→listenerConsumer.wakeup()→poll抛WakeupException→ 返回空记录 → 循环退出。5. 与KafkaTemplate.send的衔接点唯一的交汇点是 Kafka Broker 上的test_topicSpring 生产链路和消费链路在代码层没有直接调用关系。源码对照KafkaMessageListenerContainerListenerConsumer.run/pollAndInvokeRecordMessagingMessageListenerAdapterKafkaConsumer.poll