音视频模块初始化
This commit is contained in:
84
livekitManage/biz/dal/redis/chat_group_processor.go
Normal file
84
livekitManage/biz/dal/redis/chat_group_processor.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"audioAndVideoCalls/biz/dal/mysql"
|
||||
"audioAndVideoCalls/protocol"
|
||||
"encoding/json"
|
||||
"github.com/cloudwego/hertz/pkg/common/hlog"
|
||||
"log"
|
||||
"strings"
|
||||
)
|
||||
|
||||
func ChatGroupProcessorInit() {
|
||||
// 订阅所有群聊消息通道
|
||||
pubsub := rdb.PSubscribe(ctx, "group*")
|
||||
defer pubsub.Close()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
go handleRedisMessage(msg.Channel, []byte(msg.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
func handleRedisMessage(channel string, payload []byte) {
|
||||
|
||||
// 解析消息类型和目标
|
||||
parts := strings.Split(channel, ":")
|
||||
if len(parts) < 2 {
|
||||
log.Printf("无效的通道格式: %s", channel)
|
||||
return
|
||||
}
|
||||
|
||||
target := parts[1]
|
||||
|
||||
// 解析消息内容
|
||||
var msg protocol.Message
|
||||
if err := json.Unmarshal(payload, &msg); err != nil {
|
||||
hlog.Errorf("消息解析错误: %v", err)
|
||||
return
|
||||
}
|
||||
handleGroupMessage(target, msg)
|
||||
|
||||
}
|
||||
|
||||
func handleGroupMessage(groupID string, msg protocol.Message) {
|
||||
hlog.Infof("处理群聊消息: %s -> 群组 %s", msg.From, groupID)
|
||||
|
||||
// 实际应用中这里需要查询群组成员
|
||||
// 这里简化处理,假设群组有固定成员
|
||||
members, err := getGroupMembers(groupID)
|
||||
if err != nil {
|
||||
hlog.Errorf("群成员获取失败: %v", err)
|
||||
}
|
||||
|
||||
// 向每个群成员发送消息
|
||||
for _, member := range members {
|
||||
if member != msg.From { // 不发送给自己
|
||||
go forwardToUser(member, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 转发消息给群用户
|
||||
func forwardToUser(userID string, msg protocol.Message) {
|
||||
// 序列化消息
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
hlog.Errorf("消息序列化错误: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 发布到用户专属通道
|
||||
PublishToRedis(protocol.TypePrivate+":"+userID, msgBytes)
|
||||
}
|
||||
|
||||
// 获取群组成员 (简化版)
|
||||
func getGroupMembers(groupID string) ([]string, error) {
|
||||
// 实际应用中应从数据库或缓存获取
|
||||
// 这里返回固定成员
|
||||
users, err := mysql.GetGroupMemberUserIDs(groupID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return users, nil
|
||||
}
|
26
livekitManage/biz/dal/redis/init.go
Normal file
26
livekitManage/biz/dal/redis/init.go
Normal file
@@ -0,0 +1,26 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"audioAndVideoCalls/config"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/cloudwego/hertz/pkg/common/hlog"
|
||||
"github.com/go-redis/redis/v8"
|
||||
)
|
||||
|
||||
var rdb *redis.Client
|
||||
var ctx = context.Background()
|
||||
|
||||
// 初始化Redis连接
|
||||
func InitRedis() {
|
||||
rdb = redis.NewClient(&redis.Options{
|
||||
Addr: fmt.Sprintf("%s:%s", config.DbHost, config.DbPort),
|
||||
Password: config.DbPassWord,
|
||||
DB: 0,
|
||||
})
|
||||
|
||||
if _, err := rdb.Ping(ctx).Result(); err != nil {
|
||||
hlog.Errorf("无法连接Redis: %v", err)
|
||||
}
|
||||
hlog.Info("已连接Redis")
|
||||
}
|
46
livekitManage/biz/dal/redis/wsCenter.go
Normal file
46
livekitManage/biz/dal/redis/wsCenter.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"audioAndVideoCalls/protocol"
|
||||
"github.com/cloudwego/hertz/pkg/common/hlog"
|
||||
)
|
||||
|
||||
// 发布消息到Redis通道
|
||||
func PublishToRedis(channel string, message []byte) {
|
||||
if err := rdb.Publish(ctx, channel, message).Err(); err != nil {
|
||||
hlog.Errorf("发布到Redis失败: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅redis私聊通道消息
|
||||
func SubscribeToUserChannel(sendChan chan []byte, userid string) {
|
||||
pubsub := rdb.Subscribe(ctx, protocol.TypePrivate+":"+userid)
|
||||
defer pubsub.Close()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
sendChan <- []byte(msg.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅redis群聊通道消息
|
||||
func SubscribeToUserGroupChannel(sendChan chan []byte) {
|
||||
pubsub := rdb.Subscribe(ctx, protocol.TypeGroup)
|
||||
defer pubsub.Close()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
sendChan <- []byte(msg.Payload)
|
||||
}
|
||||
}
|
||||
|
||||
// 订阅redis系统通道消息
|
||||
func SubscribeToSystemChannel(sendChan chan []byte) {
|
||||
pubsub := rdb.Subscribe(ctx, protocol.TypeSys)
|
||||
defer pubsub.Close()
|
||||
|
||||
ch := pubsub.Channel()
|
||||
for msg := range ch {
|
||||
sendChan <- []byte(msg.Payload)
|
||||
}
|
||||
}
|
Reference in New Issue
Block a user