Kafka消费者组深度解析 Kafka消费者组深度解析引言Kafka消费者组是实现消息并行消费和负载均衡的核心机制。在分布式系统中合理使用消费者组能够显著提高消息处理吞吐量实现水平扩展同时保证消息的可靠消费。本文将深入探讨消费者组的工作原理、配置方法、最佳实践以及常见问题的解决方案。消费者组基础概念1.1 什么是消费者组消费者组Consumer Group是由多个消费者实例组成的逻辑消费者它们共同消费一个或多个主题的消息。消费者组具有以下特点组内负载均衡同一个分区内的消息只会被组内的一个消费者消费组间独立消费不同消费者组独立消费互不影响动态扩展可以在运行时添加或移除消费者实例自动重平衡消费者变更时自动重新分配分区import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import java.time.Duration; import java.util.*; public class ConsumerGroupBasics { public static void main(String[] args) { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); // 订阅单个主题 consumer.subscribe(Collections.singletonList(my-topic)); // 或者订阅多个主题 // consumer.subscribe(Arrays.asList(topic1, topic2, topic3)); // 或者使用正则表达式订阅 // consumer.subscribe(Pattern.compile(topic-.*)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(100)); for (ConsumerRecordString, String record : records) { System.out.printf(topic%s, partition%d, offset%d, key%s, value%s%n, record.topic(), record.partition(), record.offset(), record.key(), record.value()); } } } finally { consumer.close(); } } }1.2 消费者组工作原理┌──────────────────────────────────────────────────────────┐ │ Topic: orders │ │ ┌────────────┐ ┌────────────┐ ┌────────────┐ │ │ │ Partition 0 │ │ Partition 1 │ │ Partition 2 │ │ │ │ ─── │ │ ─── │ │ ─── │ │ │ │ 0,1,2,3... │ │ 0,1,2,3... │ │ 0,1,2,3... │ │ │ └─────┬──────┘ └─────┬──────┘ └─────┬──────┘ │ └────────┼────────────────┼────────────────┼────────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────────────────────────────────────────┐ │ Consumer Group: order-processors │ │ ┌─────────────────┐ ┌─────────────────┐ │ │ │ Consumer-1 │ │ Consumer-2 │ │ │ │ (处理P0) │ │ (处理P1,P2) │ │ │ │ offset1000 │ │ offset500,600 │ │ │ └─────────────────┘ └─────────────────┘ │ └─────────────────────────────────────────────────────┘1.3 分区分配策略Kafka支持多种分区分配策略每种策略适用于不同的场景public class PartitionAssignmentStrategies { public static void demonstrateStrategies() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, demo-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); // 配置分区分配策略 // 1. Range策略将每个主题的分区按范围分配给消费者 // 缺点可能导致分配不均 // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, // org.apache.kafka.clients.consumer.RangeAssignor); // 2. RoundRobin策略轮询分配所有主题的分区 // 优点分配更均匀 // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, // org.apache.kafka.clients.consumer.RoundRobinAssignor); // 3. StickyAssignor策略尽量保持原有的分配不变 // 优点减少不必要的分区重新分配 // props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, // org.apache.kafka.clients.consumer.StickyAssignor); // 4. CooperativeStickyAssignor策略协作式粘性分配 // 优点支持增量重平衡不中断消费 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.CooperativeStickyAssignor); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(demo-topic)); } }偏移量管理2.1 自动提交Kafka默认使用自动提交偏移量简化了消费者的开发public class AutoCommitConsumer { public static Properties createAutoCommitConfig() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, auto-commit-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); // 启用自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); // 自动提交间隔 props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000); return props; } public static void consumeWithAutoCommit() { KafkaConsumerString, String consumer new KafkaConsumer(createAutoCommitConfig()); consumer.subscribe(Collections.singletonList(my-topic)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { processRecord(record); } // 偏移量会在后台自动提交 } } finally { consumer.close(); } } private static void processRecord(ConsumerRecordString, String record) { // 处理消息 } }2.2 手动提交手动提交提供了更精确的偏移量控制适用于需要确保消息处理完成后才提交的场景public class ManualCommitConsumer { public static Properties createManualCommitConfig() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, manual-commit-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); // 禁用自动提交 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); return props; } public static void consumeWithSyncCommit() { KafkaConsumerString, String consumer new KafkaConsumer(createManualCommitConfig()); consumer.subscribe(Collections.singletonList(my-topic)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { processRecord(record); } // 同步提交阻塞直到提交成功 consumer.commitSync(); } } catch (CommitFailedException e) { System.err.println(提交失败: e.getMessage()); } finally { consumer.close(); } } public static void consumeWithAsyncCommit() { KafkaConsumerString, String consumer new KafkaConsumer(createManualCommitConfig()); consumer.subscribe(Collections.singletonList(my-topic)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { processRecord(record); } // 异步提交不阻塞 consumer.commitAsync((offsets, exception) - { if (exception ! null) { System.err.println(异步提交失败: exception.getMessage()); } else { System.out.println(偏移量提交成功: offsets); } }); } } finally { consumer.close(); } } public static void consumeWithBatchCommit() { KafkaConsumerString, String consumer new KafkaConsumer(createManualCommitConfig()); consumer.subscribe(Collections.singletonList(my-topic)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); if (records.count() 0) { for (ConsumerRecordString, String record : records) { processRecord(record); } // 处理完一批后提交 consumer.commitSync(); System.out.println(批次提交成功: records.count() 条消息); } } } finally { consumer.close(); } } private static void processRecord(ConsumerRecordString, String record) { // 处理单条消息 } }2.3 指定偏移量消费可以手动控制消费位置实现从特定位置开始消费public class SeekConsumer { public static void seekToBeginning() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, seek-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(my-topic)); // 等待分区分配完成 consumer.poll(Duration.ofMillis(100)); // 获取分配的分区 SetTopicPartition assignment consumer.assignment(); // 定位到分区开始位置 consumer.seekToBeginning(assignment); // 或者定位到特定偏移量 for (TopicPartition partition : assignment) { // 跳过前100条消息 consumer.seek(partition, 100); } while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { System.out.println(record.value()); } } } public static void resumeFromCheckpoint() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, checkpoint-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(my-topic)); consumer.poll(Duration.ofMillis(100)); MapTopicPartition, Long checkpoints loadCheckpoints(); SetTopicPartition assignment consumer.assignment(); for (TopicPartition partition : assignment) { Long offset checkpoints.get(partition); if (offset ! null) { consumer.seek(partition, offset); } } while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { processRecord(record); saveCheckpoint(record.partition(), record.offset() 1); } } } private static MapTopicPartition, Long loadCheckpoints() { // 从数据库或文件加载检查点 MapTopicPartition, Long checkpoints new HashMap(); return checkpoints; } private static void saveCheckpoint(TopicPartition partition, long offset) { // 保存检查点到数据库或文件 } private static void processRecord(ConsumerRecordString, String record) { // 处理消息 } }消费者组管理3.1 查看消费者组状态import org.apache.kafka.clients.admin.*; public class ConsumerGroupManagement { public static void listConsumerGroups() throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { // 列出所有消费者组 ListConsumerGroupsResult groupsResult adminClient.listConsumerGroups(); CollectionConsumerGroupListing groups groupsResult.all().get(); System.out.println( 消费者组列表 ); for (ConsumerGroupListing group : groups) { System.out.println(Group ID: group.groupId() , State: group.state()); } } } public static void describeConsumerGroup(String groupId) throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { // 获取消费者组详情 DescribeConsumerGroupsResult result adminClient.describeConsumerGroups( Collections.singletonList(groupId)); MapString, ConsumerGroupDescription groups result.all().get(); ConsumerGroupDescription groupDesc groups.get(groupId); if (groupDesc ! null) { System.out.println( 消费者组详情 ); System.out.println(Group ID: groupDesc.groupId()); System.out.println(State: groupDesc.state()); System.out.println(Coordinator: groupDesc.coordinator()); System.out.println(Members:); for (MemberDescription member : groupDesc.members()) { System.out.println( - Host: member.host() , Client ID: member.clientId() , Assignment: member.assignment()); } } } } public static void getConsumerGroupOffsets(String groupId) throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { // 获取消费者组偏移量 ListConsumerGroupOffsetsResult offsetsResult adminClient.listConsumerGroupOffsets(groupId); MapTopicPartition, OffsetAndMetadata offsets offsetsResult.partitionsToOffsetAndMetadata().get(); System.out.println( 消费者组偏移量 ); for (Map.EntryTopicPartition, OffsetAndMetadata entry : offsets.entrySet()) { System.out.println(entry.getKey() - offset entry.getValue().offset() , committed entry.getValue().lastCommitTime()); } } } }3.2 重置消费者组偏移量public class ResetConsumerGroupOffsets { public static void resetToEarliest(String groupId) throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { // 创建重置配置 MapTopicPartition, Long timestampsToSearch new HashMap(); timestampsToSearch.put(new TopicPartition(my-topic, 0), OffsetSpec.EARLIEST.timestamp()); // 获取目标偏移量 ListOffsetsResult offsetResult adminClient.listOffsets( timestampsToSearch); MapTopicPartition, OffsetAndMetadata resetOffsets new HashMap(); for (Map.EntryTopicPartition, ListOffsetsResult.Info entry : offsetResult.partitionsToOffsetAndTimestamp().get().entrySet()) { resetOffsets.put( entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()) ); } // 执行偏移量重置 AlterConsumerGroupOffsetsResult result adminClient.alterConsumerGroupOffsets(groupId, resetOffsets); result.all().get(); System.out.println(偏移量已重置到最早位置); } } public static void resetToLatest(String groupId) throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { MapTopicPartition, Long timestampsToSearch new HashMap(); timestampsToSearch.put(new TopicPartition(my-topic, 0), OffsetSpec.LATEST.timestamp()); ListOffsetsResult offsetResult adminClient.listOffsets( timestampsToSearch); MapTopicPartition, OffsetAndMetadata resetOffsets new HashMap(); for (Map.EntryTopicPartition, ListOffsetsResult.Info entry : offsetResult.partitionsToOffsetAndTimestamp().get().entrySet()) { resetOffsets.put( entry.getKey(), new OffsetAndMetadata(entry.getValue().offset()) ); } AlterConsumerGroupOffsetsResult result adminClient.alterConsumerGroupOffsets(groupId, resetOffsets); result.all().get(); System.out.println(偏移量已重置到最新位置); } } public static void resetToSpecificOffsets(String groupId, MapTopicPartition, Long specificOffsets) throws Exception { Properties adminProps new Properties(); adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); try (AdminClient adminClient AdminClient.create(adminProps)) { MapTopicPartition, OffsetAndMetadata resetOffsets new HashMap(); for (Map.EntryTopicPartition, Long entry : specificOffsets.entrySet()) { resetOffsets.put( entry.getKey(), new OffsetAndMetadata(entry.getValue()) ); } AlterConsumerGroupOffsetsResult result adminClient.alterConsumerGroupOffsets(groupId, resetOffsets); result.all().get(); System.out.println(偏移量已重置到指定位置); } } }并行消费4.1 多线程消费者import java.util.concurrent.*; import java.util.List; public class MultiThreadedConsumer { private final KafkaConsumerString, String consumer; private final ExecutorService executor; private final int numThreads; public MultiThreadedConsumer(int numThreads) { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, multi-threaded-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); this.consumer new KafkaConsumer(props); this.executor Executors.newFixedThreadPool(numThreads); this.numThreads numThreads; } public void startConsuming() { consumer.subscribe(Collections.singletonList(my-topic)); final CountDownLatch latch new CountDownLatch(numThreads); for (int i 0; i numThreads; i) { executor.submit(() - { try { runConsumer(); } finally { latch.countDown(); } }); } // 添加关闭钩子 Runtime.getRuntime().addShutdownHook(new Thread(() - { consumer.wakeup(); try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } })); } private void runConsumer() { try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); if (records.isEmpty()) { continue; } // 提取分区数据 MapTopicPartition, ListConsumerRecordString, String recordsByPartition records.recordsByPartition(); for (Map.EntryTopicPartition, ListConsumerRecordString, String entry : recordsByPartition.entrySet()) { TopicPartition partition entry.getKey(); ListConsumerRecordString, String partitionRecords entry.getValue(); processPartition(partition, partitionRecords); // 提交偏移量 long lastOffset partitionRecords.get( partitionRecords.size() - 1).offset(); consumer.commitSync(Collections.singletonMap( partition, new OffsetAndMetadata(lastOffset 1) )); } } } catch (WakeupException e) { // 忽略关闭信号 } finally { consumer.close(); } } private void processPartition(TopicPartition partition, ListConsumerRecordString, String records) { String threadName Thread.currentThread().getName(); System.out.println(threadName processing records.size() records from partition); for (ConsumerRecordString, String record : records) { // 处理消息 processMessage(record); } } private void processMessage(ConsumerRecordString, String record) { // 消息处理逻辑 } }4.2 消费者池模式public class ConsumerPool { private final ListKafkaConsumerString, String consumers; private final MapTopicPartition, KafkaConsumerString, String partitionOwnership; private final ExecutorService executor; public ConsumerPool(int poolSize) { Properties baseProps new Properties(); baseProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); baseProps.put(ConsumerConfig.GROUP_ID_CONFIG, pool-group); baseProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); baseProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); baseProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); this.consumers new ArrayList(); this.partitionOwnership new ConcurrentHashMap(); this.executor Executors.newFixedThreadPool(poolSize); for (int i 0; i poolSize; i) { Properties props (Properties) baseProps.clone(); props.put(ConsumerConfig.CLIENT_ID_CONFIG, pool-consumer- i); consumers.add(new KafkaConsumer(props)); } } public void start() { for (KafkaConsumerString, String consumer : consumers) { executor.submit(() - runConsumer(consumer)); } } private void runConsumer(KafkaConsumerString, String consumer) { consumer.subscribe(Collections.singletonList(my-topic)); try { while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); if (!records.isEmpty()) { processRecords(records); } } } catch (WakeupException e) { // 忽略 } finally { consumer.close(); } } private void processRecords(ConsumerRecordsString, String records) { // 记录处理逻辑 } }消费拦截器5.1 拦截器实现import org.apache.kafka.clients.consumer.*; public class ConsumerInterceptorExample implements ConsumerInterceptorString, String { private static final Logger logger LoggerFactory.getLogger( ConsumerInterceptorExample.class); Override public ConsumerRecordsString, String onConsume( ConsumerRecordsString, String records) { for (ConsumerRecordString, String record : records) { // 添加处理时间戳 long now System.currentTimeMillis(); long lag now - record.timestamp(); logger.debug(消费消息: topic{}, partition{}, offset{}, lag{}ms, record.topic(), record.partition(), record.offset(), lag); } return records; } Override public void onCommit(MapTopicPartition, OffsetAndMetadata offsets) { logger.info(提交偏移量: {}, offsets); } Override public void close() {} Override public void configure(MapString, ? configs) {} }5.2 拦截器配置public class InterceptorConsumer { public static void main(String[] args) { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, interceptor-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); // 配置拦截器 props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, com.example.ConsumerInterceptorExample); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(my-topic)); while (true) { ConsumerRecordsString, String records consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecordString, String record : records) { System.out.println(record.value()); } } } }最佳实践6.1 消费者配置建议public class BestPracticeConfig { public static Properties createOptimalConfig() { Properties props new Properties(); // 基础配置 props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka1:9092,kafka2:9092,kafka3:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, optimal-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); // 可靠性配置 props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest); // 性能配置 props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500); props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10485760); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 心跳配置 props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 3000); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000); // 分配策略 props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.CooperativeStickyAssignor); return props; } }6.2 常见问题处理public class ConsumerTroubleshooting { public void handleRebalance() { Properties props new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092); props.put(ConsumerConfig.GROUP_ID_CONFIG, troubleshoot-group); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); KafkaConsumerString, String consumer new KafkaConsumer(props); consumer.subscribe(Collections.singletonList(my-topic), new ConsumerRebalanceListener() { Override public void onPartitionsRevoked( CollectionTopicPartition partitions) { System.out.println(分区被回收: partitions); // 在分区回收前提交偏移量 commitOffsets(partitions); } Override public void onPartitionsAssigned( CollectionTopicPartition partitions) { System.out.println(分区被分配: partitions); // 可以在这里进行初始化操作 } }); } private void commitOffsets(CollectionTopicPartition partitions) { // 提交偏移量逻辑 } }总结Kafka消费者组是实现高效、可靠消息消费的核心机制。通过深入理解消费者组的工作原理、合理配置参数、设计健壮的处理逻辑可以构建出满足生产环境需求的消费者应用。本文详细介绍了消费者组的基础概念、偏移量管理、消费者组管理、并行消费模式、拦截器机制以及最佳实践希望能够帮助读者更好地应用Kafka消费者组。