package mqsd import ( "context" "encoding/json" "fmt" "log" "time" "github.com/google/uuid" "github.com/nsqio/go-nsq" ) // NSQProducer NSQ生产者 type NSQProducer struct { producer *nsq.Producer config *NSQConfig } // NewNSQProducer 创建NSQ生产者 func NewNSQProducer(config *NSQConfig) (*NSQProducer, error) { producer, err := nsq.NewProducer(config.NSQDAddr, nsq.NewConfig()) if err != nil { return nil, fmt.Errorf("failed to create NSQ producer: %w", err) } return &NSQProducer{ producer: producer, config: config, }, nil } // Publish 发布消息 func (p *NSQProducer) Publish(ctx context.Context, topic string, msg *Message) error { if msg.ID == "" { msg.ID = uuid.New().String() } if msg.Timestamp.IsZero() { msg.Timestamp = time.Now() } data, err := json.Marshal(msg) if err != nil { return fmt.Errorf("failed to marshal message: %w", err) } return p.producer.Publish(topic, data) } // Close 关闭连接 func (p *NSQProducer) Close() error { p.producer.Stop() return nil } // NSQConsumer NSQ消费者 type NSQConsumer struct { consumer *nsq.Consumer config *NSQConfig handlers map[string]MessageHandler } // NewNSQConsumer 创建NSQ消费者 func NewNSQConsumer(config *NSQConfig) (*NSQConsumer, error) { nsqConfig := nsq.NewConfig() nsqConfig.MaxInFlight = 10 consumer, err := nsq.NewConsumer("", "default", nsqConfig) if err != nil { return nil, fmt.Errorf("failed to create NSQ consumer: %w", err) } nsqConsumer := &NSQConsumer{ consumer: consumer, config: config, handlers: make(map[string]MessageHandler), } consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { var msg Message if err := json.Unmarshal(message.Body, &msg); err != nil { log.Printf("failed to unmarshal message: %v", err) return err } if handler, exists := nsqConsumer.handlers[msg.Topic]; exists { return handler(&msg) } return nil })) return nsqConsumer, nil } // Subscribe 订阅主题 func (c *NSQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { c.handlers[topic] = handler // 连接到NSQLookup或直接连接到NSQD var err error if len(c.config.NSQLookup) > 0 { err = c.consumer.ConnectToNSQLookupd(c.config.NSQLookup[0]) } else { err = c.consumer.ConnectToNSQD(c.config.NSQDAddr) } if err != nil { return fmt.Errorf("failed to connect to NSQ: %w", err) } return nil } // Unsubscribe 取消订阅 func (c *NSQConsumer) Unsubscribe(topic string) error { delete(c.handlers, topic) return nil } // Close 关闭连接 func (c *NSQConsumer) Close() error { c.consumer.Stop() return nil }