Go语言实现消息队列从RabbitMQ到Kafka的完整指南引言消息队列是分布式系统中解耦、异步处理和削峰填谷的关键组件。Go语言提供了丰富的消息队列客户端库支持RabbitMQ、Kafka等主流消息队列。本文将深入探讨Go语言实现消息队列的实践。一、消息队列基础1.1 消息队列模式┌─────────────────────────────────────────────────────────────┐ │ 消息队列架构 │ ├─────────────────────────────────────────────────────────────┤ │ Producer │ Queue │ Consumer │ │ ┌─────────┐ │ ┌───────────┐ │ ┌─────────┐│ │ │发送消息 │───────│──│ 消息队列 │───│──────│处理消息 ││ │ └─────────┘ │ └───────────┘ │ └─────────┘│ │ │ │ │ │ Producer │ │ Consumer │ │ ┌─────────┐ │ │ ┌─────────┐│ │ │发送消息 │───────│── │──────│处理消息 ││ │ └─────────┘ │ │ └─────────┘│ └─────────────────────────────────────────────────────────────┘1.2 常见消息队列对比特性RabbitMQKafkaRedis协议AMQPKafka ProtocolRESP持久化支持支持可选吞吐量中等高高延迟低中等低消息顺序保证分区内保证不保证适用场景任务队列日志收集缓存队列二、RabbitMQ实战2.1 安装依赖go get github.com/streadway/amqp2.2 生产者实现package rabbitmq import ( log github.com/streadway/amqp ) type Producer struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewProducer(uri, queueName string) (*Producer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } queue, err : channel.QueueDeclare( queueName, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Producer{ conn: conn, channel: channel, queue: queue, }, nil } func (p *Producer) Publish(message []byte) error { return p.channel.Publish( , // exchange p.queue.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: application/json, Body: message, DeliveryMode: amqp.Persistent, }, ) } func (p *Producer) Close() { p.channel.Close() p.conn.Close() }2.3 消费者实现type Consumer struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewConsumer(uri, queueName string) (*Consumer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } queue, err : channel.QueueDeclare( queueName, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Consumer{ conn: conn, channel: channel, queue: queue, }, nil } func (c *Consumer) Consume(handler func([]byte) error) error { messages, err : c.channel.Consume( c.queue.Name, , // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err ! nil { return err } forever : make(chan bool) go func() { for d : range messages { err : handler(d.Body) if err ! nil { log.Printf(Handler error: %v, err) d.Nack(false, true) // 重新入队 } else { d.Ack(false) } } }() -forever return nil }2.4 发布/订阅模式func NewPublisher(uri, exchangeName string) (*Producer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } err channel.ExchangeDeclare( exchangeName, fanout, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Producer{ conn: conn, channel: channel, queue: amqp.Queue{}, }, nil } func (p *Producer) PublishToExchange(exchange, routingKey string, message []byte) error { return p.channel.Publish( exchange, routingKey, false, false, amqp.Publishing{ ContentType: application/json, Body: message, }, ) }三、Kafka实战3.1 安装依赖go get github.com/segmentio/kafka-go3.2 生产者实现package kafka import ( context github.com/segmentio/kafka-go ) type Producer struct { writer *kafka.Writer } func NewProducer(brokers []string, topic string) *Producer { writer : kafka.NewWriter(kafka.WriterConfig{ Brokers: brokers, Topic: topic, Balancer: kafka.LeastBytes{}, }) return Producer{writer: writer} } func (p *Producer) Produce(message []byte) error { return p.writer.WriteMessages(context.Background(), kafka.Message{ Value: message, }, ) } func (p *Producer) Close() error { return p.writer.Close() }3.3 消费者实现type Consumer struct { reader *kafka.Reader } func NewConsumer(brokers []string, topic, groupID string) *Consumer { reader : kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, GroupID: groupID, StartOffset: kafka.FirstOffset, }) return Consumer{reader: reader} } func (c *Consumer) Consume(handler func([]byte) error) error { for { msg, err : c.reader.ReadMessage(context.Background()) if err ! nil { return err } err handler(msg.Value) if err ! nil { // 处理失败消息会被重新消费 continue } } } func (c *Consumer) Close() error { return c.reader.Close() }3.4 分区消费func NewPartitionConsumer(brokers []string, topic string, partition int) *Consumer { reader : kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, Partition: partition, StartOffset: kafka.FirstOffset, }) return Consumer{reader: reader} }四、Redis队列4.1 生产者实现package redisqueue import ( context github.com/go-redis/redis/v8 ) type Producer struct { client *redis.Client queue string } func NewProducer(client *redis.Client, queue string) *Producer { return Producer{ client: client, queue: queue, } } func (p *Producer) Enqueue(ctx context.Context, message []byte) error { return p.client.RPush(ctx, p.queue, message).Err() } func (p *Producer) EnqueuePriority(ctx context.Context, message []byte, priority int) error { // 使用有序集合实现优先级队列 return p.client.ZAdd(ctx, p.queue:priority, redis.Z{ Score: float64(priority), Member: message, }).Err() }4.2 消费者实现type Consumer struct { client *redis.Client queue string } func NewConsumer(client *redis.Client, queue string) *Consumer { return Consumer{ client: client, queue: queue, } } func (c *Consumer) Dequeue(ctx context.Context) ([]byte, error) { result, err : c.client.BLPop(ctx, 0, c.queue).Result() if err ! nil { return nil, err } return []byte(result[1]), nil } func (c *Consumer) DequeuePriority(ctx context.Context) ([]byte, error) { result, err : c.client.ZPopMax(ctx, c.queue:priority).Result() if err ! nil { return nil, err } if len(result) 0 { return nil, nil } return []byte(result[0].Member.(string)), nil }五、消息队列最佳实践5.1 消息持久化// RabbitMQ: 消息持久化 err channel.Publish( , queue.Name, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久化消息 Body: message, }, ) // Kafka: 默认持久化 // 消息写入后会持久化到磁盘5.2 消息重试func (c *Consumer) Consume(handler func([]byte) error) error { messages, err : c.channel.Consume(...) if err ! nil { return err } for d : range messages { maxRetries : 3 var lastErr error for i : 0; i maxRetries; i { err : handler(d.Body) if err nil { d.Ack(false) break } lastErr err time.Sleep(time.Duration(i1) * time.Second) } if lastErr ! nil { // 死信队列 c.channel.Publish(, dead-letter-queue, false, false, amqp.Publishing{ Body: d.Body, }) d.Ack(false) } } }5.3 流量控制type ThrottledProducer struct { producer *Producer limiter *rate.Limiter } func NewThrottledProducer(producer *Producer, rate rate.Limit) *ThrottledProducer { return ThrottledProducer{ producer: producer, limiter: rate.NewLimiter(rate, 100), // 每秒100条 } } func (p *ThrottledProducer) Publish(message []byte) error { p.limiter.Wait(context.Background()) return p.producer.Publish(message) }六、实战异步任务处理系统type TaskQueue struct { producer *kafka.Producer consumer *kafka.Consumer handlers map[string]func([]byte) error } func NewTaskQueue(brokers []string) *TaskQueue { return TaskQueue{ producer: NewProducer(brokers, tasks), consumer: NewConsumer(brokers, tasks, task-consumers), handlers: make(map[string]func([]byte) error), } } func (tq *TaskQueue) RegisterHandler(taskType string, handler func([]byte) error) { tq.handlers[taskType] handler } func (tq *TaskQueue) Enqueue(taskType string, payload []byte) error { message, _ : json.Marshal(map[string]interface{}{ type: taskType, payload: payload, }) return tq.producer.Produce(message) } func (tq *TaskQueue) StartConsuming() error { return tq.consumer.Consume(func(message []byte) error { var msg map[string]interface{} if err : json.Unmarshal(message, msg); err ! nil { return err } taskType : msg[type].(string) payload : []byte(msg[payload].(string)) handler, exists : tq.handlers[taskType] if !exists { return fmt.Errorf(no handler for task type: %s, taskType) } return handler(payload) }) }结论消息队列是构建分布式系统的关键组件Go语言提供了丰富的客户端库支持各种消息队列。在实际项目中需要根据业务需求选择合适的消息队列RabbitMQ适合任务队列场景Kafka适合高吞吐量日志收集Redis适合简单的缓存队列场景。同时需要注意消息持久化、重试机制和流量控制以保证系统的可靠性和稳定性。
Go语言实现消息队列:从RabbitMQ到Kafka的完整指南
发布时间:2026/5/20 18:21:26
Go语言实现消息队列从RabbitMQ到Kafka的完整指南引言消息队列是分布式系统中解耦、异步处理和削峰填谷的关键组件。Go语言提供了丰富的消息队列客户端库支持RabbitMQ、Kafka等主流消息队列。本文将深入探讨Go语言实现消息队列的实践。一、消息队列基础1.1 消息队列模式┌─────────────────────────────────────────────────────────────┐ │ 消息队列架构 │ ├─────────────────────────────────────────────────────────────┤ │ Producer │ Queue │ Consumer │ │ ┌─────────┐ │ ┌───────────┐ │ ┌─────────┐│ │ │发送消息 │───────│──│ 消息队列 │───│──────│处理消息 ││ │ └─────────┘ │ └───────────┘ │ └─────────┘│ │ │ │ │ │ Producer │ │ Consumer │ │ ┌─────────┐ │ │ ┌─────────┐│ │ │发送消息 │───────│── │──────│处理消息 ││ │ └─────────┘ │ │ └─────────┘│ └─────────────────────────────────────────────────────────────┘1.2 常见消息队列对比特性RabbitMQKafkaRedis协议AMQPKafka ProtocolRESP持久化支持支持可选吞吐量中等高高延迟低中等低消息顺序保证分区内保证不保证适用场景任务队列日志收集缓存队列二、RabbitMQ实战2.1 安装依赖go get github.com/streadway/amqp2.2 生产者实现package rabbitmq import ( log github.com/streadway/amqp ) type Producer struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewProducer(uri, queueName string) (*Producer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } queue, err : channel.QueueDeclare( queueName, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Producer{ conn: conn, channel: channel, queue: queue, }, nil } func (p *Producer) Publish(message []byte) error { return p.channel.Publish( , // exchange p.queue.Name, // routing key false, // mandatory false, // immediate amqp.Publishing{ ContentType: application/json, Body: message, DeliveryMode: amqp.Persistent, }, ) } func (p *Producer) Close() { p.channel.Close() p.conn.Close() }2.3 消费者实现type Consumer struct { conn *amqp.Connection channel *amqp.Channel queue amqp.Queue } func NewConsumer(uri, queueName string) (*Consumer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } queue, err : channel.QueueDeclare( queueName, true, // durable false, // delete when unused false, // exclusive false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Consumer{ conn: conn, channel: channel, queue: queue, }, nil } func (c *Consumer) Consume(handler func([]byte) error) error { messages, err : c.channel.Consume( c.queue.Name, , // consumer false, // auto-ack false, // exclusive false, // no-local false, // no-wait nil, // args ) if err ! nil { return err } forever : make(chan bool) go func() { for d : range messages { err : handler(d.Body) if err ! nil { log.Printf(Handler error: %v, err) d.Nack(false, true) // 重新入队 } else { d.Ack(false) } } }() -forever return nil }2.4 发布/订阅模式func NewPublisher(uri, exchangeName string) (*Producer, error) { conn, err : amqp.Dial(uri) if err ! nil { return nil, err } channel, err : conn.Channel() if err ! nil { return nil, err } err channel.ExchangeDeclare( exchangeName, fanout, // type true, // durable false, // auto-deleted false, // internal false, // no-wait nil, // arguments ) if err ! nil { return nil, err } return Producer{ conn: conn, channel: channel, queue: amqp.Queue{}, }, nil } func (p *Producer) PublishToExchange(exchange, routingKey string, message []byte) error { return p.channel.Publish( exchange, routingKey, false, false, amqp.Publishing{ ContentType: application/json, Body: message, }, ) }三、Kafka实战3.1 安装依赖go get github.com/segmentio/kafka-go3.2 生产者实现package kafka import ( context github.com/segmentio/kafka-go ) type Producer struct { writer *kafka.Writer } func NewProducer(brokers []string, topic string) *Producer { writer : kafka.NewWriter(kafka.WriterConfig{ Brokers: brokers, Topic: topic, Balancer: kafka.LeastBytes{}, }) return Producer{writer: writer} } func (p *Producer) Produce(message []byte) error { return p.writer.WriteMessages(context.Background(), kafka.Message{ Value: message, }, ) } func (p *Producer) Close() error { return p.writer.Close() }3.3 消费者实现type Consumer struct { reader *kafka.Reader } func NewConsumer(brokers []string, topic, groupID string) *Consumer { reader : kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, GroupID: groupID, StartOffset: kafka.FirstOffset, }) return Consumer{reader: reader} } func (c *Consumer) Consume(handler func([]byte) error) error { for { msg, err : c.reader.ReadMessage(context.Background()) if err ! nil { return err } err handler(msg.Value) if err ! nil { // 处理失败消息会被重新消费 continue } } } func (c *Consumer) Close() error { return c.reader.Close() }3.4 分区消费func NewPartitionConsumer(brokers []string, topic string, partition int) *Consumer { reader : kafka.NewReader(kafka.ReaderConfig{ Brokers: brokers, Topic: topic, Partition: partition, StartOffset: kafka.FirstOffset, }) return Consumer{reader: reader} }四、Redis队列4.1 生产者实现package redisqueue import ( context github.com/go-redis/redis/v8 ) type Producer struct { client *redis.Client queue string } func NewProducer(client *redis.Client, queue string) *Producer { return Producer{ client: client, queue: queue, } } func (p *Producer) Enqueue(ctx context.Context, message []byte) error { return p.client.RPush(ctx, p.queue, message).Err() } func (p *Producer) EnqueuePriority(ctx context.Context, message []byte, priority int) error { // 使用有序集合实现优先级队列 return p.client.ZAdd(ctx, p.queue:priority, redis.Z{ Score: float64(priority), Member: message, }).Err() }4.2 消费者实现type Consumer struct { client *redis.Client queue string } func NewConsumer(client *redis.Client, queue string) *Consumer { return Consumer{ client: client, queue: queue, } } func (c *Consumer) Dequeue(ctx context.Context) ([]byte, error) { result, err : c.client.BLPop(ctx, 0, c.queue).Result() if err ! nil { return nil, err } return []byte(result[1]), nil } func (c *Consumer) DequeuePriority(ctx context.Context) ([]byte, error) { result, err : c.client.ZPopMax(ctx, c.queue:priority).Result() if err ! nil { return nil, err } if len(result) 0 { return nil, nil } return []byte(result[0].Member.(string)), nil }五、消息队列最佳实践5.1 消息持久化// RabbitMQ: 消息持久化 err channel.Publish( , queue.Name, false, false, amqp.Publishing{ DeliveryMode: amqp.Persistent, // 持久化消息 Body: message, }, ) // Kafka: 默认持久化 // 消息写入后会持久化到磁盘5.2 消息重试func (c *Consumer) Consume(handler func([]byte) error) error { messages, err : c.channel.Consume(...) if err ! nil { return err } for d : range messages { maxRetries : 3 var lastErr error for i : 0; i maxRetries; i { err : handler(d.Body) if err nil { d.Ack(false) break } lastErr err time.Sleep(time.Duration(i1) * time.Second) } if lastErr ! nil { // 死信队列 c.channel.Publish(, dead-letter-queue, false, false, amqp.Publishing{ Body: d.Body, }) d.Ack(false) } } }5.3 流量控制type ThrottledProducer struct { producer *Producer limiter *rate.Limiter } func NewThrottledProducer(producer *Producer, rate rate.Limit) *ThrottledProducer { return ThrottledProducer{ producer: producer, limiter: rate.NewLimiter(rate, 100), // 每秒100条 } } func (p *ThrottledProducer) Publish(message []byte) error { p.limiter.Wait(context.Background()) return p.producer.Publish(message) }六、实战异步任务处理系统type TaskQueue struct { producer *kafka.Producer consumer *kafka.Consumer handlers map[string]func([]byte) error } func NewTaskQueue(brokers []string) *TaskQueue { return TaskQueue{ producer: NewProducer(brokers, tasks), consumer: NewConsumer(brokers, tasks, task-consumers), handlers: make(map[string]func([]byte) error), } } func (tq *TaskQueue) RegisterHandler(taskType string, handler func([]byte) error) { tq.handlers[taskType] handler } func (tq *TaskQueue) Enqueue(taskType string, payload []byte) error { message, _ : json.Marshal(map[string]interface{}{ type: taskType, payload: payload, }) return tq.producer.Produce(message) } func (tq *TaskQueue) StartConsuming() error { return tq.consumer.Consume(func(message []byte) error { var msg map[string]interface{} if err : json.Unmarshal(message, msg); err ! nil { return err } taskType : msg[type].(string) payload : []byte(msg[payload].(string)) handler, exists : tq.handlers[taskType] if !exists { return fmt.Errorf(no handler for task type: %s, taskType) } return handler(payload) }) }结论消息队列是构建分布式系统的关键组件Go语言提供了丰富的客户端库支持各种消息队列。在实际项目中需要根据业务需求选择合适的消息队列RabbitMQ适合任务队列场景Kafka适合高吞吐量日志收集Redis适合简单的缓存队列场景。同时需要注意消息持久化、重试机制和流量控制以保证系统的可靠性和稳定性。