【Kafka源码解读和使用指南】第14篇:Kafka分区器源码解析——消息去哪个分区,有学问! 上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题下一篇【第15篇】Kafka集群元数据源码解析——生产者如何认识整个集群摘要消息经过序列化变成byte[]之后下一步就是决定发往哪个分区。这一决定看似简单实则影响深远——分对了负载均衡吞吐翻倍分错了热点分区全线崩溃。Kafka的默认分区策略用HashRoundRobin双剑合璧2.4版本推出的Sticky Partitioner更是在延迟和批量之间找到了精细平衡。本文将深入源码剖析分区器的工作原理从DefaultPartitioner到StickyPartitioner再到手把手教你实现一个按业务Key路由的自定义分区器。读完这篇分区不再看运气。一、分区器在KafkaProducer中的位置先回顾分区器在整个发送链路中的位置——它在消息序列化之后、进入RecordAccumulator之前KafkaProducer.send() 调用链 Interceptors.onSend() // ① 拦截器处理 │ ▼ waitOnMetadata() // ② 等待集群元数据就绪 │ ▼ Serializer.serialize() // ③ 序列化Key和Value │ ▼ Partitioner.partition() // ④ 选择目标分区 ← 本文主角 │ ▼ RecordAccumulator.append() // ⑤ 放入缓冲区从调用链可以看出分区器需要依赖两个输入已序列化的Keybyte[]用于计算Hash值集群元数据Cluster对象需要知道Topic有多少个分区二、Partitioner接口——只需要实现partition()方法publicinterfacePartitionerextendsConfigurable,Closeable{/** * 选择目标分区 * param topic Topic名称 * param key 消息KeyJava对象未序列化 * param keyBytes 已序列化的Keybyte数组 * param value 消息Value未序列化 * param valueBytes 已序列化的Value * param cluster 集群元数据快照 * return 分区编号 */intpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster);voidclose();}注意区分两个概念keyObject原始Key对象还没经过序列化keyBytesbyte[]已经序列化好的Key可直接用于Hash计算KafkaProducer调用时如果ProducerRecord指定了partition字段即record.partition() ! null就直接用指定的分区不会调用Partitioner。只有没指定分区时才会走Partitioner.partition()。三、DefaultPartitioner源码解析——经典的双模式策略3.1 核心源码publicclassDefaultPartitionerimplementsPartitioner{// Counter初始化为随机数避免重启后所有消息都去同一个分区privatefinalAtomicIntegercounternewAtomicInteger(newRandom().nextInt());// 并发安全的StickyPartition缓存privatefinalConcurrentMapString,IntegerstickyPartitionCachenewConcurrentHashMap();publicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){// 获取Topic的分区信息ListPartitionInfopartitionscluster.partitionsForTopic(topic);intnumPartitionspartitions.size();if(keyBytesnull){// 情况一消息没有Key —— Sticky分区策略(2.4)returnstickyPartitionCache.computeIfAbsent(topic,t-{// 先找可用分区有Leader的分区ListPartitionInfoavailablePartitionscluster.availablePartitionsForTopic(t);if(availablePartitions.isEmpty()){// 没有可用分区退化为RoundRobinintnextValuecounter.getAndIncrement();returnDefaultPartitioner.toPositive(nextValue)%numPartitions;}else{// 选择一个可用分区并粘住intpartDefaultPartitioner.toPositive(counter.getAndIncrement())%availablePartitions.size();returnavailablePartitions.get(part).partition();}});}else{// 情况二消息有Key —— Hash取模// murmur2是一种高效的、低碰撞率的哈希算法returnDefaultPartitioner.toPositive(Utils.murmur2(keyBytes))%numPartitions;}}// 将负数转为正数取绝对值的等价操作staticinttoPositive(intnumber){returnnumber0x7fffffff;}}3.2 两种策略图解【DefaultPartitioner 分区策略】 消息有Key ──► murmur2(Key) % 分区数 ──► 固定分区 相同Key → 相同分区 → 顺序保证 消息无Key ──► Sticky策略 ──► 同一个批次粘在同一个可用分区 2.4 批次满后切换到新分区 ──► RoundRobin ──► counter % 分区数逐条轮询 2.3及之前 无批量优化可能产生大量小批次3.3 为什么counter要用AtomicIntegerKafkaProducer是线程安全的多个业务线程可能同时调用send()。DefaultPartitioner必须也是线程安全的。这就是为什么用AtomicInteger而不是普通的int——两个线程并发调用counter.getAndIncrement()时不会出现计数错误。3.4 toPositive()方法负数转正数number 0x7fffffff这个位掩码操作是为了把负数转成正数。murmur2()可能返回负数因为返回类型是int包含符号位但分区编号必须是≥0的整数。负数: 1xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx 掩码: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ────────────────────────────────────────── 结果: 0xxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx ← 永远是正数四、Sticky Partitioner——2.4版本的性能优化利器4.1 问题老版本RoundRobin的痛点在Kafka 2.3及之前没有Key的消息使用RoundRobin策略——每条消息随机选一个分区。这会导致什么问题【RoundRobin策略产生大量小批次】 Topic: orders (3个分区) msg1 → P0 msg2 → P1 RecordAccumulator中的状态 msg3 → P2 P0: [msg1] ← 只有1条消息就凑满一个批次 msg4 → P0 P1: [msg2] ← 每条消息单独开Batch msg5 → P1 P2: [msg3] msg6 → P2 结果每个分区的Batch都只有少量消息 → 发送许多小请求 → 网络开销大4.2 Sticky策略的优化Sticky策略的思想是粘在同一个分区上直到当前Batch满了再换下一个分区。【Sticky策略批量优化效果】 msg1 → P0 msg2 → P0 ← 粘住P0 RecordAccumulator中的状态 msg3 → P0 ← 继续粘 P0: [msg1, msg2, msg3, msg4] ← 大Batch msg4 → P0 ← Batch满了 P1: [msg5, msg6, msg7] msg5 → P1 ← 切换到P1 P2: [msg8, msg9] msg6 → P1 msg7 → P1 msg8 → P2 msg9 → P2 结果每个分区攒了更大的Batch → 减少网络请求 → 吞吐量提升StickyPartitionCache的具体实现中就一个ConcurrentHashMapString, IntegerKey是Topic名Value是粘住的Partition编号。当Batch满了被Sender取走之后下次再append新消息时会重新选一个分区。4.3 对比总结对比维度RoundRobin (旧)Sticky (新,2.4)分区选择逐条轮询粘住分区Batch满后切换Batch填充率低每个分区各攒一点高每个分区攒满再走请求数量多小Batch多少大Batch少网络开销大小消息延迟低及时发送略高等待凑Batch适用场景低延迟要求高吞吐要求五、自定义分区器实战——按业务Key路由5.1 场景用户消息优先处理分区假设你有一个topic叫user-events有6个分区。你希望VIP用户的消息发往低编号分区P0-P1普通用户消息发往高编号分区P4-P5中间分区用于系统消息。/** * 自定义分区器VIP用户优先分区 * VIP用户 → P0, P1 * 系统消息 → P2, P3 * 普通用户 → P4, P5 */publicclassVipAwarePartitionerimplementsPartitioner{privatestaticfinalSetStringVIP_USERSnewHashSet(Arrays.asList(vip_001,vip_002,vip_003// VIP用户白名单));privatestaticfinalStringSYSTEM_KEY__SYSTEM__;Overridepublicintpartition(Stringtopic,Objectkey,byte[]keyBytes,Objectvalue,byte[]valueBytes,Clustercluster){ListPartitionInfopartitionscluster.partitionsForTopic(topic);intnumPartitionspartitions.size();// 把Key转成字符串StringkeyStr(keyBytes!null)?newString(keyBytes):;if(SYSTEM_KEY.equals(keyStr)){// 系统消息 → P2, P3// 用简单的随机分配intbase2;intoffsetThreadLocalRandom.current().nextInt(2);returnbaseoffset;}elseif(VIP_USERS.contains(keyStr)){// VIP用户 → P0, P1// 用Hash保证同一VIP用户消息有序inthashMath.abs(Utils.murmur2(keyBytes));returnhash%2;// P0或P1}else{// 普通用户 → P4, P5// 也用Hash同一用户的消息在同一分区inthashMath.abs(Utils.murmur2(keyBytes));return4(hash%2);// P4或P5}}Overridepublicvoidclose(){}Overridepublicvoidconfigure(MapString,?configs){}}5.2 配置使用PropertiespropsnewProperties();props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,com.example.VipAwarePartitioner);// 指定自定义分区器props.put(bootstrap.servers,localhost:9092);// ... 其他配置KafkaProducerString,StringproducernewKafkaProducer(props);// VIP用户消息自动路由到P0或P1producer.send(newProducerRecord(user-events,vip_001,VIP用户登录));// 普通用户消息自动路由到P4或P5producer.send(newProducerRecord(user-events,normal_user_123,普通用户点击));六、分区数与吞吐量的关系——数学不小了【分区数与吞吐量的关系图】 吞吐量(TPS) ▲ │ ┌────────────────────── │ ┌─────┘ ← 达到瓶颈磁盘/网络 │ ┌─────┘ │ ┌────┘ ← 线性增长区间 │ ┌────┘ │ ┌──┘ └─┴────┬────┬────┬────┬────┬────┬────► 分区数 1 3 6 9 12 15 18 分区太少 ──► 无法充分利用集群能力 分区太多 ──► 元数据开销大、文件句柄多、Leader选举慢经验法则分区数 max(总吞吐量需求 / 单分区吞吐量, 消费者实例数)单分区吞吐量一般~10MB/s 写~50MB/s 读分区总数所有Topic建议不超过Broker数量的4000倍七、分区器选型决策场景推荐策略配置需要消息顺序Key HashDefaultPartitioner 带Key的消息高吞吐、不关心顺序StickyDefaultPartitioner(默认)按业务规则路由自定义Partitionerpartitioner.classxxx指定分区发送直接指定分区ProducerRecord中指定partition均匀分布无Key消息RoundRobin需实现自定义Partitioner本篇小结分区器看似简单实则内涵丰富DefaultPartitioner是双模式有Key走murmur2哈希保证同Key顺序无Key走Sticky保证批量效率。Kafka 2.4的Sticky优化是一个典型的用稍高延迟换更高吞吐的trade-off案例自定义分区器的关键是理解输入参数——你拿到的是已序列化的keyBytes和集群元数据足以实现任意复杂的分区逻辑分区数量不是越多越好需要根据吞吐量需求和消费者并发数综合计算尽量让分区在各个Broker上均匀分布避免热点——下一篇我们讲集群元数据看看Producer是怎么知道这些拓扑信息的上一篇【第13篇】Kafka序列化器深度解析——自定义Serializer不再是难题下一篇【第15篇】Kafka集群元数据源码解析——生产者如何认识整个集群