Files
MQSDK/rabbitmq.go
2025-08-08 10:51:39 +08:00

226 lines
4.8 KiB
Go

package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/streadway/amqp"
)
// RabbitMQProducer RabbitMQ生产者
type RabbitMQProducer struct {
conn *amqp.Connection
channel *amqp.Channel
config *RabbitMQConfig
}
// NewRabbitMQProducer 创建RabbitMQ生产者
func NewRabbitMQProducer(config *RabbitMQConfig) (*RabbitMQProducer, error) {
conn, err := amqp.Dial(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
channel, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
// 声明交换机
err = channel.ExchangeDeclare(
config.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
channel.Close()
conn.Close()
return nil, fmt.Errorf("failed to declare exchange: %w", err)
}
return &RabbitMQProducer{
conn: conn,
channel: channel,
config: config,
}, nil
}
// Publish 发布消息
func (p *RabbitMQProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.channel.Publish(
p.config.Exchange, // exchange
topic, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
Headers: amqp.Table{
"message_id": msg.ID,
},
},
)
}
// Close 关闭连接
func (p *RabbitMQProducer) Close() error {
if p.channel != nil {
p.channel.Close()
}
if p.conn != nil {
return p.conn.Close()
}
return nil
}
// RabbitMQConsumer RabbitMQ消费者
type RabbitMQConsumer struct {
conn *amqp.Connection
channel *amqp.Channel
config *RabbitMQConfig
handlers map[string]MessageHandler
queues map[string]string
}
// NewRabbitMQConsumer 创建RabbitMQ消费者
func NewRabbitMQConsumer(config *RabbitMQConfig) (*RabbitMQConsumer, error) {
conn, err := amqp.Dial(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
channel, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
// 声明交换机
err = channel.ExchangeDeclare(
config.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
channel.Close()
conn.Close()
return nil, fmt.Errorf("failed to declare exchange: %w", err)
}
return &RabbitMQConsumer{
conn: conn,
channel: channel,
config: config,
handlers: make(map[string]MessageHandler),
queues: make(map[string]string),
}, nil
}
// Subscribe 订阅主题
func (c *RabbitMQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
// 声明队列
queue, err := c.channel.QueueDeclare(
"", // name (空字符串表示随机队列名)
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}
// 绑定队列到交换机
err = c.channel.QueueBind(
queue.Name, // queue name
topic, // routing key
c.config.Exchange, // exchange
false,
nil,
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
}
c.queues[topic] = queue.Name
// 开始消费消息
msgs, err := c.channel.Consume(
queue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to start consuming: %w", err)
}
go func() {
for msg := range msgs {
var message Message
if err := json.Unmarshal(msg.Body, &message); err != nil {
log.Printf("failed to unmarshal message: %v", err)
continue
}
if err := handler(&message); err != nil {
log.Printf("failed to handle message: %v", err)
}
}
}()
return nil
}
// Unsubscribe 取消订阅
func (c *RabbitMQConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
if queueName, exists := c.queues[topic]; exists {
c.channel.QueueDelete(queueName, false, false, false)
delete(c.queues, topic)
}
return nil
}
// Close 关闭连接
func (c *RabbitMQConsumer) Close() error {
if c.channel != nil {
c.channel.Close()
}
if c.conn != nil {
return c.conn.Close()
}
return nil
}