Files
MQSDK/kafka.go
2025-08-08 10:51:39 +08:00

169 lines
3.8 KiB
Go

package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/google/uuid"
)
// KafkaProducer Kafka生产者
type KafkaProducer struct {
producer sarama.SyncProducer
config *KafkaConfig
}
// NewKafkaProducer 创建Kafka生产者
func NewKafkaProducer(config *KafkaConfig) (*KafkaProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
saramaConfig.Producer.Retry.Max = 5
producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
}
return &KafkaProducer{
producer: producer,
config: config,
}, nil
}
// Publish 发布消息
func (p *KafkaProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(data),
Headers: []sarama.RecordHeader{
{Key: []byte("message_id"), Value: []byte(msg.ID)},
},
}
_, _, err = p.producer.SendMessage(kafkaMsg)
return err
}
// Close 关闭连接
func (p *KafkaProducer) Close() error {
return p.producer.Close()
}
// KafkaConsumer Kafka消费者
type KafkaConsumer struct {
consumer sarama.ConsumerGroup
config *KafkaConfig
handlers map[string]MessageHandler
ctx context.Context
cancel context.CancelFunc
}
// NewKafkaConsumer 创建Kafka消费者
func NewKafkaConsumer(config *KafkaConfig) (*KafkaConsumer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
consumer, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
kafkaConsumer := &KafkaConsumer{
consumer: consumer,
config: config,
handlers: make(map[string]MessageHandler),
ctx: ctx,
cancel: cancel,
}
return kafkaConsumer, nil
}
// Subscribe 订阅主题
func (c *KafkaConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
go func() {
for {
select {
case <-c.ctx.Done():
return
default:
err := c.consumer.Consume(c.ctx, []string{topic}, c)
if err != nil {
log.Printf("Error from consumer: %v", err)
}
}
}
}()
return nil
}
// Unsubscribe 取消订阅
func (c *KafkaConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
return nil
}
// Close 关闭连接
func (c *KafkaConsumer) Close() error {
c.cancel()
return c.consumer.Close()
}
// ConsumeClaim 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
var msg Message
if err := json.Unmarshal(message.Value, &msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
continue
}
if handler, exists := c.handlers[message.Topic]; exists {
if err := handler(&msg); err != nil {
log.Printf("failed to handle message: %v", err)
}
}
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
// Setup 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}