从零实现一个分布式消息中间件:Kafka核心设计 前言你有没有想过每天处理万亿级消息的Kafka它的核心设计到底是什么为什么它能做到高吞吐、低延迟、持久化、有序消费今天我们从零实现一个简化版Kafka· 消息存储日志分段 索引· 生产者消息追加· 消费者偏移量管理· 分区Partition与副本Replica· 消费组Consumer Group---一、Kafka核心原理1. 架构图┌─────────────────────────────────────────────────────────────┐│ Kafka集群 ││ ┌─────────┐ ┌─────────┐ ┌─────────┐ ││ │ Broker1 │ │ Broker2 │ │ Broker3 │ ││ │ Topic-A │ │ Topic-A │ │ Topic-B │ ││ │ Partition│ │ Partition│ │ Partition│ ││ │ 0 │ │ 1 │ │ 0 │ ││ └─────────┘ └─────────┘ └─────────┘ │└─────────────────────────────────────────────────────────────┘│ │ │▼ ▼ ▼┌─────────┐ ┌─────────┐ ┌─────────┐│ 生产者 │ │ 消费者1 │ │ 消费者2 ││ (Producer)│ │ (Group1)│ │ (Group2)│└─────────┘ └─────────┘ └─────────┘2. 核心概念概念 说明Broker Kafka节点Topic 消息主题Partition 分区顺序读写Offset 消息偏移量Consumer Group 消费者组组内竞争Log Segment 日志分段文件---二、完整代码实现1. 基础数据结构c#include stdio.h#include stdlib.h#include string.h#include unistd.h#include pthread.h#include time.h#include errno.h#include sys/stat.h#include fcntl.h#include dirent.h#define MAX_TOPICS 100#define MAX_PARTITIONS 100#define MAX_MSG_SIZE 65536#define MAX_BATCH_SIZE 1000#define SEGMENT_SIZE 1024 * 1024 // 1MB// 消息结构typedef struct kafka_message {long long offset;long long timestamp;int key_len;char *key;int value_len;char *value;} kafka_message_t;// 分区typedef struct partition {int id;char topic[64];int leader; // 分区Leaderlong long log_end_offset; // 最新偏移量long long log_start_offset; // 最早偏移量char *segment_file; // 当前日志文件int segment_fd;pthread_mutex_t mutex;struct partition *next;} partition_t;// 主题typedef struct topic {char name[64];partition_t *partitions;int partition_count;int replication_factor;struct topic *next;} topic_t;// 消费者偏移量typedef struct consumer_offset {char group_id[64];char topic[64];int partition_id;long long offset;struct consumer_offset *next;} consumer_offset_t;// Kafka Brokertypedef struct kafka_broker {int broker_id;char data_dir[256];topic_t *topics;consumer_offset_t *offsets;pthread_mutex_t mutex;int port;int running;} kafka_broker_t;2. 日志存储c// 创建Brokerkafka_broker_t *broker_create(int broker_id, const char *data_dir, int port) {kafka_broker_t *b malloc(sizeof(kafka_broker_t));memset(b, 0, sizeof(kafka_broker_t));b-broker_id broker_id;strcpy(b-data_dir, data_dir);b-port port;b-running 1;pthread_mutex_init(b-mutex, NULL);// 创建数据目录mkdir(data_dir, 0755);printf(Kafka Broker %d 启动数据目录: %s\n, broker_id, data_dir);return b;}// 创建分区日志文件int partition_create_log(partition_t *p) {char path[512];snprintf(path, sizeof(path), ./data/%s-%d.log, p-topic, p-id);p-segment_file strdup(path);p-segment_fd open(path, O_RDWR | O_CREAT | O_APPEND, 0644);if (p-segment_fd 0) return -1;// 获取当前文件大小p-log_end_offset lseek(p-segment_fd, 0, SEEK_END) / sizeof(long long);return 0;}// 创建分区partition_t *partition_create(int id, const char *topic) {partition_t *p malloc(sizeof(partition_t));memset(p, 0, sizeof(partition_t));p-id id;strcpy(p-topic, topic);p-leader 1;p-log_end_offset 0;p-log_start_offset 0;pthread_mutex_init(p-mutex, NULL);partition_create_log(p);return p;}// 追加消息到分区int partition_append_message(partition_t *p, const char *key, int key_len,const char *value, int value_len, long long *offset) {pthread_mutex_lock(p-mutex);// 构造消息简化直接写入// 格式: offset|timestamp|key_len|key|value_len|valuechar msg_buf[MAX_MSG_SIZE];int msg_len snprintf(msg_buf, sizeof(msg_buf),%lld|%lld|%d|%s|%d|%s,p-log_end_offset, (long long)time(NULL),key_len, key ? key : ,value_len, value ? value : );// 写入文件int written write(p-segment_fd, msg_buf, msg_len);if (written 0) {pthread_mutex_unlock(p-mutex);return -1;}*offset p-log_end_offset;p-log_end_offset;// 检查是否达到分段大小if (lseek(p-segment_fd, 0, SEEK_CUR) SEGMENT_SIZE) {// 滚动新文件close(p-segment_fd);free(p-segment_file);partition_create_log(p);}pthread_mutex_unlock(p-mutex);return 0;}3. 主题与分区管理c// 创建主题topic_t *broker_create_topic(kafka_broker_t *b, const char *name,int partitions, int replication_factor) {pthread_mutex_lock(b-mutex);topic_t *t malloc(sizeof(topic_t));strcpy(t-name, name);t-partition_count partitions;t-replication_factor replication_factor;t-partitions NULL;// 创建分区for (int i 0; i partitions; i) {partition_t *p partition_create(i, name);p-next t-partitions;t-partitions p;}t-next b-topics;b-topics t;pthread_mutex_unlock(b-mutex);printf([Kafka] 创建主题: %s (分区: %d, 副本: %d)\n, name, partitions, replication_factor);return t;}// 获取分区轮询partition_t *broker_get_partition(kafka_broker_t *b, const char *topic,long long key_hash) {pthread_mutex_lock(b-mutex);topic_t *t b-topics;while (t) {if (strcmp(t-name, topic) 0) {// 根据哈希选择分区int idx key_hash % t-partition_count;partition_t *p t-partitions;for (int i 0; i idx p; i) {p p-next;}pthread_mutex_unlock(b-mutex);return p;}t t-next;}pthread_mutex_unlock(b-mutex);return NULL;}4. 消费者偏移量c// 更新偏移量int broker_update_offset(kafka_broker_t *b, const char *group_id,const char *topic, int partition_id, long long offset) {pthread_mutex_lock(b-mutex);consumer_offset_t *co b-offsets;while (co) {if (strcmp(co-group_id, group_id) 0 strcmp(co-topic, topic) 0 co-partition_id partition_id) {co-offset offset;pthread_mutex_unlock(b-mutex);return 0;}co co-next;}// 新建偏移量co malloc(sizeof(consumer_offset_t));strcpy(co-group_id, group_id);strcpy(co-topic, topic);co-partition_id partition_id;co-offset offset;co-next b-offsets;b-offsets co;pthread_mutex_unlock(b-mutex);return 0;}// 获取偏移量long long broker_get_offset(kafka_broker_t *b, const char *group_id,const char *topic, int partition_id) {pthread_mutex_lock(b-mutex);consumer_offset_t *co b-offsets;while (co) {if (strcmp(co-group_id, group_id) 0 strcmp(co-topic, topic) 0 co-partition_id partition_id) {long long offset co-offset;pthread_mutex_unlock(b-mutex);return offset;}co co-next;}pthread_mutex_unlock(b-mutex);return 0; // 从头消费}5. 生产者c// 生产者typedef struct kafka_producer {kafka_broker_t *broker;char topic[64];int partition_id;int ack_level; // 0: 不等待, 1: Leader确认, -1: 所有副本确认} kafka_producer_t;kafka_producer_t *producer_create(kafka_broker_t *b, const char *topic) {kafka_producer_t *p malloc(sizeof(kafka_producer_t));p-broker b;strcpy(p-topic, topic);p-partition_id -1; // -1表示自动选择p-ack_level 1;return p;}// 发送消息int producer_send(kafka_producer_t *p, const char *key, const char *value) {partition_t *partition NULL;if (p-partition_id 0) {// 指定分区topic_t *t p-broker-topics;while (t) {if (strcmp(t-name, p-topic) 0) {partition_t *part t-partitions;for (int i 0; i p-partition_id part; i) {part part-next;}partition part;break;}t t-next;}} else {// 自动选择轮询或哈希long long hash key ? strhash(key) : rand();partition broker_get_partition(p-broker, p-topic, hash);}if (!partition) {printf([生产者] 分区不存在\n);return -1;}long long offset;int ret partition_append_message(partition, key, key ? strlen(key) : 0,value, strlen(value), offset);if (ret 0) {printf([生产者] 发送消息: key%s, value%s, offset%lld\n,key ? key : null, value, offset);}return ret;}// 字符串哈希unsigned long strhash(const char *str) {unsigned long hash 5381;int c;while ((c *str)) {hash ((hash 5) hash) c;}return hash;}6. 消费者c// 消费者typedef struct kafka_consumer {kafka_broker_t *broker;char group_id[64];char topic[64];int partition_id;long long current_offset;int running;pthread_t thread;void (*callback)(kafka_message_t *msg);} kafka_consumer_t;kafka_consumer_t *consumer_create(kafka_broker_t *b, const char *group_id,const char *topic, int partition_id) {kafka_consumer_t *c malloc(sizeof(kafka_consumer_t));c-broker b;strcpy(c-group_id, group_id);strcpy(c-topic, topic);c-partition_id partition_id;c-running 1;c-callback NULL;// 获取上次偏移量c-current_offset broker_get_offset(b, group_id, topic, partition_id);printf([消费者] 创建: group%s, topic%s, partition%d, offset%lld\n,group_id, topic, partition_id, c-current_offset);return c;}// 消费者工作线程void *consumer_worker(void *arg) {kafka_consumer_t *c (kafka_consumer_t*)arg;while (c-running) {// 获取分区partition_t *partition broker_get_partition(c-broker, c-topic, c-partition_id);if (!partition) {usleep(100000);continue;}pthread_mutex_lock(partition-mutex);// 检查是否有新消息if (c-current_offset partition-log_end_offset) {// 读取消息简化直接从文件读取char line[4096];lseek(partition-segment_fd, 0, SEEK_SET);// 跳过已消费的消息long long skip c-current_offset;while (skip 0 fgets(line, sizeof(line), partition-segment_fd)) {skip--;}if (fgets(line, sizeof(line), partition-segment_fd)) {// 解析消息kafka_message_t msg;// 简化解析// ...msg.offset c-current_offset;if (c-callback) {c-callback(msg);}c-current_offset;broker_update_offset(c-broker, c-group_id, c-topic,c-partition_id, c-current_offset);}}pthread_mutex_unlock(partition-mutex);usleep(10000);}return NULL;}void consumer_start(kafka_consumer_t *c) {pthread_create(c-thread, NULL, consumer_worker, c);}void consumer_stop(kafka_consumer_t *c) {c-running 0;pthread_join(c-thread, NULL);}7. 测试代码cvoid test_kafka() {printf( Kafka核心实现测试 \n\n);// 创建Brokerkafka_broker_t *broker broker_create(1, ./data, 9092);// 创建主题broker_create_topic(broker, test-topic, 3, 1);// 生产者kafka_producer_t *producer producer_create(broker, test-topic);// 发送消息printf(\n--- 发送消息 ---\n);for (int i 0; i 10; i) {char key[32], value[64];snprintf(key, sizeof(key), key-%d, i);snprintf(value, sizeof(value), Hello Kafka %d, i);producer_send(producer, key, value);usleep(10000);}// 消费者printf(\n--- 消费消息 ---\n);kafka_consumer_t *consumer consumer_create(broker, group-1, test-topic, 0);consumer-callback (void(*)(kafka_message_t*)) [](kafka_message_t *msg) {printf([消费者] offset%lld, value%s\n, msg-offset, (char*)msg-value);};consumer_start(consumer);sleep(2);consumer_stop(consumer);printf(\n测试完成\n);}int main() {srand(time(NULL));test_kafka();return 0;}---三、编译和运行bashgcc -o kafka kafka.c -lpthread./kafka---四、Kafka vs 本实现特性 本实现 Kafka消息存储 文件 顺序日志文件分区 ✅ ✅副本 ❌ ✅消费组 ✅ ✅消息压缩 ❌ ✅事务 ❌ ✅高吞吐 ✅ 基础 ✅ 极高性能---五、总结通过这篇文章你学会了· Kafka的核心设计日志分段、分区、偏移量· 消息的存储与索引· 生产者消息追加· 消费者偏移量管理· 消费组的实现Kafka是消息中间件的经典之作。掌握它你就理解了万亿级消息处理系统的核心设计。下一篇预告《从零实现一个分布式数据库LSM树存储引擎》---评论区分享一下你用Kafka解决过什么场景