视频通话功能实现

This commit is contained in:
2025-07-30 18:54:00 +08:00
parent cc831e127b
commit 5838515b9a
15 changed files with 2493 additions and 64 deletions

View File

@@ -8,5 +8,6 @@ import (
func Init() {
redis.InitRedis()
go redis.AudioCallProcessorInit()
go redis.VideoCallProcessorInit()
go mysql.Init()
}

View File

@@ -109,14 +109,36 @@ func (c *RoomClient) CreateAudioRoom() (*livekit.Room, error) {
return room, nil
}
// CreateVideoRoom 创建视频房间
func (c *RoomClient) CreateVideoRoom() (*livekit.Room, error) {
hlog.Infof("start CreateVideoRoom")
creatReq := &livekit.CreateRoomRequest{
Name: utils.GenerateUniqueVideoRoomName(),
EmptyTimeout: 60,
MaxParticipants: 2,
MinPlayoutDelay: 0,
MaxPlayoutDelay: 0,
SyncStreams: false,
}
room, err := c.client.CreateRoom(context.Background(), creatReq)
if err != nil {
return nil, fmt.Errorf("创建房间失败: %w", err)
}
hlog.Info("create room:", room)
return room, nil
}
// ListRooms 列出房间
func (c *RoomClient) ListRooms(req *livekit.ListRoomsRequest) (*livekit.ListRoomsResponse, error) {
func (c *RoomClient) ListRooms() (*livekit.ListRoomsResponse, error) {
if !c.isInit {
if err := c.Init(); err != nil {
return nil, err
}
}
req := &livekit.ListRoomsRequest{
Names: nil,
}
resp, err := c.client.ListRooms(context.Background(), req)
if err != nil {
return nil, fmt.Errorf("列出房间失败: %w", err)
@@ -158,7 +180,7 @@ func (c *RoomClient) GetJoinToken(roomName, userId, userName string) string {
}
// 用户名可为空,使用默认值
if userName == "" {
userName = "user_" + userId[:6] // 取用户ID前6位作为默认名
userName = userId // 取用户ID
}
// 2. 初始化访问令牌
@@ -175,8 +197,8 @@ func (c *RoomClient) GetJoinToken(roomName, userId, userName string) string {
Room: roomName, // 绑定具体房间(限制只能加入该房间)
CanUpdateOwnMetadata: &metaOpen, // 允许更新自己的元数据(如昵称、头像)
// 可根据业务需求添加更多权限,如:
// CanPublish: true, // 允许发布媒体流
// CanSubscribe: true, // 允许订阅媒体流
CanPublish: &metaOpen, // 允许发布媒体流
CanSubscribe: &metaOpen, // 允许订阅媒体流
}
// 4. 设置令牌属性

View File

@@ -15,6 +15,7 @@ import (
"time"
)
// AudioCallProcessorInit 音频通话消息监听器
func AudioCallProcessorInit() {
// 订阅所有音频通话消息通道
pubsub := rdb.PSubscribe(ctx, "audioCall*")
@@ -22,11 +23,12 @@ func AudioCallProcessorInit() {
ch := pubsub.Channel()
for msg := range ch {
go handleRedisMessage(msg.Channel, []byte(msg.Payload))
go audioHandleRedisMessage(msg.Channel, []byte(msg.Payload))
}
}
func handleRedisMessage(channel string, payload []byte) {
// 音频消息解析器
func audioHandleRedisMessage(channel string, payload []byte) {
// 解析消息类型和目标
parts := strings.Split(channel, ":")
@@ -44,10 +46,10 @@ func handleRedisMessage(channel string, payload []byte) {
return
}
//handleGroupMessage(target, msg)
go HandleUserMessage(target, msg)
go audioHandleUserMessage(target, msg)
}
func handleGroupMessage(groupID string, msg protocol.Message) {
func audioHandleGroupMessage(groupID string, msg protocol.Message) {
hlog.Infof("处理音频通话消息: %s -> 群组 %s", msg.From, groupID)
// 查询群组成员
@@ -64,7 +66,7 @@ func handleGroupMessage(groupID string, msg protocol.Message) {
}
}
func HandleUserMessage(groupID string, msg protocol.Message) {
func audioHandleUserMessage(groupID string, msg protocol.Message) {
hlog.Infof("处理音频通话消息: %s -> user: %s", msg.From, groupID)
if msg.Content != protocol.AudioCallAgree {
@@ -80,7 +82,7 @@ func HandleUserMessage(groupID string, msg protocol.Message) {
}
fromUserToken := client.GetJoinToken(room.Name, msg.From, "")
toUserTokem := client.GetJoinToken(room.Name, msg.To, "")
hlog.Infof("获取访问密钥成功 -> fromUserToken:%s toUserTokem:%s", fromUserToken, toUserTokem)
hlog.Infof("获取访问密钥成功 -> fromUserToken:%s \n toUserTokem:%s", fromUserToken, toUserTokem)
server := fmt.Sprintf("%s:%s", config.LivekitHost, config.LivekitPort)
expand_from := protocol.AudioCallMessage{
Room: room.Name,
@@ -178,3 +180,32 @@ func AudioCall(req audioAndVideoCalls.AudioCallReq) error {
PublishToRedis(protocol.TypePrivate+":"+req.TargetUserID, msgBytes)
return nil
}
// VideoCall 发起通话申请
func VideoCall(req audioAndVideoCalls.VideoCallReq) error {
if req.UserID == "" {
return errors.New("invalid user id")
}
if req.TargetUserID == "" {
return errors.New("invalid target user id")
}
msg := protocol.Message{
Type: protocol.TypeVideoCall,
FileType: "",
From: req.UserID,
To: req.TargetUserID,
Content: "Video Call",
MsgID: "",
SendTime: time.Now().Format("2006-01-02 15:04:05"),
Expand: nil,
}
// 序列化消息
msgBytes, err := json.Marshal(msg)
if err != nil {
hlog.Errorf("消息序列化错误: %v", err)
return err
}
PublishToRedis(protocol.TypePrivate+":"+req.TargetUserID, msgBytes)
return nil
}

View File

@@ -0,0 +1,128 @@
package redis
import (
"audioAndVideoCalls/biz/dal/livekit"
audioAndVideoCalls "audioAndVideoCalls/biz/model/audioAndVideoCalls"
"audioAndVideoCalls/config"
"audioAndVideoCalls/protocol"
"encoding/json"
"fmt"
"github.com/cloudwego/hertz/pkg/common/hlog"
lsdk "github.com/livekit/protocol/livekit"
"log"
"strings"
"time"
)
// VideoCallProcessorInit 视频通话消息监听器
func VideoCallProcessorInit() {
// 订阅所有音频通话消息通道
pubsub := rdb.PSubscribe(ctx, "videoCall*")
defer pubsub.Close()
ch := pubsub.Channel()
for msg := range ch {
go videoHandleRedisMessage(msg.Channel, []byte(msg.Payload))
}
}
// 音频消息解析器
func videoHandleRedisMessage(channel string, payload []byte) {
// 解析消息类型和目标
parts := strings.Split(channel, ":")
if len(parts) < 2 {
log.Printf("无效的通道格式: %s", channel)
return
}
target := parts[1]
// 解析消息内容
var msg protocol.Message
if err := json.Unmarshal(payload, &msg); err != nil {
hlog.Errorf("消息解析错误: %v", err)
return
}
go videoHandleUserMessage(target, msg)
}
func videoHandleUserMessage(groupID string, msg protocol.Message) {
hlog.Infof("处理视频通话消息: %s -> user: %s", msg.From, groupID)
if msg.Content != protocol.VideoCallAgree {
return
}
// 向每个参与者员发送房间和权限消息
client := livekit.GetGlobalRoomClient()
room, err := client.CreateVideoRoom()
if err != nil {
hlog.Errorf("创建视频房间失败: %v", err)
return
}
fromUserToken := client.GetJoinToken(room.Name, msg.From, "")
toUserTokem := client.GetJoinToken(room.Name, msg.To, "")
hlog.Infof("获取访问密钥成功 -> fromUserToken:%s \n toUserTokem:%s", fromUserToken, toUserTokem)
server := fmt.Sprintf("%s:%s", config.LivekitHost, config.LivekitPort)
expand_from := protocol.AudioCallMessage{
Room: room.Name,
Token: fromUserToken,
Server: server,
}
expand_from_marshal, err := json.Marshal(expand_from)
if err != nil {
hlog.Errorf("序列化 expand_from 失败: %v", err)
return
}
expand_to := protocol.AudioCallMessage{
Room: room.Name,
Token: toUserTokem,
Server: server,
}
expand_to_marshal, err := json.Marshal(expand_to)
if err != nil {
hlog.Errorf("序列化 expand_from 失败: %v", err)
return
}
msg_from := protocol.Message{
Type: protocol.TypeVideoCall,
FileType: "",
From: "sys",
To: msg.From,
Content: "",
MsgID: "",
SendTime: time.Now().Format("2006-01-02 15:04:05"),
Expand: expand_from_marshal,
}
msg_to := protocol.Message{
Type: protocol.TypeVideoCall,
FileType: "",
From: "sys",
To: msg.To,
Content: "",
MsgID: "",
SendTime: time.Now().Format("2006-01-02 15:04:05"),
Expand: expand_to_marshal,
}
go forwardToUser(msg.From, msg_from)
go forwardToUser(msg.To, msg_to)
}
func RoomList() (*lsdk.ListRoomsResponse, error) {
client := livekit.GetGlobalRoomClient()
data, err := client.ListRooms()
if err != nil {
return nil, err
}
return data, nil
}
func RoomRemove(req audioAndVideoCalls.RemoveRoomReq) error {
client := livekit.GetGlobalRoomClient()
err := client.DeleteRoom(req.RoomID)
if err != nil {
return err
}
return nil
}

View File

@@ -10,9 +10,9 @@ import (
"github.com/cloudwego/hertz/pkg/protocol/consts"
)
// Call .
// AudioCall .
// @router /audioCall [POST]
func Call(ctx context.Context, c *app.RequestContext) {
func AudioCall(ctx context.Context, c *app.RequestContext) {
var err error
var req audioAndVideoCalls.AudioCallReq
err = c.BindAndValidate(&req)

View File

@@ -0,0 +1,51 @@
// Code generated by hertz generator.
package audioAndVideoCalls
import (
"audioAndVideoCalls/biz/dal/redis"
audioAndVideoCalls "audioAndVideoCalls/biz/model/audioAndVideoCalls"
"context"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/protocol/consts"
)
// RoomList .
// @router /roomlist [GET]
func RoomList(ctx context.Context, c *app.RequestContext) {
var err error
var req audioAndVideoCalls.RoomListReq
err = c.BindAndValidate(&req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
data, err := redis.RoomList()
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
c.JSON(consts.StatusOK, data)
}
// RoomRemove .
// @router /room [DELETE]
func RoomRemove(ctx context.Context, c *app.RequestContext) {
var err error
var req audioAndVideoCalls.RemoveRoomReq
err = c.BindAndValidate(&req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
err = redis.RoomRemove(req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
resp := new(audioAndVideoCalls.RemoveRoomResp)
c.JSON(consts.StatusOK, resp)
}

View File

@@ -0,0 +1,32 @@
// Code generated by hertz generator.
package audioAndVideoCalls
import (
"audioAndVideoCalls/biz/dal/redis"
"context"
audioAndVideoCalls "audioAndVideoCalls/biz/model/audioAndVideoCalls"
"github.com/cloudwego/hertz/pkg/app"
"github.com/cloudwego/hertz/pkg/protocol/consts"
)
// VideoCall .
// @router /videoCall [POST]
func VideoCall(ctx context.Context, c *app.RequestContext) {
var err error
var req audioAndVideoCalls.VideoCallReq
err = c.BindAndValidate(&req)
if err != nil {
c.String(consts.StatusBadRequest, err.Error())
return
}
err = redis.VideoCall(req)
if err != nil {
c.JSON(consts.StatusBadRequest, err.Error())
return
}
resp := new(audioAndVideoCalls.VideoCallResp)
c.JSON(consts.StatusOK, resp)
}

View File

@@ -17,5 +17,8 @@ import (
func Register(r *server.Hertz) {
root := r.Group("/", rootMw()...)
root.POST("/audioCall", append(_callMw(), audioAndVideoCalls.Call)...)
root.POST("/audioCall", append(_audiocallMw(), audioAndVideoCalls.AudioCall)...)
root.DELETE("/room", append(_roomremoveMw(), audioAndVideoCalls.RoomRemove)...)
root.GET("/roomlist", append(_roomlistMw(), audioAndVideoCalls.RoomList)...)
root.POST("/videoCall", append(_videocallMw(), audioAndVideoCalls.VideoCall)...)
}

View File

@@ -20,3 +20,28 @@ func _callMw() []app.HandlerFunc {
// your code...
return nil
}
func _call0Mw() []app.HandlerFunc {
// your code...
return nil
}
func _audiocallMw() []app.HandlerFunc {
// your code...
return nil
}
func _videocallMw() []app.HandlerFunc {
// your code...
return nil
}
func _roomlistMw() []app.HandlerFunc {
// your code...
return nil
}
func _roomremoveMw() []app.HandlerFunc {
// your code...
return nil
}

View File

@@ -28,3 +28,25 @@ func GenerateUniqueAudioRoomName() string {
// 3. 拼接结果
return fmt.Sprintf("audio%s-%s", hexTimestamp, randomStr)
}
// GenerateUniqueVideoRoomName 生成唯一视频房间名
// 格式:[时间戳(16进制)]-[随机字符(4位)]
func GenerateUniqueVideoRoomName() string {
// 1. 获取毫秒级时间戳并转为16进制缩短长度
timestamp := time.Now().UnixMilli() // 毫秒级时间戳13位数字
hexTimestamp := fmt.Sprintf("%x", timestamp) // 转为16进制约10-11位
// 2. 生成4位随机字符字母+数字)
randomBytes := make([]byte, 3) // 3字节经base64编码后约4字符
_, err := rand.Read(randomBytes)
if err != nil {
// 极端情况下随机数生成失败,用当前纳秒补充(降低重复风险)
ns := time.Now().UnixNano() % 10000
return fmt.Sprintf("%s-%04d", hexTimestamp, ns)
}
// 取base64的前4位过滤特殊字符
randomStr := base64.URLEncoding.EncodeToString(randomBytes)[:4]
// 3. 拼接结果
return fmt.Sprintf("video%s-%s", hexTimestamp, randomStr)
}

View File

@@ -20,5 +20,42 @@ struct audioCallResp {
service AudioCallService {
audioCallResp Call(1: audioCallReq request) (api.post="/audioCall");
audioCallResp audioCall(1: audioCallReq request) (api.post="/audioCall");
}
struct videoCallReq {
1: string user_id (api.body="user_id");
2: string target_user_id (api.body="target_user_id");
}
struct videoCallResp {
1: Code code
2: string msg
}
service VideoCallService {
videoCallResp videoCall(1: videoCallReq request) (api.post="/videoCall");
}
struct roomListReq {
}
struct roomListResp {
}
struct RemoveRoomReq {
1: string room_id (api.body="room_id");
}
struct RemoveRoomResp {
1: Code code
2: string msg
}
service RoomService {
roomListResp RoomList(1: roomListReq request) (api.get="/roomlist");
RemoveRoomResp RoomRemove(1: RemoveRoomReq request) (api.delete="/room");
}

View File

@@ -8,12 +8,18 @@ const (
TypeGroup = "group"
TypeSys = "sys"
TypeAudioCall = "audioCall"
TypeVideoCall = "videoCall"
)
// 消息固定值
const (
//音频通话
AudioCallAgree = "AudioCallAgree"
AudioCallRefuse = "AudioCallRefuse"
//视频通话
VideoCallAgree = "VideoCallAgree"
VideoCallRefuse = "videoCallRefuse"
)
// 消息结构
@@ -40,3 +46,10 @@ type AudioCallMessage struct {
Token string `json:"token"`
Server string `json:"server"`
}
// 视频通话消息
type VideoCallMessage struct {
Room string `json:"room"`
Token string `json:"token"`
Server string `json:"server"`
}

View File

@@ -123,6 +123,8 @@ func handleIncomingMessage(senderID string, msg []byte) {
redis.PublishToRedis(protocol.TypeSys, msg)
case protocol.TypeAudioCall:
redis.PublishToRedis(protocol.TypeAudioCall+":"+message.To, msg)
case protocol.TypeVideoCall:
redis.PublishToRedis(protocol.TypeVideoCall+":"+message.To, msg)
default:
hlog.Errorf("未知消息类型: %s", message.Type)

View File

@@ -8,12 +8,18 @@ const (
TypeGroup = "group"
TypeSys = "sys"
TypeAudioCall = "audioCall"
TypeVideoCall = "videoCall"
)
// 消息固定值
const (
//音频通话
AudioCallAgree = "AudioCallAgree"
AudioCallRefuse = "AudioCallRefuse"
//视频通话
VideoCallAgree = "VideoCallAgree"
VideoCallRefuse = "videoCallRefuse"
)
// 消息结构
@@ -40,3 +46,10 @@ type AudioCallMessage struct {
Token string `json:"token"`
Server string `json:"server"`
}
// 视频通话消息
type VideoCallMessage struct {
Room string `json:"room"`
Token string `json:"token"`
Server string `json:"server"`
}