initial commit

This commit is contained in:
yyboo586
2025-08-08 10:13:45 +08:00
commit 223097c4b3
12 changed files with 1229 additions and 0 deletions

211
README.md Normal file
View File

@@ -0,0 +1,211 @@
# MQSDK - 消息队列SDK
MQSDK是一个支持NSQ、Kafka、RabbitMQ消息队列的统一Go语言SDK。
## 特性
- 支持NSQ、Kafka、RabbitMQ三种消息队列
- 统一的接口设计,易于切换不同的消息队列
- 工厂模式,简化创建和管理
- 完整的错误处理和连接管理
- 支持消息头信息
- 自动生成消息ID和时间戳
## 安装
```bash
go get git.sixqin.com/Tom23/MQSDK
```
## 快速开始
### NSQ示例
```go
package main
import (
"context"
"fmt"
"log"
"time"
"git.sixqin.com/Tom23/MQSDK"
)
func main() {
// 创建NSQ配置
config := &mqsd.NSQConfig{
Type: "nsq",
NSQDAddr: "localhost:4150",
NSQLookup: []string{"localhost:4161"},
}
// 创建工厂
factory := mqsd.NewFactory()
// 创建生产者
producer, err := factory.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// 创建消费者
consumer, err := factory.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// 订阅主题
err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error {
fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n",
msg.ID, msg.Topic, string(msg.Body))
return nil
})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 发布消息
message := &mqsd.Message{
Topic: "test-topic",
Body: []byte("Hello NSQ!"),
Headers: map[string]string{
"source": "example",
},
}
err = producer.Publish(context.Background(), "test-topic", message)
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
fmt.Println("Message published successfully")
time.Sleep(2 * time.Second)
}
```
### Kafka示例
```go
// 创建Kafka配置
config := &mqsd.KafkaConfig{
Type: "kafka",
Brokers: []string{"localhost:9092"},
GroupID: "test-group",
Version: "2.8.0",
}
```
### RabbitMQ示例
```go
// 创建RabbitMQ配置
config := &mqsd.RabbitMQConfig{
Type: "rabbitmq",
URL: "amqp://guest:guest@localhost:5672/",
Exchange: "test-exchange",
}
```
## 配置说明
### NSQ配置
```go
type NSQConfig struct {
Type string `json:"type"`
NSQDAddr string `json:"nsqd_addr"` // NSQD地址
NSQLookup []string `json:"nsqlookup_addrs"` // NSQLookup地址列表
}
```
### Kafka配置
```go
type KafkaConfig struct {
Type string `json:"type"`
Brokers []string `json:"brokers"` // Kafka broker地址列表
GroupID string `json:"group_id"` // 消费者组ID
Version string `json:"version"` // Kafka版本
}
```
### RabbitMQ配置
```go
type RabbitMQConfig struct {
Type string `json:"type"`
URL string `json:"url"` // RabbitMQ连接URL
Exchange string `json:"exchange"` // 交换机名称
}
```
## 消息结构
```go
type Message struct {
ID string `json:"id"` // 消息ID
Topic string `json:"topic"` // 主题
Body []byte `json:"body"` // 消息体
Headers map[string]string `json:"headers"` // 消息头
Timestamp time.Time `json:"timestamp"` // 时间戳
}
```
## 接口说明
### Producer接口
```go
type Producer interface {
Publish(ctx context.Context, topic string, msg *Message) error
Close() error
}
```
### Consumer接口
```go
type Consumer interface {
Subscribe(ctx context.Context, topic string, handler MessageHandler) error
Unsubscribe(topic string) error
Close() error
}
```
## 运行示例
### NSQ示例
```bash
cd examples/nsq
go run main.go
```
### Kafka示例
```bash
cd examples/kafka
go run main.go
```
### RabbitMQ示例
```bash
cd examples/rabbitmq
go run main.go
```
## 依赖
- `github.com/nsqio/go-nsq` - NSQ客户端
- `github.com/Shopify/sarama` - Kafka客户端
- `github.com/streadway/amqp` - RabbitMQ客户端
- `github.com/google/uuid` - UUID生成
## 许可证
MIT License

66
examples/kafka/main.go Normal file
View File

@@ -0,0 +1,66 @@
package main
import (
"context"
"fmt"
"log"
"time"
"git.sixqin.com/Tom23/MQSDK"
)
func main() {
// 创建Kafka配置
config := &mqsd.KafkaConfig{
Type: "kafka",
Brokers: []string{"localhost:9092"},
GroupID: "test-group",
Version: "2.8.0",
}
// 创建工厂
factory := mqsd.NewFactory()
// 创建生产者
producer, err := factory.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// 创建消费者
consumer, err := factory.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// 订阅主题
err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error {
fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n",
msg.ID, msg.Topic, string(msg.Body))
return nil
})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 发布消息
message := &mqsd.Message{
Topic: "test-topic",
Body: []byte("Hello Kafka!"),
Headers: map[string]string{
"source": "example",
},
}
err = producer.Publish(context.Background(), "test-topic", message)
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
fmt.Println("Message published successfully")
// 等待一段时间让消息被消费
time.Sleep(2 * time.Second)
}

65
examples/nsq/main.go Normal file
View File

@@ -0,0 +1,65 @@
package main
import (
"context"
"fmt"
"log"
"time"
"git.sixqin.com/Tom23/MQSDK"
)
func main() {
// 创建NSQ配置
config := &mqsd.NSQConfig{
Type: "nsq",
NSQDAddr: "localhost:4150",
NSQLookup: []string{"localhost:4161"},
}
// 创建工厂
factory := mqsd.NewFactory()
// 创建生产者
producer, err := factory.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// 创建消费者
consumer, err := factory.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// 订阅主题
err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error {
fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n",
msg.ID, msg.Topic, string(msg.Body))
return nil
})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 发布消息
message := &mqsd.Message{
Topic: "test-topic",
Body: []byte("Hello NSQ!"),
Headers: map[string]string{
"source": "example",
},
}
err = producer.Publish(context.Background(), "test-topic", message)
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
fmt.Println("Message published successfully")
// 等待一段时间让消息被消费
time.Sleep(2 * time.Second)
}

65
examples/rabbitmq/main.go Normal file
View File

@@ -0,0 +1,65 @@
package main
import (
"context"
"fmt"
"log"
"time"
"git.sixqin.com/Tom23/MQSDK"
)
func main() {
// 创建RabbitMQ配置
config := &mqsd.RabbitMQConfig{
Type: "rabbitmq",
URL: "amqp://guest:guest@localhost:5672/",
Exchange: "test-exchange",
}
// 创建工厂
factory := mqsd.NewFactory()
// 创建生产者
producer, err := factory.NewProducer(config)
if err != nil {
log.Fatalf("Failed to create producer: %v", err)
}
defer producer.Close()
// 创建消费者
consumer, err := factory.NewConsumer(config)
if err != nil {
log.Fatalf("Failed to create consumer: %v", err)
}
defer consumer.Close()
// 订阅主题
err = consumer.Subscribe(context.Background(), "test-topic", func(msg *mqsd.Message) error {
fmt.Printf("Received message: ID=%s, Topic=%s, Body=%s\n",
msg.ID, msg.Topic, string(msg.Body))
return nil
})
if err != nil {
log.Fatalf("Failed to subscribe: %v", err)
}
// 发布消息
message := &mqsd.Message{
Topic: "test-topic",
Body: []byte("Hello RabbitMQ!"),
Headers: map[string]string{
"source": "example",
},
}
err = producer.Publish(context.Background(), "test-topic", message)
if err != nil {
log.Fatalf("Failed to publish message: %v", err)
}
fmt.Println("Message published successfully")
// 等待一段时间让消息被消费
time.Sleep(2 * time.Second)
}

59
factory.go Normal file
View File

@@ -0,0 +1,59 @@
package mqsd
import (
"fmt"
)
// Factory 消息队列工厂
type Factory struct{}
// NewFactory 创建工厂实例
func NewFactory() *Factory {
return &Factory{}
}
// NewProducer 创建生产者
func (f *Factory) NewProducer(config Config) (Producer, error) {
switch config.GetType() {
case "nsq":
if nsqConfig, ok := config.(*NSQConfig); ok {
return NewNSQProducer(nsqConfig)
}
return nil, fmt.Errorf("invalid NSQ config")
case "kafka":
if kafkaConfig, ok := config.(*KafkaConfig); ok {
return NewKafkaProducer(kafkaConfig)
}
return nil, fmt.Errorf("invalid Kafka config")
case "rabbitmq":
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
return NewRabbitMQProducer(rabbitConfig)
}
return nil, fmt.Errorf("invalid RabbitMQ config")
default:
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
}
}
// NewConsumer 创建消费者
func (f *Factory) NewConsumer(config Config) (Consumer, error) {
switch config.GetType() {
case "nsq":
if nsqConfig, ok := config.(*NSQConfig); ok {
return NewNSQConsumer(nsqConfig)
}
return nil, fmt.Errorf("invalid NSQ config")
case "kafka":
if kafkaConfig, ok := config.(*KafkaConfig); ok {
return NewKafkaConsumer(kafkaConfig)
}
return nil, fmt.Errorf("invalid Kafka config")
case "rabbitmq":
if rabbitConfig, ok := config.(*RabbitMQConfig); ok {
return NewRabbitMQConsumer(rabbitConfig)
}
return nil, fmt.Errorf("invalid RabbitMQ config")
default:
return nil, fmt.Errorf("unsupported message queue type: %s", config.GetType())
}
}

31
go.mod Normal file
View File

@@ -0,0 +1,31 @@
module git.sixqin.com/Tom23/MQSDK
go 1.21
require (
github.com/Shopify/sarama v1.38.1
github.com/google/uuid v1.3.0
github.com/nsqio/go-nsq v1.1.0
github.com/streadway/amqp v1.0.0
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.3.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.0.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.15.14 // indirect
github.com/pierrec/lz4/v4 v4.1.17 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect
golang.org/x/net v0.5.0 // indirect
)

86
go.sum Normal file
View File

@@ -0,0 +1,86 @@
github.com/Shopify/sarama v1.38.1 h1:lqqPUPQZ7zPqYlWpTh+LQ9bhYNu2xJL6k1SJN4WVe2A=
github.com/Shopify/sarama v1.38.1/go.mod h1:iwv9a67Ha8VNa+TifujYoWGxWnu2kNVAQdSdZ4X2o5g=
github.com/Shopify/toxiproxy/v2 v2.5.0 h1:i4LPT+qrSlKNtQf5QliVjdP08GyAH8+BUIc9gT0eahc=
github.com/Shopify/toxiproxy/v2 v2.5.0/go.mod h1:yhM2epWtAmel9CB8r2+L+PCmhH6yH2pITaPAo7jxJl0=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.3.0 h1:RRL0nge+cWGlxXbUzJ7yMcq6w2XBEr19dCN6HECGaT0=
github.com/eapache/go-resiliency v1.3.0/go.mod h1:5yPzW0MIvSe0JDsv0v+DvcjEv2FyD6iZYSs1ZI+iQho=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6 h1:8yY/I9ndfrgrXUbOGObLHKBR4Fl3nZXwM2c7OYTT8hM=
github.com/eapache/go-xerial-snappy v0.0.0-20230111030713-bf00bc1b83b6/go.mod h1:YvSRo5mw33fLEx1+DlK6L2VV43tJt5Eyel9n9XBcR+0=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/securecookie v1.1.1/go.mod h1:ra0sb63/xPlUeL+yeDciTfxMRAA+MP+HVt/4epWDjd4=
github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/zI+bUmuGM=
github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/UYA=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8=
github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8=
github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs=
github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo=
github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM=
github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg=
github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo=
github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o=
github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg=
github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8=
github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/klauspost/compress v1.15.14 h1:i7WCKDToww0wA+9qrUZ1xOjp218vfFo3nTU6UHp+gOc=
github.com/klauspost/compress v1.15.14/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE=
github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY=
github.com/pierrec/lz4/v4 v4.1.17 h1:kV4Ip+/hUBC+8T6+2EgburRtkE9ef4nbY3f4dFhGjMc=
github.com/pierrec/lz4/v4 v4.1.17/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM=
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c=
golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM=
golang.org/x/net v0.5.0 h1:GyT4nK/YDHSqa1c4753ouYCDajOYKTja9Xb/OHtgvSw=
golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

168
kafka.go Normal file
View File

@@ -0,0 +1,168 @@
package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/Shopify/sarama"
"github.com/google/uuid"
)
// KafkaProducer Kafka生产者
type KafkaProducer struct {
producer sarama.SyncProducer
config *KafkaConfig
}
// NewKafkaProducer 创建Kafka生产者
func NewKafkaProducer(config *KafkaConfig) (*KafkaProducer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Producer.Return.Successes = true
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
saramaConfig.Producer.Retry.Max = 5
producer, err := sarama.NewSyncProducer(config.Brokers, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka producer: %w", err)
}
return &KafkaProducer{
producer: producer,
config: config,
}, nil
}
// Publish 发布消息
func (p *KafkaProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
kafkaMsg := &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(data),
Headers: []sarama.RecordHeader{
{Key: []byte("message_id"), Value: []byte(msg.ID)},
},
}
_, _, err = p.producer.SendMessage(kafkaMsg)
return err
}
// Close 关闭连接
func (p *KafkaProducer) Close() error {
return p.producer.Close()
}
// KafkaConsumer Kafka消费者
type KafkaConsumer struct {
consumer sarama.ConsumerGroup
config *KafkaConfig
handlers map[string]MessageHandler
ctx context.Context
cancel context.CancelFunc
}
// NewKafkaConsumer 创建Kafka消费者
func NewKafkaConsumer(config *KafkaConfig) (*KafkaConsumer, error) {
saramaConfig := sarama.NewConfig()
saramaConfig.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest
consumer, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, saramaConfig)
if err != nil {
return nil, fmt.Errorf("failed to create Kafka consumer: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
kafkaConsumer := &KafkaConsumer{
consumer: consumer,
config: config,
handlers: make(map[string]MessageHandler),
ctx: ctx,
cancel: cancel,
}
return kafkaConsumer, nil
}
// Subscribe 订阅主题
func (c *KafkaConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
go func() {
for {
select {
case <-c.ctx.Done():
return
default:
err := c.consumer.Consume(c.ctx, []string{topic}, c)
if err != nil {
log.Printf("Error from consumer: %v", err)
}
}
}
}()
return nil
}
// Unsubscribe 取消订阅
func (c *KafkaConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
return nil
}
// Close 关闭连接
func (c *KafkaConsumer) Close() error {
c.cancel()
return c.consumer.Close()
}
// ConsumeClaim 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
select {
case message := <-claim.Messages():
var msg Message
if err := json.Unmarshal(message.Value, &msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
continue
}
if handler, exists := c.handlers[message.Topic]; exists {
if err := handler(&msg); err != nil {
log.Printf("failed to handle message: %v", err)
}
}
session.MarkMessage(message, "")
case <-session.Context().Done():
return nil
}
}
}
// Setup 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) Setup(sarama.ConsumerGroupSession) error {
return nil
}
// Cleanup 实现sarama.ConsumerGroupHandler接口
func (c *KafkaConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}

125
nsq.go Normal file
View File

@@ -0,0 +1,125 @@
package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/nsqio/go-nsq"
)
// NSQProducer NSQ生产者
type NSQProducer struct {
producer *nsq.Producer
config *NSQConfig
}
// NewNSQProducer 创建NSQ生产者
func NewNSQProducer(config *NSQConfig) (*NSQProducer, error) {
producer, err := nsq.NewProducer(config.NSQDAddr, nsq.NewConfig())
if err != nil {
return nil, fmt.Errorf("failed to create NSQ producer: %w", err)
}
return &NSQProducer{
producer: producer,
config: config,
}, nil
}
// Publish 发布消息
func (p *NSQProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.producer.Publish(topic, data)
}
// Close 关闭连接
func (p *NSQProducer) Close() error {
p.producer.Stop()
return nil
}
// NSQConsumer NSQ消费者
type NSQConsumer struct {
consumer *nsq.Consumer
config *NSQConfig
handlers map[string]MessageHandler
}
// NewNSQConsumer 创建NSQ消费者
func NewNSQConsumer(config *NSQConfig) (*NSQConsumer, error) {
nsqConfig := nsq.NewConfig()
nsqConfig.MaxInFlight = 10
consumer, err := nsq.NewConsumer("", "default", nsqConfig)
if err != nil {
return nil, fmt.Errorf("failed to create NSQ consumer: %w", err)
}
nsqConsumer := &NSQConsumer{
consumer: consumer,
config: config,
handlers: make(map[string]MessageHandler),
}
consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
var msg Message
if err := json.Unmarshal(message.Body, &msg); err != nil {
log.Printf("failed to unmarshal message: %v", err)
return err
}
if handler, exists := nsqConsumer.handlers[msg.Topic]; exists {
return handler(&msg)
}
return nil
}))
return nsqConsumer, nil
}
// Subscribe 订阅主题
func (c *NSQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
// 连接到NSQLookup或直接连接到NSQD
var err error
if len(c.config.NSQLookup) > 0 {
err = c.consumer.ConnectToNSQLookupd(c.config.NSQLookup[0])
} else {
err = c.consumer.ConnectToNSQD(c.config.NSQDAddr)
}
if err != nil {
return fmt.Errorf("failed to connect to NSQ: %w", err)
}
return nil
}
// Unsubscribe 取消订阅
func (c *NSQConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
return nil
}
// Close 关闭连接
func (c *NSQConsumer) Close() error {
c.consumer.Stop()
return nil
}

225
rabbitmq.go Normal file
View File

@@ -0,0 +1,225 @@
package mqsd
import (
"context"
"encoding/json"
"fmt"
"log"
"time"
"github.com/google/uuid"
"github.com/streadway/amqp"
)
// RabbitMQProducer RabbitMQ生产者
type RabbitMQProducer struct {
conn *amqp.Connection
channel *amqp.Channel
config *RabbitMQConfig
}
// NewRabbitMQProducer 创建RabbitMQ生产者
func NewRabbitMQProducer(config *RabbitMQConfig) (*RabbitMQProducer, error) {
conn, err := amqp.Dial(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
channel, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
// 声明交换机
err = channel.ExchangeDeclare(
config.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
channel.Close()
conn.Close()
return nil, fmt.Errorf("failed to declare exchange: %w", err)
}
return &RabbitMQProducer{
conn: conn,
channel: channel,
config: config,
}, nil
}
// Publish 发布消息
func (p *RabbitMQProducer) Publish(ctx context.Context, topic string, msg *Message) error {
if msg.ID == "" {
msg.ID = uuid.New().String()
}
if msg.Timestamp.IsZero() {
msg.Timestamp = time.Now()
}
data, err := json.Marshal(msg)
if err != nil {
return fmt.Errorf("failed to marshal message: %w", err)
}
return p.channel.Publish(
p.config.Exchange, // exchange
topic, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
Headers: amqp.Table{
"message_id": msg.ID,
},
},
)
}
// Close 关闭连接
func (p *RabbitMQProducer) Close() error {
if p.channel != nil {
p.channel.Close()
}
if p.conn != nil {
return p.conn.Close()
}
return nil
}
// RabbitMQConsumer RabbitMQ消费者
type RabbitMQConsumer struct {
conn *amqp.Connection
channel *amqp.Channel
config *RabbitMQConfig
handlers map[string]MessageHandler
queues map[string]string
}
// NewRabbitMQConsumer 创建RabbitMQ消费者
func NewRabbitMQConsumer(config *RabbitMQConfig) (*RabbitMQConsumer, error) {
conn, err := amqp.Dial(config.URL)
if err != nil {
return nil, fmt.Errorf("failed to connect to RabbitMQ: %w", err)
}
channel, err := conn.Channel()
if err != nil {
conn.Close()
return nil, fmt.Errorf("failed to open channel: %w", err)
}
// 声明交换机
err = channel.ExchangeDeclare(
config.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
channel.Close()
conn.Close()
return nil, fmt.Errorf("failed to declare exchange: %w", err)
}
return &RabbitMQConsumer{
conn: conn,
channel: channel,
config: config,
handlers: make(map[string]MessageHandler),
queues: make(map[string]string),
}, nil
}
// Subscribe 订阅主题
func (c *RabbitMQConsumer) Subscribe(ctx context.Context, topic string, handler MessageHandler) error {
c.handlers[topic] = handler
// 声明队列
queue, err := c.channel.QueueDeclare(
"", // name (空字符串表示随机队列名)
false, // durable
false, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
return fmt.Errorf("failed to declare queue: %w", err)
}
// 绑定队列到交换机
err = c.channel.QueueBind(
queue.Name, // queue name
topic, // routing key
c.config.Exchange, // exchange
false,
nil,
)
if err != nil {
return fmt.Errorf("failed to bind queue: %w", err)
}
c.queues[topic] = queue.Name
// 开始消费消息
msgs, err := c.channel.Consume(
queue.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
return fmt.Errorf("failed to start consuming: %w", err)
}
go func() {
for msg := range msgs {
var message Message
if err := json.Unmarshal(msg.Body, &message); err != nil {
log.Printf("failed to unmarshal message: %v", err)
continue
}
if err := handler(&message); err != nil {
log.Printf("failed to handle message: %v", err)
}
}
}()
return nil
}
// Unsubscribe 取消订阅
func (c *RabbitMQConsumer) Unsubscribe(topic string) error {
delete(c.handlers, topic)
if queueName, exists := c.queues[topic]; exists {
c.channel.QueueDelete(queueName, false, false, false)
delete(c.queues, topic)
}
return nil
}
// Close 关闭连接
func (c *RabbitMQConsumer) Close() error {
if c.channel != nil {
c.channel.Close()
}
if c.conn != nil {
return c.conn.Close()
}
return nil
}

53
test/main.go Normal file
View File

@@ -0,0 +1,53 @@
package main
import (
"fmt"
"git.sixqin.com/Tom23/MQSDK"
)
func main() {
// 测试NSQ配置
nsqConfig := &mqsd.NSQConfig{
Type: "nsq",
NSQDAddr: "localhost:4150",
NSQLookup: []string{"localhost:4161"},
}
// 测试Kafka配置
kafkaConfig := &mqsd.KafkaConfig{
Type: "kafka",
Brokers: []string{"localhost:9092"},
GroupID: "test-group",
Version: "2.8.0",
}
// 测试RabbitMQ配置
rabbitConfig := &mqsd.RabbitMQConfig{
Type: "rabbitmq",
URL: "amqp://guest:guest@localhost:5672/",
Exchange: "test-exchange",
}
// 创建工厂
_ = mqsd.NewFactory()
fmt.Printf("NSQ Config Type: %s\n", nsqConfig.GetType())
fmt.Printf("Kafka Config Type: %s\n", kafkaConfig.GetType())
fmt.Printf("RabbitMQ Config Type: %s\n", rabbitConfig.GetType())
// 测试消息创建
message := &mqsd.Message{
ID: "test-id",
Topic: "test-topic",
Body: []byte("Hello World"),
Headers: map[string]string{
"source": "test",
},
}
fmt.Printf("Message created: ID=%s, Topic=%s, Body=%s\n",
message.ID, message.Topic, string(message.Body))
fmt.Println("Build test completed successfully!")
}

75
types.go Normal file
View File

@@ -0,0 +1,75 @@
package mqsd
import (
"context"
"time"
)
// Message 消息结构体
type Message struct {
ID string `json:"id"`
Topic string `json:"topic"`
Body []byte `json:"body"`
Headers map[string]string `json:"headers,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// MessageHandler 消息处理函数类型
type MessageHandler func(msg *Message) error
// Producer 生产者接口
type Producer interface {
// Publish 发布消息
Publish(ctx context.Context, topic string, msg *Message) error
// Close 关闭连接
Close() error
}
// Consumer 消费者接口
type Consumer interface {
// Subscribe 订阅主题
Subscribe(ctx context.Context, topic string, handler MessageHandler) error
// Unsubscribe 取消订阅
Unsubscribe(topic string) error
// Close 关闭连接
Close() error
}
// Config 配置接口
type Config interface {
GetType() string
}
// NSQConfig NSQ配置
type NSQConfig struct {
Type string `json:"type"`
NSQDAddr string `json:"nsqd_addr"`
NSQLookup []string `json:"nsqlookup_addrs"`
}
func (c *NSQConfig) GetType() string {
return "nsq"
}
// KafkaConfig Kafka配置
type KafkaConfig struct {
Type string `json:"type"`
Brokers []string `json:"brokers"`
GroupID string `json:"group_id"`
Version string `json:"version"`
}
func (c *KafkaConfig) GetType() string {
return "kafka"
}
// RabbitMQConfig RabbitMQ配置
type RabbitMQConfig struct {
Type string `json:"type"`
URL string `json:"url"`
Exchange string `json:"exchange"`
}
func (c *RabbitMQConfig) GetType() string {
return "rabbitmq"
}