60 lines
1.5 KiB
Go
60 lines
1.5 KiB
Go
package mqsd
|
|
|
|
import (
|
|
"fmt"
|
|
)
|
|
|
|
// Factory 消息队列工厂
|
|
type Factory struct{}
|
|
|
|
// NewFactory 创建工厂实例
|
|
func NewFactory() *Factory {
|
|
return &Factory{}
|
|
}
|
|
|
|
// NewProducer 创建生产者
|
|
func (f *Factory) NewProducer(config Config) (Producer, error) {
|
|
switch config.GetType() {
|
|
case "nsq":
|
|
if nsqConfig, ok := config.(*NSQConfig); ok {
|
|
return NewNSQProducer(nsqConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid NSQ config")
|
|
case "kafka":
|
|
if kafkaConfig, ok := config.(*KafkaConfig); ok {
|
|
return NewKafkaProducer(kafkaConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid Kafka config")
|
|
case "rabbitmq":
|
|
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
|
|
return NewRabbitMQProducer(rabbitConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid RabbitMQ config")
|
|
default:
|
|
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
|
|
}
|
|
}
|
|
|
|
// NewConsumer 创建消费者
|
|
func (f *Factory) NewConsumer(config Config) (Consumer, error) {
|
|
switch config.GetType() {
|
|
case "nsq":
|
|
if nsqConfig, ok := config.(*NSQConfig); ok {
|
|
return NewNSQConsumer(nsqConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid NSQ config")
|
|
case "kafka":
|
|
if kafkaConfig, ok := config.(*KafkaConfig); ok {
|
|
return NewKafkaConsumer(kafkaConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid Kafka config")
|
|
case "rabbitmq":
|
|
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
|
|
return NewRabbitMQConsumer(rabbitConfig)
|
|
}
|
|
return nil, fmt.Errorf("invalid RabbitMQ config")
|
|
default:
|
|
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
|
|
}
|
|
}
|