Files
MQSDK/factory.go
2025-08-08 10:51:39 +08:00

60 lines
1.5 KiB
Go

package mqsd
import (
"fmt"
)
// Factory 消息队列工厂
type Factory struct{}
// NewFactory 创建工厂实例
func NewFactory() *Factory {
return &Factory{}
}
// NewProducer 创建生产者
func (f *Factory) NewProducer(config Config) (Producer, error) {
switch config.GetType() {
case "nsq":
if nsqConfig, ok := config.(*NSQConfig); ok {
return NewNSQProducer(nsqConfig)
}
return nil, fmt.Errorf("invalid NSQ config")
case "kafka":
if kafkaConfig, ok := config.(*KafkaConfig); ok {
return NewKafkaProducer(kafkaConfig)
}
return nil, fmt.Errorf("invalid Kafka config")
case "rabbitmq":
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
return NewRabbitMQProducer(rabbitConfig)
}
return nil, fmt.Errorf("invalid RabbitMQ config")
default:
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
}
}
// NewConsumer 创建消费者
func (f *Factory) NewConsumer(config Config) (Consumer, error) {
switch config.GetType() {
case "nsq":
if nsqConfig, ok := config.(*NSQConfig); ok {
return NewNSQConsumer(nsqConfig)
}
return nil, fmt.Errorf("invalid NSQ config")
case "kafka":
if kafkaConfig, ok := config.(*KafkaConfig); ok {
return NewKafkaConsumer(kafkaConfig)
}
return nil, fmt.Errorf("invalid Kafka config")
case "rabbitmq":
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
return NewRabbitMQConsumer(rabbitConfig)
}
return nil, fmt.Errorf("invalid RabbitMQ config")
default:
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
}
}