126 lines
2.6 KiB
Go
126 lines
2.6 KiB
Go
package mqsd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/nsqio/go-nsq"
|
|
)
|
|
|
|
// NSQProducer NSQ生产者
|
|
type NSQProducer struct {
|
|
producer *nsq.Producer
|
|
config *NSQConfig
|
|
}
|
|
|
|
// NewNSQProducer 创建NSQ生产者
|
|
func NewNSQProducer(config *NSQConfig) (*NSQProducer, error) {
|
|
producer, err := nsq.NewProducer(config.NSQDAddr, nsq.NewConfig())
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create NSQ producer: %w", err)
|
|
}
|
|
|
|
return &NSQProducer{
|
|
producer: producer,
|
|
config: config,
|
|
}, nil
|
|
}
|
|
|
|
// Publish 发布消息
|
|
func (p *NSQProducer) Publish(ctx context.Context, topic string, msg *Message) error {
|
|
if msg.ID == "" {
|
|
msg.ID = uuid.New().String()
|
|
}
|
|
if msg.Timestamp.IsZero() {
|
|
msg.Timestamp = time.Now()
|
|
}
|
|
|
|
data, err := json.Marshal(msg)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to marshal message: %w", err)
|
|
}
|
|
|
|
return p.producer.Publish(topic, data)
|
|
}
|
|
|
|
// Close 关闭连接
|
|
func (p *NSQProducer) Close() error {
|
|
p.producer.Stop()
|
|
return nil
|
|
}
|
|
|
|
// NSQConsumer NSQ消费者
|
|
type NSQConsumer struct {
|
|
consumer *nsq.Consumer
|
|
config *NSQConfig
|
|
handlers map[string]MessageHandler
|
|
}
|
|
|
|
// NewNSQConsumer 创建NSQ消费者
|
|
func NewNSQConsumer(config *NSQConfig) (*NSQConsumer, error) {
|
|
nsqConfig := nsq.NewConfig()
|
|
nsqConfig.MaxInFlight = 10
|
|
|
|
consumer, err := nsq.NewConsumer("", "default", nsqConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create NSQ consumer: %w", err)
|
|
}
|
|
|
|
nsqConsumer := &NSQConsumer{
|
|
consumer: consumer,
|
|
config: config,
|
|
handlers: make(map[string]MessageHandler),
|
|
}
|
|
|
|
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
|
|
var msg Message
|
|
if err := json.Unmarshal(message.Body, &msg); err != nil {
|
|
log.Printf("failed to unmarshal message: %v", err)
|
|
return err
|
|
}
|
|
|
|
if handler, exists := nsqConsumer.handlers[msg.Topic]; exists {
|
|
return handler(&msg)
|
|
}
|
|
|
|
return nil
|
|
}))
|
|
|
|
return nsqConsumer, nil
|
|
}
|
|
|
|
// Subscribe 订阅主题
|
|
func (c *NSQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
|
|
c.handlers[topic] = handler
|
|
|
|
// 连接到NSQLookup或直接连接到NSQD
|
|
var err error
|
|
if len(c.config.NSQLookup) > 0 {
|
|
err = c.consumer.ConnectToNSQLookupd(c.config.NSQLookup[0])
|
|
} else {
|
|
err = c.consumer.ConnectToNSQD(c.config.NSQDAddr)
|
|
}
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to NSQ: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Unsubscribe 取消订阅
|
|
func (c *NSQConsumer) Unsubscribe(topic string) error {
|
|
delete(c.handlers, topic)
|
|
return nil
|
|
}
|
|
|
|
// Close 关闭连接
|
|
func (c *NSQConsumer) Close() error {
|
|
c.consumer.Stop()
|
|
return nil
|
|
}
|