diff --git a/plugin/gb28181/api.go b/plugin/gb28181/api.go index 46e7f94..010f78a 100644 --- a/plugin/gb28181/api.go +++ b/plugin/gb28181/api.go @@ -2019,7 +2019,7 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus resp := &pb.BaseResponse{} // 参数校验 - if req.Streampath == "" { + if req.StreamPath == "" { resp.Code = 400 resp.Message = "流路径不能为空" return resp, nil @@ -2027,7 +2027,7 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus // 查找对应的dialog dialog, ok := gb.dialogs.Find(func(d *Dialog) bool { - return d.pullCtx.StreamPath == req.Streampath + return d.pullCtx.StreamPath == req.StreamPath }) if !ok { resp.Code = 404 @@ -2054,9 +2054,14 @@ func (gb *GB28181Plugin) PlaybackPause(ctx context.Context, req *pb.PlaybackPaus resp.Message = fmt.Sprintf("发送暂停请求失败: %v", err) return resp, nil } - + gb.Server.Streams.Call(func() error { + if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { + s.Pause() + } + return nil + }) gb.Info("暂停回放", - "streampath", req.Streampath) + "streampath", req.StreamPath) resp.Code = 0 resp.Message = "success" @@ -2068,7 +2073,7 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes resp := &pb.BaseResponse{} // 参数校验 - if req.Streampath == "" { + if req.StreamPath == "" { resp.Code = 400 resp.Message = "流路径不能为空" return resp, nil @@ -2076,7 +2081,7 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes // 查找对应的dialog dialog, ok := gb.dialogs.Find(func(d *Dialog) bool { - return d.pullCtx.StreamPath == req.Streampath + return d.pullCtx.StreamPath == req.StreamPath }) if !ok { resp.Code = 404 @@ -2103,9 +2108,14 @@ func (gb *GB28181Plugin) PlaybackResume(ctx context.Context, req *pb.PlaybackRes resp.Message = fmt.Sprintf("发送恢复请求失败: %v", err) return resp, nil } - + gb.Server.Streams.Call(func() error { + if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { + s.Resume() + } + return nil + }) gb.Info("恢复回放", - "streampath", req.Streampath) + "streampath", req.StreamPath) resp.Code = 0 resp.Message = "success" @@ -2117,7 +2127,7 @@ func (gb *GB28181Plugin) PlaybackSeek(ctx context.Context, req *pb.PlaybackSeekR resp := &pb.BaseResponse{} // 参数校验 - if req.Streampath == "" { + if req.StreamPath == "" { resp.Code = 400 resp.Message = "流路径不能为空" return resp, nil @@ -2126,7 +2136,7 @@ func (gb *GB28181Plugin) PlaybackSeek(ctx context.Context, req *pb.PlaybackSeekR // TODO: 实现拖动播放逻辑 gb.Info("拖动回放", - "streampath", req.Streampath, + "streampath", req.StreamPath, "seekTime", req.SeekTime) resp.Code = 0 @@ -2139,16 +2149,54 @@ func (gb *GB28181Plugin) PlaybackSpeed(ctx context.Context, req *pb.PlaybackSpee resp := &pb.BaseResponse{} // 参数校验 - if req.Streampath == "" { + if req.StreamPath == "" { resp.Code = 400 resp.Message = "流路径不能为空" return resp, nil } - // TODO: 实现倍速播放逻辑 + // 查找对应的dialog + dialog, ok := gb.dialogs.Find(func(d *Dialog) bool { + return d.pullCtx.StreamPath == req.StreamPath + }) + if !ok { + resp.Code = 404 + resp.Message = "未找到对应的回放会话" + return resp, nil + } + + // 构建RTSP SCALE消息内容 + content := strings.Builder{} + content.WriteString("PLAY RTSP/1.0\r\n") + content.WriteString(fmt.Sprintf("CSeq: %d\r\n", int(time.Now().UnixNano()/1e6%1000000))) + content.WriteString(fmt.Sprintf("Scale: %f\r\n", req.Speed)) + content.WriteString("Range: npt=now-\r\n") + + // 创建INFO请求 + request := sip.NewRequest(sip.INFO, dialog.session.InviteRequest.Recipient) + request.SetBody([]byte(content.String())) + contentType := sip.ContentTypeHeader("Application/MANSRTSP") + request.AppendHeader(&contentType) + + // 发送请求 + _, err := dialog.session.TransactionRequest(ctx, request) + + gb.Server.Streams.Call(func() error { + if s, ok := gb.Server.Streams.Get(req.StreamPath); ok { + s.Speed = float64(req.Speed) + s.Scale = float64(req.Speed) + s.Info("set stream speed", "speed", req.Speed) + } + return nil + }) + if err != nil { + resp.Code = 500 + resp.Message = fmt.Sprintf("发送倍速请求失败: %v", err) + return resp, nil + } gb.Info("倍速回放", - "streampath", req.Streampath, + "streampath", req.StreamPath, "speed", req.Speed) resp.Code = 0 diff --git a/plugin/gb28181/device.go b/plugin/gb28181/device.go index 76fa030..58e8921 100644 --- a/plugin/gb28181/device.go +++ b/plugin/gb28181/device.go @@ -88,8 +88,19 @@ func (d *Device) TableName() string { func (d *Device) Dispose() { if d.plugin.DB != nil { + d.Status = DeviceOfflineStatus d.plugin.DB.Save(d) + if d.channels.Length > 0 { + d.channels.Range(func(channel *Channel) bool { + d.plugin.DB.Model(&gb28181.DeviceChannel{}).Where("device_id = ? AND device_db_id = ?", channel.DeviceID, d.ID).Updates(channel.DeviceChannel) + return true + }) + } else { + // 如果没有通道,则直接更新通道状态为 OFF + d.plugin.DB.Model(&gb28181.DeviceChannel{}).Where("device_db_id = ?", d.ID).Update("status", "OFF") + } } + d.plugin.devices.Remove(d) } func (d *Device) GetKey() string { @@ -322,10 +333,28 @@ func (d *Device) Go() (err error) { subTick := time.NewTicker(time.Second * 3600) defer subTick.Stop() catalogTick := time.NewTicker(time.Second * 60) + keepaliveSeconds := 60 + if d.KeepaliveInterval >= 5 { + keepaliveSeconds = d.KeepaliveInterval + } + keepLiveTick := time.NewTicker(time.Second * 10) + defer keepLiveTick.Stop() defer catalogTick.Stop() for { select { case <-d.Done(): + case <-keepLiveTick.C: + if timeDiff := time.Since(d.KeepaliveTime); timeDiff > time.Duration(3*keepaliveSeconds)*time.Second { + d.Online = false + d.Status = DeviceOfflineStatus + // 设置所有通道状态为off + d.channels.Range(func(channel *Channel) bool { + channel.Status = "OFF" + return true + }) + d.Stop(fmt.Errorf("device keepalive timeout after %v", timeDiff)) + return + } case <-subTick.C: response, err = d.subscribeCatalog() if err != nil { diff --git a/plugin/gb28181/pb/gb28181.pb.go b/plugin/gb28181/pb/gb28181.pb.go index 34b1f9e..070c8b4 100644 --- a/plugin/gb28181/pb/gb28181.pb.go +++ b/plugin/gb28181/pb/gb28181.pb.go @@ -5682,7 +5682,7 @@ func (x *GroupChannelsResponse) GetList() []*GroupChannel { // 回放暂停请求 type PlaybackPauseRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径 + StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -5717,9 +5717,9 @@ func (*PlaybackPauseRequest) Descriptor() ([]byte, []int) { return file_gb28181_proto_rawDescGZIP(), []int{80} } -func (x *PlaybackPauseRequest) GetStreampath() string { +func (x *PlaybackPauseRequest) GetStreamPath() string { if x != nil { - return x.Streampath + return x.StreamPath } return "" } @@ -5727,7 +5727,7 @@ func (x *PlaybackPauseRequest) GetStreampath() string { // 回放恢复请求 type PlaybackResumeRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径 + StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -5762,9 +5762,9 @@ func (*PlaybackResumeRequest) Descriptor() ([]byte, []int) { return file_gb28181_proto_rawDescGZIP(), []int{81} } -func (x *PlaybackResumeRequest) GetStreampath() string { +func (x *PlaybackResumeRequest) GetStreamPath() string { if x != nil { - return x.Streampath + return x.StreamPath } return "" } @@ -5772,7 +5772,7 @@ func (x *PlaybackResumeRequest) GetStreampath() string { // 回放拖动播放请求 type PlaybackSeekRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径 + StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径 SeekTime int64 `protobuf:"varint,2,opt,name=seekTime,proto3" json:"seekTime,omitempty"` // 拖动偏移量,单位s unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -5808,9 +5808,9 @@ func (*PlaybackSeekRequest) Descriptor() ([]byte, []int) { return file_gb28181_proto_rawDescGZIP(), []int{82} } -func (x *PlaybackSeekRequest) GetStreampath() string { +func (x *PlaybackSeekRequest) GetStreamPath() string { if x != nil { - return x.Streampath + return x.StreamPath } return "" } @@ -5825,7 +5825,7 @@ func (x *PlaybackSeekRequest) GetSeekTime() int64 { // 回放倍速播放请求 type PlaybackSpeedRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - Streampath string `protobuf:"bytes,1,opt,name=streampath,proto3" json:"streampath,omitempty"` // 回放流路径 + StreamPath string `protobuf:"bytes,1,opt,name=streamPath,proto3" json:"streamPath,omitempty"` // 回放流路径 Speed float64 `protobuf:"fixed64,2,opt,name=speed,proto3" json:"speed,omitempty"` // 倍速0.25 0.5 1、2、4 unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache @@ -5861,9 +5861,9 @@ func (*PlaybackSpeedRequest) Descriptor() ([]byte, []int) { return file_gb28181_proto_rawDescGZIP(), []int{83} } -func (x *PlaybackSpeedRequest) GetStreampath() string { +func (x *PlaybackSpeedRequest) GetStreamPath() string { if x != nil { - return x.Streampath + return x.StreamPath } return "" } @@ -6662,20 +6662,20 @@ var file_gb28181_proto_rawDesc = string([]byte{ 0x6f, 0x75, 0x70, 0x43, 0x68, 0x61, 0x6e, 0x6e, 0x65, 0x6c, 0x52, 0x04, 0x6c, 0x69, 0x73, 0x74, 0x22, 0x36, 0x0a, 0x14, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x50, 0x61, 0x75, 0x73, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x22, 0x37, 0x0a, 0x15, 0x50, 0x6c, 0x61, 0x79, + 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x37, 0x0a, 0x15, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x75, 0x6d, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, + 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x22, 0x51, 0x0a, 0x13, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x53, 0x65, 0x65, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, - 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x65, 0x6b, + 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0a, 0x73, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x1a, 0x0a, 0x08, 0x73, 0x65, 0x65, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x08, 0x73, 0x65, 0x65, 0x6b, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x4c, 0x0a, 0x14, 0x50, 0x6c, 0x61, 0x79, 0x62, 0x61, 0x63, 0x6b, 0x53, 0x70, 0x65, 0x65, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1e, 0x0a, 0x0a, - 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x70, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05, + 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0a, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x50, 0x61, 0x74, 0x68, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x70, 0x65, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x01, 0x52, 0x05, 0x73, 0x70, 0x65, 0x65, 0x64, 0x32, 0x8b, 0x3d, 0x0a, 0x03, 0x61, 0x70, 0x69, 0x12, 0x5d, 0x0a, 0x04, 0x4c, 0x69, 0x73, 0x74, 0x12, 0x1d, 0x2e, 0x67, 0x62, 0x32, 0x38, 0x31, 0x38, 0x31, 0x70, 0x72, 0x6f, 0x2e, diff --git a/plugin/gb28181/pb/gb28181.proto b/plugin/gb28181/pb/gb28181.proto index fc0c96a..abce921 100644 --- a/plugin/gb28181/pb/gb28181.proto +++ b/plugin/gb28181/pb/gb28181.proto @@ -1088,22 +1088,22 @@ message GroupChannelsResponse { // 回放暂停请求 message PlaybackPauseRequest { - string streampath = 1; // 回放流路径 + string streamPath = 1; // 回放流路径 } // 回放恢复请求 message PlaybackResumeRequest { - string streampath = 1; // 回放流路径 + string streamPath = 1; // 回放流路径 } // 回放拖动播放请求 message PlaybackSeekRequest { - string streampath = 1; // 回放流路径 + string streamPath = 1; // 回放流路径 int64 seekTime = 2; // 拖动偏移量,单位s } // 回放倍速播放请求 message PlaybackSpeedRequest { - string streampath = 1; // 回放流路径 + string streamPath = 1; // 回放流路径 double speed = 2; // 倍速0.25 0.5 1、2、4 } \ No newline at end of file