169 lines
3.8 KiB
Go
169 lines
3.8 KiB
Go
package mqsd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/Shopify/sarama"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// KafkaProducer Kafka生产者
|
|
type KafkaProducer struct {
|
|
producer sarama.SyncProducer
|
|
config *KafkaConfig
|
|
}
|
|
|
|
// NewKafkaProducer 创建Kafka生产者
|
|
func NewKafkaProducer(config *KafkaConfig) (*KafkaProducer, error) {
|
|
saramaConfig := sarama.NewConfig()
|
|
saramaConfig.Producer.Return.Successes = true
|
|
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
|
|
saramaConfig.Producer.Retry.Max = 5
|
|
|
|
producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
|
|
}
|
|
|
|
return &KafkaProducer{
|
|
producer: producer,
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// Publish 发布消息
|
|
func (p *KafkaProducer) Publish(ctx context.Context, topic string, msg *Message) error {
|
|
if msg.ID == "" {
|
|
msg.ID = uuid.New().String()
|
|
}
|
|
if msg.Timestamp.IsZero() {
|
|
msg.Timestamp = time.Now()
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal message: %w", err)
|
|
}
|
|
|
|
kafkaMsg := &sarama.ProducerMessage{
|
|
Topic: topic,
|
|
Value: sarama.ByteEncoder(data),
|
|
Headers: []sarama.RecordHeader{
|
|
{Key: []byte("message_id"), Value: []byte(msg.ID)},
|
|
},
|
|
}
|
|
|
|
_, _, err = p.producer.SendMessage(kafkaMsg)
|
|
return err
|
|
}
|
|
|
|
// Close 关闭连接
|
|
func (p *KafkaProducer) Close() error {
|
|
return p.producer.Close()
|
|
}
|
|
|
|
// KafkaConsumer Kafka消费者
|
|
type KafkaConsumer struct {
|
|
consumer sarama.ConsumerGroup
|
|
config *KafkaConfig
|
|
handlers map[string]MessageHandler
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
// NewKafkaConsumer 创建Kafka消费者
|
|
func NewKafkaConsumer(config *KafkaConfig) (*KafkaConsumer, error) {
|
|
saramaConfig := sarama.NewConfig()
|
|
saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
|
|
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
|
|
|
|
consumer, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
kafkaConsumer := &KafkaConsumer{
|
|
consumer: consumer,
|
|
config: config,
|
|
handlers: make(map[string]MessageHandler),
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
return kafkaConsumer, nil
|
|
}
|
|
|
|
// Subscribe 订阅主题
|
|
func (c *KafkaConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
|
|
c.handlers[topic] = handler
|
|
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-c.ctx.Done():
|
|
return
|
|
default:
|
|
err := c.consumer.Consume(c.ctx, []string{topic}, c)
|
|
if err != nil {
|
|
log.Printf("Error from consumer: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
// Unsubscribe 取消订阅
|
|
func (c *KafkaConsumer) Unsubscribe(topic string) error {
|
|
delete(c.handlers, topic)
|
|
return nil
|
|
}
|
|
|
|
// Close 关闭连接
|
|
func (c *KafkaConsumer) Close() error {
|
|
c.cancel()
|
|
return c.consumer.Close()
|
|
}
|
|
|
|
// ConsumeClaim 实现sarama.ConsumerGroupHandler接口
|
|
func (c *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
|
|
for {
|
|
select {
|
|
case message := <-claim.Messages():
|
|
var msg Message
|
|
if err := json.Unmarshal(message.Value, &msg); err != nil {
|
|
log.Printf("failed to unmarshal message: %v", err)
|
|
continue
|
|
}
|
|
|
|
if handler, exists := c.handlers[message.Topic]; exists {
|
|
if err := handler(&msg); err != nil {
|
|
log.Printf("failed to handle message: %v", err)
|
|
}
|
|
}
|
|
|
|
session.MarkMessage(message, "")
|
|
|
|
case <-session.Context().Done():
|
|
return nil
|
|
}
|
|
}
|
|
}
|
|
|
|
// Setup 实现sarama.ConsumerGroupHandler接口
|
|
func (c *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error {
|
|
return nil
|
|
}
|
|
|
|
// Cleanup 实现sarama.ConsumerGroupHandler接口
|
|
func (c *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
|
|
return nil
|
|
}
|