feature: gb28181 support playback speed

This commit is contained in:
pg
2025-04-14 22:59:26 +08:00
committed by pggiroro
parent 3d6d618a79
commit 6069ddf2c2
4 changed files with 114 additions and 37 deletions

View File

@@ -2019,7 +2019,7 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus
resp := &pb.BaseResponse{}
// 参数校验
if req.Streampath == "" {
if req.StreamPath == "" {
resp.Code = 400
resp.Message = "流路径不能为空"
return resp, nil
@@ -2027,7 +2027,7 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus
// 查找对应的dialog
dialog, ok := gb.dialogs.Find(func(d *Dialog) bool {
return d.pullCtx.StreamPath == req.Streampath
return d.pullCtx.StreamPath == req.StreamPath
})
if !ok {
resp.Code = 404
@@ -2054,9 +2054,14 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus
resp.Message = fmt.Sprintf("发送暂停请求失败: %v", err)
return resp, nil
}
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Pause()
}
return nil
})
gb.Info("暂停回放",
"streampath", req.Streampath)
"streampath", req.StreamPath)
resp.Code = 0
resp.Message = "success"
@@ -2068,7 +2073,7 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes
resp := &pb.BaseResponse{}
// 参数校验
if req.Streampath == "" {
if req.StreamPath == "" {
resp.Code = 400
resp.Message = "流路径不能为空"
return resp, nil
@@ -2076,7 +2081,7 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes
// 查找对应的dialog
dialog, ok := gb.dialogs.Find(func(d *Dialog) bool {
return d.pullCtx.StreamPath == req.Streampath
return d.pullCtx.StreamPath == req.StreamPath
})
if !ok {
resp.Code = 404
@@ -2103,9 +2108,14 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes
resp.Message = fmt.Sprintf("发送恢复请求失败: %v", err)
return resp, nil
}
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Resume()
}
return nil
})
gb.Info("恢复回放",
"streampath", req.Streampath)
"streampath", req.StreamPath)
resp.Code = 0
resp.Message = "success"
@@ -2117,7 +2127,7 @@ func (gb *GB28181Plugin) PlaybackSeek(ctx context.Context, req *pb.PlaybackSeekR
resp := &pb.BaseResponse{}
// 参数校验
if req.Streampath == "" {
if req.StreamPath == "" {
resp.Code = 400
resp.Message = "流路径不能为空"
return resp, nil
@@ -2126,7 +2136,7 @@ func (gb *GB28181Plugin) PlaybackSeek(ctx context.Context, req *pb.PlaybackSeekR
// TODO: 实现拖动播放逻辑
gb.Info("拖动回放",
"streampath", req.Streampath,
"streampath", req.StreamPath,
"seekTime", req.SeekTime)
resp.Code = 0
@@ -2139,16 +2149,54 @@ func (gb *GB28181Plugin) PlaybackSpeed(ctx context.Context, req *pb.PlaybackSpee
resp := &pb.BaseResponse{}
// 参数校验
if req.Streampath == "" {
if req.StreamPath == "" {
resp.Code = 400
resp.Message = "流路径不能为空"
return resp, nil
}
// TODO: 实现倍速播放逻辑
// 查找对应的dialog
dialog, ok := gb.dialogs.Find(func(d *Dialog) bool {
return d.pullCtx.StreamPath == req.StreamPath
})
if !ok {
resp.Code = 404
resp.Message = "未找到对应的回放会话"
return resp, nil
}
// 构建RTSP SCALE消息内容
content := strings.Builder{}
content.WriteString("PLAY RTSP/1.0\r\n")
content.WriteString(fmt.Sprintf("CSeq: %d\r\n", int(time.Now().UnixNano()/1e6%1000000)))
content.WriteString(fmt.Sprintf("Scale: %f\r\n", req.Speed))
content.WriteString("Range: npt=now-\r\n")
// 创建INFO请求
request := sip.NewRequest(sip.INFO, dialog.session.InviteRequest.Recipient)
request.SetBody([]byte(content.String()))
contentType := sip.ContentTypeHeader("Application/MANSRTSP")
request.AppendHeader(&contentType)
// 发送请求
_, err := dialog.session.TransactionRequest(ctx, request)
gb.Server.Streams.Call(func() error {
if s, ok := gb.Server.Streams.Get(req.StreamPath); ok {
s.Speed = float64(req.Speed)
s.Scale = float64(req.Speed)
s.Info("set stream speed", "speed", req.Speed)
}
return nil
})
if err != nil {
resp.Code = 500
resp.Message = fmt.Sprintf("发送倍速请求失败: %v", err)
return resp, nil
}
gb.Info("倍速回放",
"streampath", req.Streampath,
"streampath", req.StreamPath,
"speed", req.Speed)
resp.Code = 0

View File

@@ -88,8 +88,19 @@ func (d *Device) TableName() string {
func (d *Device) Dispose() {
if d.plugin.DB != nil {
d.Status = DeviceOfflineStatus
d.plugin.DB.Save(d)
if d.channels.Length > 0 {
d.channels.Range(func(channel *Channel) bool {
d.plugin.DB.Model(&gb28181.DeviceChannel{}).Where("device_id = ? AND device_db_id = ?", channel.DeviceID, d.ID).Updates(channel.DeviceChannel)
return true
})
} else {
// 如果没有通道,则直接更新通道状态为 OFF
d.plugin.DB.Model(&gb28181.DeviceChannel{}).Where("device_db_id = ?", d.ID).Update("status", "OFF")
}
}
d.plugin.devices.Remove(d)
}
func (d *Device) GetKey() string {
@@ -322,10 +333,28 @@ func (d *Device) Go() (err error) {
subTick := time.NewTicker(time.Second * 3600)
defer subTick.Stop()
catalogTick := time.NewTicker(time.Second * 60)
keepaliveSeconds := 60
if d.KeepaliveInterval >= 5 {
keepaliveSeconds = d.KeepaliveInterval
}
keepLiveTick := time.NewTicker(time.Second * 10)
defer keepLiveTick.Stop()
defer catalogTick.Stop()
for {
select {
case <-d.Done():
case <-keepLiveTick.C:
if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second {
d.Online = false
d.Status = DeviceOfflineStatus
// 设置所有通道状态为off
d.channels.Range(func(channel *Channel) bool {
channel.Status = "OFF"
return true
})
d.Stop(fmt.Errorf("device keepalive timeout after %v", timeDiff))
return
}
case <-subTick.C:
response, err = d.subscribeCatalog()
if err != nil {

View File

@@ -5682,7 +5682,7 @@ func (x *GroupChannelsResponse) GetList() []*GroupChannel {
// 回放暂停请求
type PlaybackPauseRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -5717,9 +5717,9 @@ func (*PlaybackPauseRequest) Descriptor() ([]byte, []int) {
return file_gb28181_proto_rawDescGZIP(), []int{80}
}
func (x *PlaybackPauseRequest) GetStreampath() string {
func (x *PlaybackPauseRequest) GetStreamPath() string {
if x != nil {
return x.Streampath
return x.StreamPath
}
return ""
}
@@ -5727,7 +5727,7 @@ func (x *PlaybackPauseRequest) GetStreampath() string {
// 回放恢复请求
type PlaybackResumeRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -5762,9 +5762,9 @@ func (*PlaybackResumeRequest) Descriptor() ([]byte, []int) {
return file_gb28181_proto_rawDescGZIP(), []int{81}
}
func (x *PlaybackResumeRequest) GetStreampath() string {
func (x *PlaybackResumeRequest) GetStreamPath() string {
if x != nil {
return x.Streampath
return x.StreamPath
}
return ""
}
@@ -5772,7 +5772,7 @@ func (x *PlaybackResumeRequest) GetStreampath() string {
// 回放拖动播放请求
type PlaybackSeekRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径
SeekTime int64 `protobuf:"varint,2,opt,name=seekTime,proto3" json:"seekTime,omitempty"` // 拖动偏移量单位s
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
@@ -5808,9 +5808,9 @@ func (*PlaybackSeekRequest) Descriptor() ([]byte, []int) {
return file_gb28181_proto_rawDescGZIP(), []int{82}
}
func (x *PlaybackSeekRequest) GetStreampath() string {
func (x *PlaybackSeekRequest) GetStreamPath() string {
if x != nil {
return x.Streampath
return x.StreamPath
}
return ""
}
@@ -5825,7 +5825,7 @@ func (x *PlaybackSeekRequest) GetSeekTime() int64 {
// 回放倍速播放请求
type PlaybackSpeedRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径
StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径
Speed float64 `protobuf:"fixed64,2,opt,name=speed,proto3" json:"speed,omitempty"` // 倍速0.25 0.5 1、2、4
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
@@ -5861,9 +5861,9 @@ func (*PlaybackSpeedRequest) Descriptor() ([]byte, []int) {
return file_gb28181_proto_rawDescGZIP(), []int{83}
}
func (x *PlaybackSpeedRequest) GetStreampath() string {
func (x *PlaybackSpeedRequest) GetStreamPath() string {
if x != nil {
return x.Streampath
return x.StreamPath
}
return ""
}
@@ -6662,20 +6662,20 @@ var file_gb28181_proto_rawDesc = string([]byte{
0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74,
0x22, 0x36, 0x0a, 0x14, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x75, 0x73,
0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x22, 0x37, 0x0a, 0x15, 0x50, 0x6c, 0x61, 0x79,
0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x37, 0x0a, 0x15, 0x50, 0x6c, 0x61, 0x79,
0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74,
0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18,
0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74,
0x68, 0x22, 0x51, 0x0a, 0x13, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x65,
0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65,
0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x65, 0x6b,
0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74,
0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x65, 0x6b,
0x54, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x65, 0x6b,
0x54, 0x69, 0x6d, 0x65, 0x22, 0x4c, 0x0a, 0x14, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b,
0x53, 0x70, 0x65, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05,
0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09,
0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05,
0x73, 0x70, 0x65, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x73, 0x70, 0x65,
0x65, 0x64, 0x32, 0x8b, 0x3d, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x5d, 0x0a, 0x04, 0x4c, 0x69,
0x73, 0x74, 0x12, 0x1d, 0x2e, 0x67, 0x62, 0x32, 0x38, 0x31, 0x38, 0x31, 0x70, 0x72, 0x6f, 0x2e,

View File

@@ -1088,22 +1088,22 @@ message GroupChannelsResponse {
// 回放暂停请求
message PlaybackPauseRequest {
string streampath = 1; // 回放流路径
string streamPath = 1; // 回放流路径
}
// 回放恢复请求
message PlaybackResumeRequest {
string streampath = 1; // 回放流路径
string streamPath = 1; // 回放流路径
}
// 回放拖动播放请求
message PlaybackSeekRequest {
string streampath = 1; // 回放流路径
string streamPath = 1; // 回放流路径
int64 seekTime = 2; // 拖动偏移量单位s
}
// 回放倍速播放请求
message PlaybackSpeedRequest {
string streampath = 1; // 回放流路径
string streamPath = 1; // 回放流路径
double speed = 2; // 倍速0.25 0.5 1、2、4
}