Files
MQSDK/nsq.go
2025-08-08 10:51:39 +08:00

126 lines
2.6 KiB
Go

package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/nsqio/go-nsq"
)
// NSQProducer NSQ生产者
type NSQProducer struct {
producer *nsq.Producer
config *NSQConfig
}
// NewNSQProducer 创建NSQ生产者
func NewNSQProducer(config *NSQConfig) (*NSQProducer, error) {
producer, err := nsq.NewProducer(config.NSQDAddr, nsq.NewConfig())
if err != nil {
return nil, fmt.Errorf("failed to create NSQ producer: %w", err)
}
return &NSQProducer{
producer: producer,
config: config,
}, nil
}
// Publish 发布消息
func (p *NSQProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.producer.Publish(topic, data)
}
// Close 关闭连接
func (p *NSQProducer) Close() error {
p.producer.Stop()
return nil
}
// NSQConsumer NSQ消费者
type NSQConsumer struct {
consumer *nsq.Consumer
config *NSQConfig
handlers map[string]MessageHandler
}
// NewNSQConsumer 创建NSQ消费者
func NewNSQConsumer(config *NSQConfig) (*NSQConsumer, error) {
nsqConfig := nsq.NewConfig()
nsqConfig.MaxInFlight = 10
consumer, err := nsq.NewConsumer("", "default", nsqConfig)
if err != nil {
return nil, fmt.Errorf("failed to create NSQ consumer: %w", err)
}
nsqConsumer := &NSQConsumer{
consumer: consumer,
config: config,
handlers: make(map[string]MessageHandler),
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
var msg Message
if err := json.Unmarshal(message.Body, &msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
return err
}
if handler, exists := nsqConsumer.handlers[msg.Topic]; exists {
return handler(&msg)
}
return nil
}))
return nsqConsumer, nil
}
// Subscribe 订阅主题
func (c *NSQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
// 连接到NSQLookup或直接连接到NSQD
var err error
if len(c.config.NSQLookup) > 0 {
err = c.consumer.ConnectToNSQLookupd(c.config.NSQLookup[0])
} else {
err = c.consumer.ConnectToNSQD(c.config.NSQDAddr)
}
if err != nil {
return fmt.Errorf("failed to connect to NSQ: %w", err)
}
return nil
}
// Unsubscribe 取消订阅
func (c *NSQConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
return nil
}
// Close 关闭连接
func (c *NSQConsumer) Close() error {
c.consumer.Stop()
return nil
}