From 223097c4b3928af86346d25f9fc26f6df726e304 Mon Sep 17 00:00:00 2001 From: yyboo586 Date: Fri, 8 Aug 2025 10:13:45 +0800 Subject: [PATCH] initial commit --- README.md | 211 +++++++++++++++++++++++++++++++++++ examples/kafka/main.go | 66 +++++++++++ examples/nsq/main.go | 65 +++++++++++ examples/rabbitmq/main.go | 65 +++++++++++ factory.go | 59 ++++++++++ go.mod | 31 ++++++ go.sum | 86 +++++++++++++++ kafka.go | 168 ++++++++++++++++++++++++++++ nsq.go | 125 +++++++++++++++++++++ rabbitmq.go | 225 ++++++++++++++++++++++++++++++++++++++ test/main.go | 53 +++++++++ types.go | 75 +++++++++++++ 12 files changed, 1229 insertions(+) create mode 100644 README.md create mode 100644 examples/kafka/main.go create mode 100644 examples/nsq/main.go create mode 100644 examples/rabbitmq/main.go create mode 100644 factory.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 kafka.go create mode 100644 nsq.go create mode 100644 rabbitmq.go create mode 100644 test/main.go create mode 100644 types.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..7c0fd8e --- /dev/null +++ b/README.md @@ -0,0 +1,211 @@ +# MQSDK - 消息队列SDK + +MQSDK是一个支持NSQ、Kafka、RabbitMQ消息队列的统一Go语言SDK。 + +## 特性 + +- 支持NSQ、Kafka、RabbitMQ三种消息队列 +- 统一的接口设计,易于切换不同的消息队列 +- 工厂模式,简化创建和管理 +- 完整的错误处理和连接管理 +- 支持消息头信息 +- 自动生成消息ID和时间戳 + +## 安装 + +```bash +go get git.sixqin.com/Tom23/MQSDK +``` + +## 快速开始 + +### NSQ示例 + +```go +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.sixqin.com/Tom23/MQSDK" +) + +func main() { + // 创建NSQ配置 + config := &mqsd.NSQConfig{ + Type: "nsq", + NSQDAddr: "localhost:4150", + NSQLookup: []string{"localhost:4161"}, + } + + // 创建工厂 + factory := mqsd.NewFactory() + + // 创建生产者 + producer, err := factory.NewProducer(config) + if err != nil { + log.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // 创建消费者 + consumer, err := factory.NewConsumer(config) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer consumer.Close() + + // 订阅主题 + err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error { + fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n", + msg.ID, msg.Topic, string(msg.Body)) + return nil + }) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // 发布消息 + message := &mqsd.Message{ + Topic: "test-topic", + Body: []byte("Hello NSQ!"), + Headers: map[string]string{ + "source": "example", + }, + } + + err = producer.Publish(context.Background(), "test-topic", message) + if err != nil { + log.Fatalf("Failed to publish message: %v", err) + } + + fmt.Println("Message published successfully") + time.Sleep(2 * time.Second) +} +``` + +### Kafka示例 + +```go +// 创建Kafka配置 +config := &mqsd.KafkaConfig{ + Type: "kafka", + Brokers: []string{"localhost:9092"}, + GroupID: "test-group", + Version: "2.8.0", +} +``` + +### RabbitMQ示例 + +```go +// 创建RabbitMQ配置 +config := &mqsd.RabbitMQConfig{ + Type: "rabbitmq", + URL: "amqp://guest:guest@localhost:5672/", + Exchange: "test-exchange", +} +``` + +## 配置说明 + +### NSQ配置 + +```go +type NSQConfig struct { + Type string `json:"type"` + NSQDAddr string `json:"nsqd_addr"` // NSQD地址 + NSQLookup []string `json:"nsqlookup_addrs"` // NSQLookup地址列表 +} +``` + +### Kafka配置 + +```go +type KafkaConfig struct { + Type string `json:"type"` + Brokers []string `json:"brokers"` // Kafka broker地址列表 + GroupID string `json:"group_id"` // 消费者组ID + Version string `json:"version"` // Kafka版本 +} +``` + +### RabbitMQ配置 + +```go +type RabbitMQConfig struct { + Type string `json:"type"` + URL string `json:"url"` // RabbitMQ连接URL + Exchange string `json:"exchange"` // 交换机名称 +} +``` + +## 消息结构 + +```go +type Message struct { + ID string `json:"id"` // 消息ID + Topic string `json:"topic"` // 主题 + Body []byte `json:"body"` // 消息体 + Headers map[string]string `json:"headers"` // 消息头 + Timestamp time.Time `json:"timestamp"` // 时间戳 +} +``` + +## 接口说明 + +### Producer接口 + +```go +type Producer interface { + Publish(ctx context.Context, topic string, msg *Message) error + Close() error +} +``` + +### Consumer接口 + +```go +type Consumer interface { + Subscribe(ctx context.Context, topic string, handler MessageHandler) error + Unsubscribe(topic string) error + Close() error +} +``` + +## 运行示例 + +### NSQ示例 + +```bash +cd examples/nsq +go run main.go +``` + +### Kafka示例 + +```bash +cd examples/kafka +go run main.go +``` + +### RabbitMQ示例 + +```bash +cd examples/rabbitmq +go run main.go +``` + +## 依赖 + +- `github.com/nsqio/go-nsq` - NSQ客户端 +- `github.com/Shopify/sarama` - Kafka客户端 +- `github.com/streadway/amqp` - RabbitMQ客户端 +- `github.com/google/uuid` - UUID生成 + +## 许可证 + +MIT License \ No newline at end of file diff --git a/examples/kafka/main.go b/examples/kafka/main.go new file mode 100644 index 0000000..01e4ae0 --- /dev/null +++ b/examples/kafka/main.go @@ -0,0 +1,66 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.sixqin.com/Tom23/MQSDK" +) + +func main() { + // 创建Kafka配置 + config := &mqsd.KafkaConfig{ + Type: "kafka", + Brokers: []string{"localhost:9092"}, + GroupID: "test-group", + Version: "2.8.0", + } + + // 创建工厂 + factory := mqsd.NewFactory() + + // 创建生产者 + producer, err := factory.NewProducer(config) + if err != nil { + log.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // 创建消费者 + consumer, err := factory.NewConsumer(config) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer consumer.Close() + + // 订阅主题 + err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error { + fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n", + msg.ID, msg.Topic, string(msg.Body)) + return nil + }) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // 发布消息 + message := &mqsd.Message{ + Topic: "test-topic", + Body: []byte("Hello Kafka!"), + Headers: map[string]string{ + "source": "example", + }, + } + + err = producer.Publish(context.Background(), "test-topic", message) + if err != nil { + log.Fatalf("Failed to publish message: %v", err) + } + + fmt.Println("Message published successfully") + + // 等待一段时间让消息被消费 + time.Sleep(2 * time.Second) +} diff --git a/examples/nsq/main.go b/examples/nsq/main.go new file mode 100644 index 0000000..dc264de --- /dev/null +++ b/examples/nsq/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.sixqin.com/Tom23/MQSDK" +) + +func main() { + // 创建NSQ配置 + config := &mqsd.NSQConfig{ + Type: "nsq", + NSQDAddr: "localhost:4150", + NSQLookup: []string{"localhost:4161"}, + } + + // 创建工厂 + factory := mqsd.NewFactory() + + // 创建生产者 + producer, err := factory.NewProducer(config) + if err != nil { + log.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // 创建消费者 + consumer, err := factory.NewConsumer(config) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer consumer.Close() + + // 订阅主题 + err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error { + fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n", + msg.ID, msg.Topic, string(msg.Body)) + return nil + }) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // 发布消息 + message := &mqsd.Message{ + Topic: "test-topic", + Body: []byte("Hello NSQ!"), + Headers: map[string]string{ + "source": "example", + }, + } + + err = producer.Publish(context.Background(), "test-topic", message) + if err != nil { + log.Fatalf("Failed to publish message: %v", err) + } + + fmt.Println("Message published successfully") + + // 等待一段时间让消息被消费 + time.Sleep(2 * time.Second) +} diff --git a/examples/rabbitmq/main.go b/examples/rabbitmq/main.go new file mode 100644 index 0000000..0eaac29 --- /dev/null +++ b/examples/rabbitmq/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "context" + "fmt" + "log" + "time" + + "git.sixqin.com/Tom23/MQSDK" +) + +func main() { + // 创建RabbitMQ配置 + config := &mqsd.RabbitMQConfig{ + Type: "rabbitmq", + URL: "amqp://guest:guest@localhost:5672/", + Exchange: "test-exchange", + } + + // 创建工厂 + factory := mqsd.NewFactory() + + // 创建生产者 + producer, err := factory.NewProducer(config) + if err != nil { + log.Fatalf("Failed to create producer: %v", err) + } + defer producer.Close() + + // 创建消费者 + consumer, err := factory.NewConsumer(config) + if err != nil { + log.Fatalf("Failed to create consumer: %v", err) + } + defer consumer.Close() + + // 订阅主题 + err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error { + fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n", + msg.ID, msg.Topic, string(msg.Body)) + return nil + }) + if err != nil { + log.Fatalf("Failed to subscribe: %v", err) + } + + // 发布消息 + message := &mqsd.Message{ + Topic: "test-topic", + Body: []byte("Hello RabbitMQ!"), + Headers: map[string]string{ + "source": "example", + }, + } + + err = producer.Publish(context.Background(), "test-topic", message) + if err != nil { + log.Fatalf("Failed to publish message: %v", err) + } + + fmt.Println("Message published successfully") + + // 等待一段时间让消息被消费 + time.Sleep(2 * time.Second) +} diff --git a/factory.go b/factory.go new file mode 100644 index 0000000..077ebe2 --- /dev/null +++ b/factory.go @@ -0,0 +1,59 @@ +package mqsd + +import ( + "fmt" +) + +// Factory 消息队列工厂 +type Factory struct{} + +// NewFactory 创建工厂实例 +func NewFactory() *Factory { + return &Factory{} +} + +// NewProducer 创建生产者 +func (f *Factory) NewProducer(config Config) (Producer, error) { + switch config.GetType() { + case "nsq": + if nsqConfig, ok := config.(*NSQConfig); ok { + return NewNSQProducer(nsqConfig) + } + return nil, fmt.Errorf("invalid NSQ config") + case "kafka": + if kafkaConfig, ok := config.(*KafkaConfig); ok { + return NewKafkaProducer(kafkaConfig) + } + return nil, fmt.Errorf("invalid Kafka config") + case "rabbitmq": + if rabbitConfig, ok := config.(*RabbitMQConfig); ok { + return NewRabbitMQProducer(rabbitConfig) + } + return nil, fmt.Errorf("invalid RabbitMQ config") + default: + return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType()) + } +} + +// NewConsumer 创建消费者 +func (f *Factory) NewConsumer(config Config) (Consumer, error) { + switch config.GetType() { + case "nsq": + if nsqConfig, ok := config.(*NSQConfig); ok { + return NewNSQConsumer(nsqConfig) + } + return nil, fmt.Errorf("invalid NSQ config") + case "kafka": + if kafkaConfig, ok := config.(*KafkaConfig); ok { + return NewKafkaConsumer(kafkaConfig) + } + return nil, fmt.Errorf("invalid Kafka config") + case "rabbitmq": + if rabbitConfig, ok := config.(*RabbitMQConfig); ok { + return NewRabbitMQConsumer(rabbitConfig) + } + return nil, fmt.Errorf("invalid RabbitMQ config") + default: + return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType()) + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..cd0180d --- /dev/null +++ b/go.mod @@ -0,0 +1,31 @@ +module git.sixqin.com/Tom23/MQSDK + +go 1.21 + +require ( + github.com/Shopify/sarama v1.38.1 + github.com/google/uuid v1.3.0 + github.com/nsqio/go-nsq v1.1.0 + github.com/streadway/amqp v1.0.0 +) + +require ( + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.3.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.15.14 // indirect + github.com/pierrec/lz4/v4 v4.1.17 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect + golang.org/x/net v0.5.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..6b6311d --- /dev/null +++ b/go.sum @@ -0,0 +1,86 @@ +github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A= +github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g= +github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc= +github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0= +github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM= +github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0= +github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw= +github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4= +github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM= +github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA= +github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= +github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= +github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= +github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= +github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= +github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= +github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= +github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= +github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= +github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= +github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc= +github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= +github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc= +github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= +github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= +github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= +golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= +golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw= +golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka.go b/kafka.go new file mode 100644 index 0000000..3bbf835 --- /dev/null +++ b/kafka.go @@ -0,0 +1,168 @@ +package mqsd + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/Shopify/sarama" + "github.com/google/uuid" +) + +// KafkaProducer Kafka生产者 +type KafkaProducer struct { + producer sarama.SyncProducer + config *KafkaConfig +} + +// NewKafkaProducer 创建Kafka生产者 +func NewKafkaProducer(config *KafkaConfig) (*KafkaProducer, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.Producer.Return.Successes = true + saramaConfig.Producer.RequiredAcks = sarama.WaitForAll + saramaConfig.Producer.Retry.Max = 5 + + producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka producer: %w", err) + } + + return &KafkaProducer{ + producer: producer, + config: config, + }, nil +} + +// Publish 发布消息 +func (p *KafkaProducer) Publish(ctx context.Context, topic string, msg *Message) error { + if msg.ID == "" { + msg.ID = uuid.New().String() + } + if msg.Timestamp.IsZero() { + msg.Timestamp = time.Now() + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + kafkaMsg := &sarama.ProducerMessage{ + Topic: topic, + Value: sarama.ByteEncoder(data), + Headers: []sarama.RecordHeader{ + {Key: []byte("message_id"), Value: []byte(msg.ID)}, + }, + } + + _, _, err = p.producer.SendMessage(kafkaMsg) + return err +} + +// Close 关闭连接 +func (p *KafkaProducer) Close() error { + return p.producer.Close() +} + +// KafkaConsumer Kafka消费者 +type KafkaConsumer struct { + consumer sarama.ConsumerGroup + config *KafkaConfig + handlers map[string]MessageHandler + ctx context.Context + cancel context.CancelFunc +} + +// NewKafkaConsumer 创建Kafka消费者 +func NewKafkaConsumer(config *KafkaConfig) (*KafkaConsumer, error) { + saramaConfig := sarama.NewConfig() + saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest + + consumer, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig) + if err != nil { + return nil, fmt.Errorf("failed to create Kafka consumer: %w", err) + } + + ctx, cancel := context.WithCancel(context.Background()) + + kafkaConsumer := &KafkaConsumer{ + consumer: consumer, + config: config, + handlers: make(map[string]MessageHandler), + ctx: ctx, + cancel: cancel, + } + + return kafkaConsumer, nil +} + +// Subscribe 订阅主题 +func (c *KafkaConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { + c.handlers[topic] = handler + + go func() { + for { + select { + case <-c.ctx.Done(): + return + default: + err := c.consumer.Consume(c.ctx, []string{topic}, c) + if err != nil { + log.Printf("Error from consumer: %v", err) + } + } + } + }() + + return nil +} + +// Unsubscribe 取消订阅 +func (c *KafkaConsumer) Unsubscribe(topic string) error { + delete(c.handlers, topic) + return nil +} + +// Close 关闭连接 +func (c *KafkaConsumer) Close() error { + c.cancel() + return c.consumer.Close() +} + +// ConsumeClaim 实现sarama.ConsumerGroupHandler接口 +func (c *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { + for { + select { + case message := <-claim.Messages(): + var msg Message + if err := json.Unmarshal(message.Value, &msg); err != nil { + log.Printf("failed to unmarshal message: %v", err) + continue + } + + if handler, exists := c.handlers[message.Topic]; exists { + if err := handler(&msg); err != nil { + log.Printf("failed to handle message: %v", err) + } + } + + session.MarkMessage(message, "") + + case <-session.Context().Done(): + return nil + } + } +} + +// Setup 实现sarama.ConsumerGroupHandler接口 +func (c *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error { + return nil +} + +// Cleanup 实现sarama.ConsumerGroupHandler接口 +func (c *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error { + return nil +} diff --git a/nsq.go b/nsq.go new file mode 100644 index 0000000..811ca33 --- /dev/null +++ b/nsq.go @@ -0,0 +1,125 @@ +package mqsd + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/google/uuid" + "github.com/nsqio/go-nsq" +) + +// NSQProducer NSQ生产者 +type NSQProducer struct { + producer *nsq.Producer + config *NSQConfig +} + +// NewNSQProducer 创建NSQ生产者 +func NewNSQProducer(config *NSQConfig) (*NSQProducer, error) { + producer, err := nsq.NewProducer(config.NSQDAddr, nsq.NewConfig()) + if err != nil { + return nil, fmt.Errorf("failed to create NSQ producer: %w", err) + } + + return &NSQProducer{ + producer: producer, + config: config, + }, nil +} + +// Publish 发布消息 +func (p *NSQProducer) Publish(ctx context.Context, topic string, msg *Message) error { + if msg.ID == "" { + msg.ID = uuid.New().String() + } + if msg.Timestamp.IsZero() { + msg.Timestamp = time.Now() + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + return p.producer.Publish(topic, data) +} + +// Close 关闭连接 +func (p *NSQProducer) Close() error { + p.producer.Stop() + return nil +} + +// NSQConsumer NSQ消费者 +type NSQConsumer struct { + consumer *nsq.Consumer + config *NSQConfig + handlers map[string]MessageHandler +} + +// NewNSQConsumer 创建NSQ消费者 +func NewNSQConsumer(config *NSQConfig) (*NSQConsumer, error) { + nsqConfig := nsq.NewConfig() + nsqConfig.MaxInFlight = 10 + + consumer, err := nsq.NewConsumer("", "default", nsqConfig) + if err != nil { + return nil, fmt.Errorf("failed to create NSQ consumer: %w", err) + } + + nsqConsumer := &NSQConsumer{ + consumer: consumer, + config: config, + handlers: make(map[string]MessageHandler), + } + + consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { + var msg Message + if err := json.Unmarshal(message.Body, &msg); err != nil { + log.Printf("failed to unmarshal message: %v", err) + return err + } + + if handler, exists := nsqConsumer.handlers[msg.Topic]; exists { + return handler(&msg) + } + + return nil + })) + + return nsqConsumer, nil +} + +// Subscribe 订阅主题 +func (c *NSQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { + c.handlers[topic] = handler + + // 连接到NSQLookup或直接连接到NSQD + var err error + if len(c.config.NSQLookup) > 0 { + err = c.consumer.ConnectToNSQLookupd(c.config.NSQLookup[0]) + } else { + err = c.consumer.ConnectToNSQD(c.config.NSQDAddr) + } + + if err != nil { + return fmt.Errorf("failed to connect to NSQ: %w", err) + } + + return nil +} + +// Unsubscribe 取消订阅 +func (c *NSQConsumer) Unsubscribe(topic string) error { + delete(c.handlers, topic) + return nil +} + +// Close 关闭连接 +func (c *NSQConsumer) Close() error { + c.consumer.Stop() + return nil +} diff --git a/rabbitmq.go b/rabbitmq.go new file mode 100644 index 0000000..8bf9eed --- /dev/null +++ b/rabbitmq.go @@ -0,0 +1,225 @@ +package mqsd + +import ( + "context" + "encoding/json" + "fmt" + "log" + "time" + + "github.com/google/uuid" + "github.com/streadway/amqp" +) + +// RabbitMQProducer RabbitMQ生产者 +type RabbitMQProducer struct { + conn *amqp.Connection + channel *amqp.Channel + config *RabbitMQConfig +} + +// NewRabbitMQProducer 创建RabbitMQ生产者 +func NewRabbitMQProducer(config *RabbitMQConfig) (*RabbitMQProducer, error) { + conn, err := amqp.Dial(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + + channel, err := conn.Channel() + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to open channel: %w", err) + } + + // 声明交换机 + err = channel.ExchangeDeclare( + config.Exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + channel.Close() + conn.Close() + return nil, fmt.Errorf("failed to declare exchange: %w", err) + } + + return &RabbitMQProducer{ + conn: conn, + channel: channel, + config: config, + }, nil +} + +// Publish 发布消息 +func (p *RabbitMQProducer) Publish(ctx context.Context, topic string, msg *Message) error { + if msg.ID == "" { + msg.ID = uuid.New().String() + } + if msg.Timestamp.IsZero() { + msg.Timestamp = time.Now() + } + + data, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message: %w", err) + } + + return p.channel.Publish( + p.config.Exchange, // exchange + topic, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: data, + Headers: amqp.Table{ + "message_id": msg.ID, + }, + }, + ) +} + +// Close 关闭连接 +func (p *RabbitMQProducer) Close() error { + if p.channel != nil { + p.channel.Close() + } + if p.conn != nil { + return p.conn.Close() + } + return nil +} + +// RabbitMQConsumer RabbitMQ消费者 +type RabbitMQConsumer struct { + conn *amqp.Connection + channel *amqp.Channel + config *RabbitMQConfig + handlers map[string]MessageHandler + queues map[string]string +} + +// NewRabbitMQConsumer 创建RabbitMQ消费者 +func NewRabbitMQConsumer(config *RabbitMQConfig) (*RabbitMQConsumer, error) { + conn, err := amqp.Dial(config.URL) + if err != nil { + return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err) + } + + channel, err := conn.Channel() + if err != nil { + conn.Close() + return nil, fmt.Errorf("failed to open channel: %w", err) + } + + // 声明交换机 + err = channel.ExchangeDeclare( + config.Exchange, // name + "topic", // type + true, // durable + false, // auto-deleted + false, // internal + false, // no-wait + nil, // arguments + ) + if err != nil { + channel.Close() + conn.Close() + return nil, fmt.Errorf("failed to declare exchange: %w", err) + } + + return &RabbitMQConsumer{ + conn: conn, + channel: channel, + config: config, + handlers: make(map[string]MessageHandler), + queues: make(map[string]string), + }, nil +} + +// Subscribe 订阅主题 +func (c *RabbitMQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error { + c.handlers[topic] = handler + + // 声明队列 + queue, err := c.channel.QueueDeclare( + "", // name (空字符串表示随机队列名) + false, // durable + false, // delete when unused + true, // exclusive + false, // no-wait + nil, // arguments + ) + if err != nil { + return fmt.Errorf("failed to declare queue: %w", err) + } + + // 绑定队列到交换机 + err = c.channel.QueueBind( + queue.Name, // queue name + topic, // routing key + c.config.Exchange, // exchange + false, + nil, + ) + if err != nil { + return fmt.Errorf("failed to bind queue: %w", err) + } + + c.queues[topic] = queue.Name + + // 开始消费消息 + msgs, err := c.channel.Consume( + queue.Name, // queue + "", // consumer + true, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + return fmt.Errorf("failed to start consuming: %w", err) + } + + go func() { + for msg := range msgs { + var message Message + if err := json.Unmarshal(msg.Body, &message); err != nil { + log.Printf("failed to unmarshal message: %v", err) + continue + } + + if err := handler(&message); err != nil { + log.Printf("failed to handle message: %v", err) + } + } + }() + + return nil +} + +// Unsubscribe 取消订阅 +func (c *RabbitMQConsumer) Unsubscribe(topic string) error { + delete(c.handlers, topic) + if queueName, exists := c.queues[topic]; exists { + c.channel.QueueDelete(queueName, false, false, false) + delete(c.queues, topic) + } + return nil +} + +// Close 关闭连接 +func (c *RabbitMQConsumer) Close() error { + if c.channel != nil { + c.channel.Close() + } + if c.conn != nil { + return c.conn.Close() + } + return nil +} diff --git a/test/main.go b/test/main.go new file mode 100644 index 0000000..409ecde --- /dev/null +++ b/test/main.go @@ -0,0 +1,53 @@ +package main + +import ( + "fmt" + + "git.sixqin.com/Tom23/MQSDK" +) + +func main() { + // 测试NSQ配置 + nsqConfig := &mqsd.NSQConfig{ + Type: "nsq", + NSQDAddr: "localhost:4150", + NSQLookup: []string{"localhost:4161"}, + } + + // 测试Kafka配置 + kafkaConfig := &mqsd.KafkaConfig{ + Type: "kafka", + Brokers: []string{"localhost:9092"}, + GroupID: "test-group", + Version: "2.8.0", + } + + // 测试RabbitMQ配置 + rabbitConfig := &mqsd.RabbitMQConfig{ + Type: "rabbitmq", + URL: "amqp://guest:guest@localhost:5672/", + Exchange: "test-exchange", + } + + // 创建工厂 + _ = mqsd.NewFactory() + + fmt.Printf("NSQ Config Type: %s\n", nsqConfig.GetType()) + fmt.Printf("Kafka Config Type: %s\n", kafkaConfig.GetType()) + fmt.Printf("RabbitMQ Config Type: %s\n", rabbitConfig.GetType()) + + // 测试消息创建 + message := &mqsd.Message{ + ID: "test-id", + Topic: "test-topic", + Body: []byte("Hello World"), + Headers: map[string]string{ + "source": "test", + }, + } + + fmt.Printf("Message created: ID=%s, Topic=%s, Body=%s\n", + message.ID, message.Topic, string(message.Body)) + + fmt.Println("Build test completed successfully!") +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..60261b2 --- /dev/null +++ b/types.go @@ -0,0 +1,75 @@ +package mqsd + +import ( + "context" + "time" +) + +// Message 消息结构体 +type Message struct { + ID string `json:"id"` + Topic string `json:"topic"` + Body []byte `json:"body"` + Headers map[string]string `json:"headers,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// MessageHandler 消息处理函数类型 +type MessageHandler func(msg *Message) error + +// Producer 生产者接口 +type Producer interface { + // Publish 发布消息 + Publish(ctx context.Context, topic string, msg *Message) error + // Close 关闭连接 + Close() error +} + +// Consumer 消费者接口 +type Consumer interface { + // Subscribe 订阅主题 + Subscribe(ctx context.Context, topic string, handler MessageHandler) error + // Unsubscribe 取消订阅 + Unsubscribe(topic string) error + // Close 关闭连接 + Close() error +} + +// Config 配置接口 +type Config interface { + GetType() string +} + +// NSQConfig NSQ配置 +type NSQConfig struct { + Type string `json:"type"` + NSQDAddr string `json:"nsqd_addr"` + NSQLookup []string `json:"nsqlookup_addrs"` +} + +func (c *NSQConfig) GetType() string { + return "nsq" +} + +// KafkaConfig Kafka配置 +type KafkaConfig struct { + Type string `json:"type"` + Brokers []string `json:"brokers"` + GroupID string `json:"group_id"` + Version string `json:"version"` +} + +func (c *KafkaConfig) GetType() string { + return "kafka" +} + +// RabbitMQConfig RabbitMQ配置 +type RabbitMQConfig struct { + Type string `json:"type"` + URL string `json:"url"` + Exchange string `json:"exchange"` +} + +func (c *RabbitMQConfig) GetType() string { + return "rabbitmq" +}