feat: listener before ack,download progress

1.listener before ack
2.use ps.pts to update download progress
This commit is contained in:
pggiroro
2025-11-17 21:45:53 +08:00
parent cca64aeb99
commit 1780dde594
14 changed files with 734 additions and 346 deletions

View File

@@ -43,6 +43,7 @@ type MpegPsDemuxer struct {
Publisher *m7s.Publisher
Allocator *gomem.ScalableMemoryAllocator
writer m7s.PublishWriter[*format.Mpeg2Audio, *format.AnnexB]
OnVideoPtsUpdate func(pts uint64) // 视频PTS更新回调
}
func (s *MpegPsDemuxer) Feed(reader *util.BufReader) (err error) {
@@ -104,6 +105,10 @@ func (s *MpegPsDemuxer) Feed(reader *util.BufReader) (err error) {
pes.SetDTS(time.Duration(pesHeader.Dts))
pes.SetPTS(time.Duration(pesHeader.Pts))
lastVideoPts = pesHeader.Pts
// 触发PTS更新回调
if s.OnVideoPtsUpdate != nil {
s.OnVideoPtsUpdate(pesHeader.Pts)
}
}
annexb := s.Allocator.Malloc(reader.Length)
reader.Read(annexb)

View File

@@ -13,6 +13,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"gorm.io/gorm"
"m7s.live/v5"
"m7s.live/v5/pkg/util"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -137,6 +138,7 @@ func (gb *GB28181Plugin) List(ctx context.Context, req *pb.GetDevicesRequest) (*
SubscribeCatalog: util.Conditional(d.SubscribeCatalog == 0, false, true),
SubscribePosition: util.Conditional(d.SubscribePosition == 0, false, true),
SubscribeAlarm: util.Conditional(d.SubscribeAlarm == 0, false, true),
Charset: d.Charset,
})
}
@@ -484,6 +486,9 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
if req.StreamMode != "" {
d.StreamMode = mrtp.StreamMode(req.StreamMode)
}
if req.Charset != "" {
d.Charset = req.Charset
}
if req.Password != "" {
d.Password = req.Password
}
@@ -507,6 +512,12 @@ func (gb *GB28181Plugin) UpdateDevice(ctx context.Context, req *pb.Device) (*pb.
} else {
d.SubscribeAlarm = 0 // 不订阅
}
if req.BroadcastPushAfterAck {
d.BroadcastPushAfterAck = req.BroadcastPushAfterAck
} else {
d.BroadcastPushAfterAck = false
}
d.UpdateTime = time.Now()
// 先停止设备任务
@@ -3392,13 +3403,47 @@ func (gb *GB28181Plugin) StartDownload(ctx context.Context, req *pb.StartDownloa
return resp, nil
}
// 4. 生成下载任务ID
downloadId := fmt.Sprintf("%d_%d_%s_%s", startTime.Unix(), endTime.Unix(), req.DeviceId, req.ChannelId)
// 4. 生成下载任务ID(复合键)
downloadId := fmt.Sprintf("%s_%s_%d_%d", req.DeviceId, req.ChannelId, startTime.Unix(), endTime.Unix())
// 5. 检查任务是否已存在
// 5. 优先从缓存表查询已完成的下载
if gb.DB != nil {
var cachedRecord gb28181.GB28181Record
if err := gb.DB.Where("download_id = ? AND status = ?", downloadId, "completed").First(&cachedRecord).Error; err == nil {
// 检查文件是否存在
if _, err := os.Stat(cachedRecord.FilePath); err == nil {
// 生成下载 URL
downloadUrl := fmt.Sprintf("/gb28181/download?downloadId=%s", downloadId)
gb.Info("从缓存返回已下载的录像",
"downloadId", downloadId,
"filePath", cachedRecord.FilePath,
"downloadUrl", downloadUrl)
resp.Code = 0
resp.Message = "录像已存在(来自缓存)"
resp.Total = 0
resp.Data = &pb.StartDownloadData{
DownloadId: downloadId,
Status: "completed",
DownloadUrl: downloadUrl,
}
return resp, nil
} else {
// 文件不存在删除缓存记录和RecordStream记录
gb.DB.Delete(&cachedRecord)
// 同时删除MP4插件的RecordStream记录通过FilePath
gb.DB.Where("file_path = ?", cachedRecord.FilePath).Delete(&m7s.RecordStream{})
gb.Warn("缓存记录的文件不存在已删除缓存和RecordStream记录",
"downloadId", downloadId,
"filePath", cachedRecord.FilePath)
}
}
}
// 6. 检查正在进行的下载任务
if existingDialog, exists := gb.downloadDialogs.Get(downloadId); exists {
resp.Code = 200
resp.Message = "下载任务已存在"
resp.Message = "下载任务正在进行中"
resp.Total = 0
resp.Data = &pb.StartDownloadData{
DownloadId: downloadId,
@@ -3408,14 +3453,27 @@ func (gb *GB28181Plugin) StartDownload(ctx context.Context, req *pb.StartDownloa
return resp, nil
}
// 6. 下载链接将在录制开始后动态生成
// 7. 检查已完成的下载任务(内存缓存)
if completedDialog, exists := gb.completedDownloads.Get(downloadId); exists {
resp.Code = 0
resp.Message = "下载任务已完成"
resp.Total = 0
resp.Data = &pb.StartDownloadData{
DownloadId: downloadId,
Status: completedDialog.Status,
DownloadUrl: completedDialog.DownloadUrl,
}
return resp, nil
}
// 8. 下载链接将在录制开始后动态生成
// 初始为空,等进度更新时从数据库查询后填充
downloadUrl := ""
// 7. 创建下载对话
// 9. 创建下载对话
downloadSpeed := int(req.DownloadSpeed)
if downloadSpeed <= 0 || downloadSpeed > 4 {
downloadSpeed = 1 // 默认1倍速,避免丢帧
downloadSpeed = 4 // 默认4倍速,避免丢帧
}
dialog := &DownloadDialog{
@@ -3432,7 +3490,7 @@ func (gb *GB28181Plugin) StartDownload(ctx context.Context, req *pb.StartDownloa
}
dialog.Task.Context = ctx
// 8. 添加到下载对话集合(会自动调用 Start 方法)
// 10. 添加到下载对话集合(会自动调用 Start 方法)
gb.downloadDialogs.AddTask(dialog)
resp.Code = 0
@@ -3460,9 +3518,29 @@ func (gb *GB28181Plugin) GetDownloadProgress(ctx context.Context, req *pb.GetDow
// 2. 查询任务
dialog, exists := gb.downloadDialogs.Get(req.DownloadId)
if !exists {
resp.Code = 404
resp.Message = "下载任务不存在"
return resp, nil
completedDialog, exists := gb.completedDownloads.Get(req.DownloadId)
if exists {
resp.Code = 0
resp.Message = "success"
resp.Total = 0
resp.Data = &pb.DownloadProgressData{
DownloadId: completedDialog.DownloadId,
Status: completedDialog.Status,
Progress: int32(completedDialog.Progress),
FilePath: completedDialog.FilePath,
DownloadUrl: completedDialog.DownloadUrl,
Error: completedDialog.Error,
StartedAt: timestamppb.New(completedDialog.StartedAt),
}
if !completedDialog.CompletedAt.IsZero() {
resp.Data.CompletedAt = timestamppb.New(completedDialog.CompletedAt)
}
return resp, nil
} else {
resp.Code = 404
resp.Message = "下载任务不存在"
return resp, nil
}
}
// 3. 构建响应
@@ -3470,15 +3548,13 @@ func (gb *GB28181Plugin) GetDownloadProgress(ctx context.Context, req *pb.GetDow
resp.Message = "success"
resp.Total = 0
resp.Data = &pb.DownloadProgressData{
DownloadId: dialog.DownloadId,
Status: dialog.Status,
Progress: int32(dialog.Progress),
FilePath: dialog.FilePath,
DownloadUrl: dialog.DownloadUrl,
Error: dialog.Error,
DownloadedBytes: dialog.DownloadedBytes,
TotalBytes: dialog.TotalBytes,
StartedAt: timestamppb.New(dialog.StartedAt),
DownloadId: dialog.DownloadId,
Status: dialog.Status,
Progress: int32(dialog.Progress),
FilePath: dialog.FilePath,
DownloadUrl: dialog.DownloadUrl,
Error: dialog.Error,
StartedAt: timestamppb.New(dialog.StartedAt),
}
if !dialog.CompletedAt.IsZero() {
resp.Data.CompletedAt = timestamppb.New(dialog.CompletedAt)

View File

@@ -49,10 +49,12 @@ func (d *DeviceKeepaliveTickTask) Tick(any) {
keepaliveSeconds = d.device.KeepaliveInterval
}
if timeDiff := time.Since(d.device.KeepaliveTime); timeDiff > time.Duration(d.device.KeepaliveCount*keepaliveSeconds)*time.Second {
d.device.Debug("keeplive time out", "timediff", timeDiff, "currettime", time.Now(), "d.device.KeepaliveTime", d.device.KeepaliveTime, "timeout time", time.Duration(d.device.KeepaliveCount*keepaliveSeconds)*time.Second)
d.device.Online = false
d.device.Status = DeviceOfflineStatus
// 设置所有通道状态为off
d.device.channels.Range(func(channel *Channel) bool {
d.device.Debug("keeplive time out", "timediff", timeDiff, "offline channeid", channel.ChannelId)
channel.Status = "OFF"
return true
})
@@ -114,6 +116,7 @@ type Device struct {
CatalogSubscribeTask *CatalogSubscribeTask `gorm:"-:all"`
PositionSubscribeTask *PositionSubscribeTask `gorm:"-:all"`
AlarmSubscribeTask *AlarmSubscribeTask `gorm:"-:all"`
Cataloging bool `gorm:"-:all" default:"false"`
}
func (d *Device) TableName() string {
@@ -201,6 +204,7 @@ type catalogHandlerTask struct {
func (c *catalogHandlerTask) Run() (err error) {
// 处理目录信息
d := c.d
d.Cataloging = true
msg := c.msg
catalogReq, exists := d.catalogReqs.Get(msg.SN)
if !exists {
@@ -250,6 +254,7 @@ func (c *catalogHandlerTask) Run() (err error) {
if catalogReq.IsComplete() {
catalogReq.Resolve()
d.catalogReqs.RemoveByKey(msg.SN)
d.Cataloging = false
}
return
}

View File

@@ -107,7 +107,7 @@ func (d *Dialog) Start() (err error) {
sss := strings.Split(d.pullCtx.RemoteURL, "/")
if len(sss) < 2 {
d.Info("remote url is invalid", d.pullCtx.RemoteURL)
d.Channel.Device.Info("remote url is invalid", d.pullCtx.RemoteURL)
d.pullCtx.Fail("remote url is invalid")
return
}
@@ -125,11 +125,11 @@ func (d *Dialog) Start() (err error) {
d.Channel = channel
} else {
d.pullCtx.Fail(fmt.Sprintf("channel %s not found", channelId))
return fmt.Errorf("channel %s not found", channelId)
return errors.Join(fmt.Errorf("channel %s not found", channelId))
}
} else {
d.pullCtx.Fail(fmt.Sprintf("device %s not found", deviceId))
return fmt.Errorf("device %s not found", deviceId)
return errors.Join(fmt.Errorf("device %s not found", deviceId))
}
d.pullCtx.GoToStepConst(StepSIPPrepare)
@@ -145,7 +145,7 @@ func (d *Dialog) Start() (err error) {
d.MediaPort, ok = d.gb.tcpPB.Allocate()
if !ok {
d.pullCtx.Fail("no available tcp port")
return fmt.Errorf("no available tcp port")
return errors.Join(fmt.Errorf("no available tcp port"))
}
} else {
d.MediaPort = d.gb.MediaPort[0]
@@ -171,7 +171,6 @@ func (d *Dialog) Start() (err error) {
d.pullCtx.GoToStepConst(StepSDPBuild)
ssrc := d.CreateSSRC(d.gb.Serial)
d.Info("MediaIp is ", device.MediaIp)
// 构建 SDP 内容
sdpInfo := []string{
@@ -278,14 +277,8 @@ func (d *Dialog) Start() (err error) {
Params: sip.NewParams(),
}
fromHDR.Params.Add("tag", sip.GenerateTagN(32))
// 输出设备Transport信息用于调试
d.Info("准备发送INVITE", "deviceId", device.DeviceId, "device.Transport", device.Transport, "device.SipIp", device.SipIp, "device.LocalPort", device.LocalPort)
dialogClientCache := sipgo.NewDialogClientCache(device.client, contactHDR)
// 创建会话不传递Via头部让Client自动创建
d.Info("start to invite", "recipient:", recipient, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:",
device.contactHDR, "contactHDR:", contactHDR, "sdpInfo:", strings.Join(sdpInfo, "\r\n")+"\r\n")
d.pullCtx.GoToStepConst(StepInviteSend)
@@ -300,80 +293,53 @@ func (d *Dialog) Start() (err error) {
Params: sip.HeaderParams(sip.NewParams()),
}
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
d.Info("发送INVITE使用Transport", "transport", device.Transport, "via", viaHeader)
d.Info("start to invite", "recipient:", recipient, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:",
device.contactHDR, "contactHDR:", contactHDR, "sdpInfo:", strings.Join(sdpInfo, "|||"), "viaHeader:", viaHeader, "transport", device.Transport)
// Via头部必须是第一个参数这样即使用AppendHeaderVia也会在最前面
// 这样Client检查req.Via()时就能找到我们的Via头部不会再创建默认的UDP Via
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
d.session, err = dialogClientCache.Invite(d, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
// 最后添加Content-Length头部
if err != nil {
d.pullCtx.Fail("dialog invite error: " + err.Error())
return errors.New("dialog invite error" + err.Error())
return errors.Join(fmt.Errorf("dialog invite error:%s", err.Error()))
}
d.SetDescriptions(task.Description{
"streamPath": d.StreamPath,
"streamMode": d.StreamMode,
"mediaPort": d.MediaPort,
"mediaIP": device.MediaIp,
"sipIP": device.SipIp,
"transport": device.Transport,
"ssrc": ssrc,
"callID": customCallID,
"deviceID": device.DeviceId,
"channelID": channelId,
"deviceIP": device.IP,
"devicePort": device.Port,
"localPort": device.LocalPort,
"startTime": time.Now(),
"from": fromHDR.Address.String(),
"to": toHeader.Address.String(),
"contact": contactHDR.Address.String(),
"subject": fmt.Sprintf("%s:%s,%s:0", channelId, ssrc, d.gb.Serial),
"recipient": recipient.String(),
"sdp": strings.Join(sdpInfo, "\r\n"),
"via": viaHeader.String(),
"viaBranch": func() string { v, _ := viaHeader.Params.Get("branch"); return v }(),
"broadcastPushAfterAck": device.BroadcastPushAfterAck,
})
d.pullCtx.GoToStepConst(StepResponseWait)
return
}
func (d *Dialog) Run() (err error) {
d.Info("before WaitAnswer")
err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{})
d.Info("after WaitAnswer")
if err != nil {
d.pullCtx.Fail("等待响应错误: " + err.Error())
return errors.New("wait answer error" + err.Error())
}
inviteResponseBody := string(d.session.InviteResponse.Body())
d.Info("inviteResponse", "body", inviteResponseBody)
ds := strings.Split(inviteResponseBody, "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
switch ls[0] {
case "y":
if len(ls[1]) > 0 {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
d.SSRC = uint32(_ssrc)
} else {
d.pullCtx.Fail("解析邀请响应y字段错误: " + err.Error())
return errors.New("read invite respose y error" + err.Error())
}
}
case "c":
// 解析 c=IN IP4 xxx.xxx.xxx.xxx 格式
parts := strings.Split(ls[1], " ")
if len(parts) >= 3 {
d.targetIP = parts[len(parts)-1]
}
case "m":
// 解析 m=video port xxx 格式
parts := strings.Split(ls[1], " ")
if len(parts) >= 2 {
if port, err := strconv.Atoi(parts[1]); err == nil {
d.targetPort = port
}
}
}
}
}
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
}
// 移动到流数据接收步骤
d.pullCtx.GoToStepConst(pkg.StepStreaming)
var pub mrtp.PSReceiver
pub.Publisher = d.pullCtx.Publisher
// setupReceiver 配置 PSReceiver 的网络参数(单端口模式、监听地址等)
func (d *Dialog) setupReceiver(pub *mrtp.PSReceiver) {
switch d.StreamMode {
case mrtp.StreamModeTCPActive:
pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort)
// 创建一个可取消的上下文
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 1),
@@ -413,8 +379,110 @@ func (d *Dialog) Run() (err error) {
pub.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
pub.StreamMode = d.StreamMode
}
err = d.session.Ack(d.gb)
func (d *Dialog) Run() (err error) {
var pub mrtp.PSReceiver
pub.Publisher = d.pullCtx.Publisher
// 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要)
if !d.Channel.Device.BroadcastPushAfterAck {
d.Channel.Device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.setupReceiver(&pub)
// 提前启动监听器
err = pub.Receiver.Start()
if err != nil {
d.Error("start listener before WaitAnswer failed", "err", err)
return err
}
}
d.Channel.Device.Info("before WaitAnswer")
err = d.session.WaitAnswer(d, sipgo.AnswerOptions{})
d.Channel.Device.Info("after WaitAnswer")
if err != nil {
d.pullCtx.Fail("等待响应错误: " + err.Error())
return errors.Join(errors.New("wait answer error"), err)
}
inviteResponseBody := string(d.session.InviteResponse.Body())
d.Channel.Device.Info("inviteResponse", "body", inviteResponseBody)
// 添加响应信息到 Description
d.SetDescriptions(task.Description{
"responseStatus": d.session.InviteResponse.StatusCode,
"responseReason": d.session.InviteResponse.Reason,
"responseSDP": inviteResponseBody,
"responseContact": func() string {
if c := d.session.InviteResponse.Contact(); c != nil {
return c.Address.String()
}
return ""
}(),
})
ds := strings.Split(inviteResponseBody, "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
switch ls[0] {
case "y":
if len(ls[1]) > 0 {
if _ssrc, err := strconv.ParseInt(ls[1], 10, 0); err == nil {
d.SSRC = uint32(_ssrc)
} else {
d.pullCtx.Fail("解析邀请响应y字段错误: " + err.Error())
return errors.New("read invite respose y error" + err.Error())
}
}
case "c":
// 解析 c=IN IP4 xxx.xxx.xxx.xxx 格式
parts := strings.Split(ls[1], " ")
if len(parts) >= 3 {
d.targetIP = parts[len(parts)-1]
}
case "m":
// 解析 m=video port xxx 格式
parts := strings.Split(ls[1], " ")
if len(parts) >= 2 {
if port, err := strconv.Atoi(parts[1]); err == nil {
d.targetPort = port
}
}
}
}
}
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
}
// 添加解析后的响应参数到 Description
d.SetDescriptions(task.Description{
"responseSSRC": d.SSRC,
"responseTargetIP": d.targetIP,
"responseTargetPort": d.targetPort,
})
// 移动到流数据接收步骤
d.pullCtx.GoToStepConst(pkg.StepStreaming)
// 设置允许连接的IP地址从INVITE响应中解析
pub.Receiver.AllowedIP = d.targetIP
d.Channel.Device.Info("set allowed IP for receiver", "allowedIP", d.targetIP)
// TCP-ACTIVE 模式需要在解析 targetIP 后设置连接地址
if d.StreamMode == mrtp.StreamModeTCPActive {
pub.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
d.Channel.Device.Info("set TCP-ACTIVE connect address", "addr", pub.ListenAddr)
}
// 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置
if d.Channel.Device.BroadcastPushAfterAck {
d.Channel.Device.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.setupReceiver(&pub)
}
err = d.session.Ack(d)
if err != nil {
d.Error("ack session err", err)
}
@@ -427,27 +495,31 @@ func (d *Dialog) GetKey() string {
}
func (d *Dialog) Dispose() {
switch d.StreamMode {
case mrtp.StreamModeUDP:
if d.gb.udpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.udpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "udp")
go func() {
time.Sleep(90 * time.Second)
switch d.StreamMode {
case mrtp.StreamModeUDP:
if d.gb.udpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.udpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "udp")
}
}
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.tcpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp")
}
}
}
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.tcpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp")
}
}
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
d.Info("listener port release", "port", d.MediaPort)
}()
d.Info("dialog dispose", "ssrc", d.SSRC, "listener", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
if d.session != nil && d.session.InviteResponse != nil {
err := d.session.Bye(d)
if err != nil {
d.Error("dialog bye bye err", err)
d.Error("listener dialog bye bye", " err", err)
}
}
}

View File

@@ -5,31 +5,53 @@ import (
"os"
"path/filepath"
"strconv"
"strings"
gb28181 "m7s.live/v5/plugin/gb28181/pkg"
)
// handleDownloadFile 处理文件下载请求
// URL: /gb28181/download?downloadId=xxx
func (gb *GB28181Plugin) handleDownloadFile(w http.ResponseWriter, r *http.Request) {
// 从 URL 路径中提取参数
// 路径格式:/gb28181/download/{deviceId}/{channelId}/{filename}
pathParts := strings.Split(strings.TrimPrefix(r.URL.Path, "/"), "/")
if len(pathParts) < 3 {
http.Error(w, "Invalid path", http.StatusBadRequest)
// 获取 downloadId 参数
downloadId := r.URL.Query().Get("downloadId")
if downloadId == "" {
http.Error(w, "downloadId parameter is required", http.StatusBadRequest)
return
}
deviceId := pathParts[len(pathParts)-3]
channelId := pathParts[len(pathParts)-2]
filename := pathParts[len(pathParts)-1]
// 验证文件名格式(防止路径遍历攻击)
if strings.Contains(filename, "..") || strings.Contains(filename, "/") {
http.Error(w, "Invalid filename", http.StatusBadRequest)
// 检查数据库
if gb.DB == nil {
http.Error(w, "Database not available", http.StatusInternalServerError)
gb.Error("数据库未初始化")
return
}
// 构建文件路径
filePath := filepath.Join("download", deviceId, channelId, filename)
// 从 gb28181_record 表查询文件路径
var record gb28181.GB28181Record
if err := gb.DB.Where("download_id = ? AND status = ?", downloadId, "completed").First(&record).Error; err != nil {
// 检查是否是正在进行的下载任务
if dialog, exists := gb.downloadDialogs.Get(downloadId); exists {
http.Error(w, "Download in progress", http.StatusAccepted)
gb.Info("下载任务进行中",
"downloadId", downloadId,
"status", dialog.Status,
"progress", dialog.Progress)
return
}
http.Error(w, "Download record not found or not completed", http.StatusNotFound)
gb.Warn("下载记录不存在或未完成",
"downloadId", downloadId,
"error", err)
return
}
filePath := record.FilePath
filename := filepath.Base(filePath)
gb.Info("从缓存记录获取文件路径",
"downloadId", downloadId,
"filePath", filePath)
// 检查文件是否存在
fileInfo, err := os.Stat(filePath)
@@ -68,9 +90,8 @@ func (gb *GB28181Plugin) handleDownloadFile(w http.ResponseWriter, r *http.Reque
http.ServeContent(w, r, filename, fileInfo.ModTime(), file)
gb.Info("文件下载",
"deviceId", deviceId,
"channelId", channelId,
"filename", filename,
"filePath", filePath,
"size", fileInfo.Size(),
"remote", r.RemoteAddr)
}

View File

@@ -1,9 +1,9 @@
package plugin_gb28181pro
import (
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strconv"
"strings"
@@ -28,7 +28,6 @@ type DownloadDialog struct {
device *Device
channel *Channel
MediaPort uint16
SSRC uint32
targetIP string
targetPort uint16
// 任务信息
@@ -39,15 +38,79 @@ type DownloadDialog struct {
EndTime time.Time
DownloadSpeed int // 下载速度倍数1-4倍默认1倍
// 状态信息
Status string // pending/downloading/completed/failed
Progress int // 0-100
FilePath string
DownloadUrl string // 下载链接
Error string
DownloadedBytes int64
TotalBytes int64
StartedAt time.Time
CompletedAt time.Time
Status string // pending/downloading/completed/failed
Progress int // 0-100
FilePath string
DownloadUrl string // 下载链接
Error string
StartedAt time.Time
CompletedAt time.Time
}
// CompletedDownloadDialog 用于缓存已完成下载任务的最终结果
// 与 DownloadDialog 生命周期解耦,仅保留前端查询所需字段
type CompletedDownloadDialog struct {
DownloadId string
DeviceId string
ChannelId string
Status string
Progress int
FilePath string
DownloadUrl string
Error string
StartedAt time.Time
CompletedAt time.Time
}
func (d *CompletedDownloadDialog) GetKey() string {
return d.DownloadId
}
// setupReceiver 配置 PSReceiver 的网络参数(单端口模式、监听地址等)
func (d *DownloadDialog) setupReceiver(ps *mrtp.PSReceiver) {
switch d.device.StreamMode {
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
// 单端口模式
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 1),
Context: d,
}
var loaded bool
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
if loaded {
reader.Context = d
}
ps.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
if d.gb.udpPort > 0 {
// 单端口模式
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 100),
Context: d,
}
var loaded bool
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
if loaded {
reader.Context = d
}
ps.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
ps.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
ps.StreamMode = d.device.StreamMode
}
// GetKey 返回下载任务的唯一标识
@@ -56,7 +119,7 @@ func (d *DownloadDialog) GetKey() string {
}
// Start 启动下载会话
func (d *DownloadDialog) Start() error {
func (d *DownloadDialog) Start() (err error) {
// 更新状态
d.Status = "downloading"
d.StartedAt = time.Now()
@@ -66,7 +129,7 @@ func (d *DownloadDialog) Start() error {
if !ok {
d.Status = "failed"
d.Error = fmt.Sprintf("设备不存在: %s", d.DeviceId)
return fmt.Errorf(d.Error)
return errors.Join(fmt.Errorf("device not found"), errors.New(d.Error))
}
d.device = device
@@ -75,7 +138,7 @@ func (d *DownloadDialog) Start() error {
if !ok {
d.Status = "failed"
d.Error = fmt.Sprintf("通道不存在: %s", d.ChannelId)
return fmt.Errorf(d.Error)
return errors.Join(fmt.Errorf("channel not found"), errors.New(d.Error))
}
d.channel = channel
@@ -89,7 +152,7 @@ func (d *DownloadDialog) Start() error {
var ok bool
d.MediaPort, ok = d.gb.tcpPB.Allocate()
if !ok {
return fmt.Errorf("no available tcp port")
return errors.Join(fmt.Errorf("no available tcp port"))
}
} else {
d.MediaPort = d.gb.MediaPort[0]
@@ -103,7 +166,7 @@ func (d *DownloadDialog) Start() error {
var ok bool
d.MediaPort, ok = d.gb.udpPB.Allocate()
if !ok {
return fmt.Errorf("no available udp port")
return errors.Join(fmt.Errorf("no available udp port"))
}
} else {
d.MediaPort = d.gb.MediaPort[0]
@@ -112,7 +175,7 @@ func (d *DownloadDialog) Start() error {
}
// 3. 生成 SSRC
d.SSRC = device.CreateSSRC(d.gb.Serial)
ssrc := d.CreateSSRC(d.gb.Serial)
// 4. 构建 SDP
startTimestamp := d.StartTime.Unix()
@@ -120,7 +183,7 @@ func (d *DownloadDialog) Start() error {
sdpInfo := []string{
"v=0",
fmt.Sprintf("o=%s 0 0 IN IP4 %s", device.DeviceId, device.MediaIp),
fmt.Sprintf("o=%s 0 0 IN IP4 %s", d.ChannelId, device.MediaIp),
"s=Download", // 下载模式
fmt.Sprintf("u=%s:0", d.ChannelId),
"c=IN IP4 " + device.MediaIp,
@@ -162,19 +225,41 @@ func (d *DownloadDialog) Start() error {
sdpInfo = append(sdpInfo, fmt.Sprintf("a=downloadspeed:%d", downloadSpeed))
// 添加 SSRC
ssrcStr := strconv.FormatUint(uint64(d.SSRC), 10)
sdpInfo = append(sdpInfo, fmt.Sprintf("y=%s", ssrcStr))
// 5. 创建 INVITE 请求
request := sip.NewRequest(sip.INVITE, sip.Uri{User: d.ChannelId, Host: device.IP})
subject := fmt.Sprintf("%s:%s,%s:0", d.ChannelId, ssrcStr, device.DeviceId)
sdpInfo = append(sdpInfo, fmt.Sprintf("y=%s", ssrc))
// 创建 INVITE 请求
recipient := sip.Uri{
Host: device.IP,
Port: device.Port,
User: d.ChannelId,
}
// 设置必需的头部
contentTypeHeader := sip.ContentTypeHeader("APPLICATION/SDP")
subjectHeader := sip.NewHeader("Subject", subject)
request.SetBody([]byte(strings.Join(sdpInfo, "\r\n") + "\r\n"))
subjectHeader := sip.NewHeader("Subject", fmt.Sprintf("%s:%s,%s:0", d.ChannelId, ssrc, d.gb.Serial))
//allowHeader := sip.NewHeader("Allow", "INVITE, ACK, CANCEL, REGISTER, MESSAGE, NOTIFY, BYE")
//Toheader里需要放入目录通道的id
toHeader := sip.ToHeader{
Address: sip.Uri{User: d.ChannelId, Host: d.ChannelId[0:10]},
}
userAgentHeader := sip.NewHeader("User-Agent", "M7S/"+m7s.Version)
recipient := device.Recipient
recipient.User = d.ChannelId
//customCallID := fmt.Sprintf("%s-%s-%d@%s", device.DeviceId, channelId, time.Now().Unix(), device.SipIp)
customCallID := fmt.Sprintf("%s@%s", GenerateCallID(32), device.MediaIp)
callID := sip.CallIDHeader(customCallID)
maxforward := sip.MaxForwardsHeader(70)
//contentLengthHeader := sip.ContentLengthHeader(len(strings.Join(sdpInfo, "\r\n") + "\r\n"))
csqHeader := sip.CSeqHeader{
SeqNo: uint32(device.SN),
MethodName: "INVITE",
}
//request.AppendHeader(&contentLengthHeader)
contactHDR := sip.ContactHeader{
Address: sip.Uri{
User: d.gb.Serial,
Host: device.MediaIp,
Port: device.LocalPort,
},
}
fromHDR := sip.FromHeader{
Address: sip.Uri{
@@ -184,43 +269,96 @@ func (d *DownloadDialog) Start() error {
},
Params: sip.NewParams(),
}
toHeader := sip.ToHeader{
Address: sip.Uri{User: d.ChannelId, Host: d.ChannelId[0:10]},
fromHDR.Params.Add("tag", sip.GenerateTagN(32))
dialogClientCache := sipgo.NewDialogClientCache(device.client, contactHDR)
// 创建Via头部使用设备的Transport协议
// Via头部必须放在第一个位置这样AppendHeader时Via会在最前面
viaHeader := &sip.ViaHeader{
ProtocolName: "SIP",
ProtocolVersion: "2.0",
Transport: device.Transport, // 使用设备注册时的Transport
Host: device.SipIp,
Port: device.LocalPort,
Params: sip.HeaderParams(sip.NewParams()),
}
fromHDR.Params.Add("tag", sip.GenerateTagN(16))
viaHeader.Params.Add("branch", sip.GenerateBranchN(16))
// 6. 创建会话并发送 INVITE
dialogClientCache := sipgo.NewDialogClientCache(device.client, device.contactHDR)
d.gb.Info("发送 INVITE 请求",
"deviceId", d.DeviceId,
"channelId", d.ChannelId,
"startTime", d.StartTime,
"endTime", d.EndTime,
"ssrc", ssrcStr)
session, err := dialogClientCache.Invite(d.gb, recipient, request.Body(), &fromHDR, &toHeader, subjectHeader, &contentTypeHeader)
d.Info("start to invite", "recipient:", recipient, " fromHDR:", fromHDR, " toHeader:", toHeader, " device.contactHDR:",
device.contactHDR, "contactHDR:", contactHDR, "sdpInfo:", strings.Join(sdpInfo, "|||"), "viaHeader:", viaHeader, "transport", device.Transport)
// Via头部必须是第一个参数这样即使用AppendHeaderVia也会在最前面
// 这样Client检查req.Via()时就能找到我们的Via头部不会再创建默认的UDP Via
d.session, err = dialogClientCache.Invite(d, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &callID, &csqHeader, &fromHDR, &toHeader, &maxforward, userAgentHeader, subjectHeader, &contentTypeHeader)
if err != nil {
return fmt.Errorf("发送 INVITE 失败: %w", err)
}
d.session = session
d.SetDescriptions(task.Description{
"streamPath": d.StreamPath,
"streamMode": device.StreamMode,
"mediaPort": d.MediaPort,
"mediaIP": device.MediaIp,
"sipIP": device.SipIp,
"transport": device.Transport,
"ssrc": ssrc,
"callID": d.session.InviteRequest.CallID().Value(),
"deviceID": device.DeviceId,
"channelID": d.ChannelId,
"deviceIP": device.IP,
"devicePort": device.Port,
"localPort": device.LocalPort,
"startTime": time.Now(),
"from": fromHDR.Address.String(),
"to": toHeader.Address.String(),
"subject": fmt.Sprintf("%s:%s,%s:0", d.ChannelId, ssrc, d.gb.Serial),
"recipient": recipient.String(),
"sdp": strings.Join(sdpInfo, "\r\n"),
"viaBranch": func() string { v, _ := viaHeader.Params.Get("branch"); return v }(),
"broadcastPushAfterAck": device.BroadcastPushAfterAck,
})
return nil
}
// Go 运行下载会话(异步执行,支持并发)
func (d *DownloadDialog) Go() error {
// 1. 等待 200 OK 响应
err := d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{})
var psReceiver mrtp.PSReceiver
// 如果不是 BroadcastPushAfterAck 模式,提前创建监听器(多端口模式需要)
if !d.device.BroadcastPushAfterAck {
d.device.Info("creating listener before WaitAnswer", "broadcastPushAfterAck", false, "addr", d.MediaPort)
d.setupReceiver(&psReceiver)
// 提前启动监听器
if err := psReceiver.Receiver.Start(); err != nil {
d.device.Error("start listener before WaitAnswer failed", "err", err)
return err
}
}
d.device.Info("before WaitAnswer")
err := d.session.WaitAnswer(d, sipgo.AnswerOptions{})
d.device.Info("after WaitAnswer")
if err != nil {
d.Status = "failed"
d.Error = fmt.Sprintf("等待响应失败: %v", err)
return fmt.Errorf("等待响应失败: %w", err)
return errors.Join(errors.New("wait answer error"), err)
}
// 2. 解析响应
// 解析响应
inviteResponseBody := string(d.session.InviteResponse.Body())
d.gb.Info("收到 INVITE 响应", "body", inviteResponseBody)
d.device.Info("收到 INVITE 响应", "body", inviteResponseBody)
// 添加响应信息到 Description
d.SetDescriptions(task.Description{
"responseStatus": d.session.InviteResponse.StatusCode,
"responseReason": d.session.InviteResponse.Reason,
"responseSDP": inviteResponseBody,
"responseContact": func() string {
if c := d.session.InviteResponse.Contact(); c != nil {
return c.Address.String()
}
return ""
}(),
})
ds := strings.Split(inviteResponseBody, "\r\n")
for _, l := range ds {
if ls := strings.Split(l, "="); len(ls) > 1 {
@@ -250,13 +388,24 @@ func (d *DownloadDialog) Go() error {
}
}
}
// invite响应里的contact是域名的话sip尝试去解析可能失败这时候用invite请求里的recipient
if d.session.InviteResponse.Contact() != nil {
if &d.session.InviteRequest.Recipient != &d.session.InviteResponse.Contact().Address {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
}
// 3. 发送 ACK
err = d.session.Ack(d.gb)
// 如果是 BroadcastPushAfterAck 模式,在 Ack 后创建监听器配置
if d.device.BroadcastPushAfterAck {
d.device.Info("setup receiver after Ack", "broadcastPushAfterAck", true)
d.setupReceiver(&psReceiver)
}
// 发送 ACK
err = d.session.Ack(d)
if err != nil {
d.Status = "failed"
d.Error = fmt.Sprintf("发送 ACK 失败: %v", err)
return fmt.Errorf("发送 ACK 失败: %w", err)
// 与 dialog.Run 保持一致,仅记录错误,不直接 panic
d.device.Error("ack session", "err", err)
}
d.gb.Info("下载会话已建立",
@@ -264,7 +413,7 @@ func (d *DownloadDialog) Go() error {
"targetIP", d.targetIP,
"targetPort", d.targetPort)
// 4. 使用简洁的流路径格式
// 使用简洁的流路径格式
// 格式:{设备ID}/{通道ID}
streamPath := fmt.Sprintf("%s%s/%s/%s", "gbdownload_", time.Now().Local().Format("20060102150405"), d.DeviceId, d.ChannelId)
@@ -284,8 +433,7 @@ func (d *DownloadDialog) Go() error {
return fmt.Errorf("创建 Publisher 失败: %w", err)
}
// 6. 创建 PSReceiver 接收 RTP 并解析 PS 流
var psReceiver mrtp.PSReceiver
// 6. 绑定 Publisher 到 PSReceiver并监听 Publisher 停止事件
psReceiver.Publisher = publisher
// 监听 Publisher 停止事件,主动停止 PSReceiver
@@ -297,53 +445,6 @@ func (d *DownloadDialog) Go() error {
psReceiver.Stop(io.EOF)
})
// 配置接收器
switch d.device.StreamMode {
case mrtp.StreamModeTCPActive:
psReceiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort > 0 {
// 单端口模式
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 1),
Context: d,
}
var loaded bool
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
if loaded {
reader.Context = d
}
psReceiver.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
psReceiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
case mrtp.StreamModeUDP:
if d.gb.udpPort > 0 {
// 单端口模式
reader := &gb28181.SinglePortReader{
SSRC: d.SSRC,
Mouth: make(chan []byte, 100),
Context: d,
}
var loaded bool
reader, loaded = d.gb.singlePorts.LoadOrStore(reader)
if loaded {
reader.Context = d
}
psReceiver.SinglePort = reader
d.OnStop(func() {
reader.Close()
d.gb.singlePorts.Remove(reader)
})
}
psReceiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
}
psReceiver.StreamMode = d.device.StreamMode
// 7. 创建 Recorder 订阅 Publisher 并录制
// 使用 MP4 插件的标准录制配置
if mp4Plugin, ok := d.gb.Server.Plugins.Get("MP4"); ok && mp4Plugin.Meta.NewRecorder != nil {
@@ -359,7 +460,15 @@ func (d *DownloadDialog) Go() error {
// 使用 Plugin.Record 方法创建录制任务
mp4Plugin.Record(publisher, recConf, nil)
d.gb.Info("MP4 录制器已创建", "streamPath", streamPath, "filePath", filePath)
// 保存存储路径前缀(用于后续模糊匹配查找完整路径)
d.FilePath = filePath
// 生成下载 URL
d.DownloadUrl = fmt.Sprintf("/gb28181/download?downloadId=%s", d.DownloadId)
d.gb.Info("MP4 录制器已创建",
"streamPath", streamPath,
"storagePathPrefix", filePath,
"downloadUrl", d.DownloadUrl)
} else {
d.gb.Warn("MP4 插件未加载,无法录制")
}
@@ -377,17 +486,17 @@ func (d *DownloadDialog) Go() error {
// 10. 任务完成,更新状态
if err != nil {
// 判断是否为正常结束EOF/timeout 且 RTP 时间戳已稳定(说明流真的结束了)
// 判断是否为正常结束EOF/timeout 且视频PTS已稳定(说明流真的结束了)
errStr := err.Error()
isNormalEnd := err == io.EOF ||
strings.Contains(errStr, "EOF") ||
strings.Contains(errStr, "timeout")
// 时间戳稳定说明设备已经停止发送数据,流真正结束了
timestampStable := psReceiver.IsTimestampStable()
// PTS稳定说明设备已经停止发送数据,流真正结束了
ptsStable := psReceiver.IsPtsStable()
if isNormalEnd && timestampStable {
d.gb.Info("下载完成:RTP 时间戳已稳定,视为成功",
if isNormalEnd && ptsStable {
d.gb.Info("下载完成:视频 PTS 已稳定,视为成功",
"downloadId", d.DownloadId,
"progress", d.Progress,
"error", errStr)
@@ -400,7 +509,7 @@ func (d *DownloadDialog) Go() error {
d.gb.Warn("下载失败",
"downloadId", d.DownloadId,
"progress", d.Progress,
"timestampStable", timestampStable,
"ptsStable", ptsStable,
"error", errStr)
}
} else {
@@ -414,16 +523,89 @@ func (d *DownloadDialog) Go() error {
d.gb.Info("下载任务已完成,延迟 5 秒后释放资源(确保前端获取到 100% 状态)",
"downloadId", d.DownloadId,
"progress", d.Progress)
time.Sleep(5 * time.Second)
d.gb.Info("延迟时间到,准备释放资源", "downloadId", d.DownloadId)
// 12. 从 RecordStream 表查询完整的文件路径(通过 LIKE 模糊匹配)
var actualFilePath string
if d.gb.DB != nil && d.FilePath != "" {
var record m7s.RecordStream
// 使用 LIKE 查询匹配存储路径前缀的记录
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").
Order("start_time DESC").First(&record).Error; err == nil {
actualFilePath = record.FilePath
d.FilePath = actualFilePath // 更新为完整路径
d.gb.Info("找到完整文件路径",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
} else {
d.gb.Warn("未找到匹配的录制文件",
"downloadId", d.DownloadId,
"searchPath", d.FilePath,
"error", err)
}
}
completed := &CompletedDownloadDialog{
DownloadId: d.DownloadId,
DeviceId: d.DeviceId,
ChannelId: d.ChannelId,
Status: d.Status,
Progress: d.Progress,
FilePath: d.FilePath,
DownloadUrl: d.DownloadUrl,
Error: d.Error,
StartedAt: d.StartedAt,
CompletedAt: d.CompletedAt,
}
d.gb.completedDownloads.Set(completed)
// 13. 保存到 GB28181Record 缓存表并清理RecordStream记录
if d.gb.DB != nil && actualFilePath != "" {
record := &gb28181.GB28181Record{
DownloadId: d.DownloadId,
FilePath: actualFilePath,
Status: "completed",
}
// 使用 Save 方法,如果存在则更新,不存在则插入
if err := d.gb.DB.Save(record).Error; err != nil {
d.gb.Error("保存下载记录到缓存表失败",
"downloadId", d.DownloadId,
"error", err)
} else {
d.gb.Info("下载记录已保存到缓存表",
"downloadId", d.DownloadId,
"filePath", actualFilePath)
// 清理MP4插件的RecordStream记录通过完整路径
if err := d.gb.DB.Where("file_path = ?", actualFilePath).Delete(&m7s.RecordStream{}).Error; err != nil {
d.gb.Error("删除RecordStream记录失败",
"filePath", actualFilePath,
"error", err)
} else {
d.gb.Info("已清理RecordStream记录",
"filePath", actualFilePath)
}
}
}
} else if d.Status == "failed" {
// 14. 下载失败时也需要清理RecordStream记录通过 LIKE 模糊匹配)
if d.gb.DB != nil && d.FilePath != "" {
if err := d.gb.DB.Where("file_path LIKE ?", d.FilePath+"%").Delete(&m7s.RecordStream{}).Error; err != nil {
d.gb.Error("删除失败任务的RecordStream记录失败",
"searchPath", d.FilePath,
"error", err)
} else {
d.gb.Info("已清理失败任务的RecordStream记录",
"searchPath", d.FilePath)
}
}
}
return err
}
// updateProgress 更新下载进度(在 RTP 读取循环中通过回调触发)
// updateProgress 更新下载进度(在 PS 流解析过程中通过回调触发)
func (d *DownloadDialog) updateProgress(psReceiver *mrtp.PSReceiver, totalDuration float64) {
// 基于 RTP 时间戳的进度计算(与倍速无关)
// 基于视频 PTS 的进度计算(与倍速无关,反映真实媒体时长
elapsedSeconds := psReceiver.GetElapsedSeconds()
progress := int(elapsedSeconds / totalDuration * 100)
@@ -435,63 +617,35 @@ func (d *DownloadDialog) updateProgress(psReceiver *mrtp.PSReceiver, totalDurati
}
d.Progress = progress
// 尝试从 MP4 插件的数据库中获取文件信息
if mp4Plugin, ok := d.gb.Server.Plugins.Get("MP4"); ok {
if mp4Plugin.DB != nil {
var record m7s.RecordStream
// 查询最新的录制记录
if err := mp4Plugin.DB.Where("stream_path = ? AND type = ?", psReceiver.Publisher.StreamPath, "mp4").
Order("start_time DESC").First(&record).Error; err == nil {
d.FilePath = record.FilePath
// 使用 record.ID 生成下载链接(单文件下载)
// 这样无论录制是否完成,都能正确下载
d.DownloadUrl = fmt.Sprintf("/mp4/download/%s.mp4?id=%d",
psReceiver.Publisher.StreamPath,
record.ID)
// 获取文件大小
if fileInfo, err := os.Stat(record.FilePath); err == nil {
d.DownloadedBytes = fileInfo.Size()
// 根据当前进度估算总大小
if progress > 0 && progress < 100 {
d.TotalBytes = d.DownloadedBytes * 100 / int64(progress)
} else if progress >= 100 {
d.TotalBytes = d.DownloadedBytes
}
}
}
}
}
d.gb.Info("下载进度更新",
"downloadId", d.DownloadId,
"elapsedSeconds", elapsedSeconds,
"totalDuration", totalDuration,
"progress", progress,
"downloadedBytes", d.DownloadedBytes,
"totalBytes", d.TotalBytes,
"filePath", d.FilePath)
}
// Dispose 释放资源
func (d *DownloadDialog) Dispose() {
switch d.device.StreamMode {
case mrtp.StreamModeUDP:
if d.gb.udpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.udpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "udp")
go func() {
time.Sleep(60 * time.Second)
switch d.device.StreamMode {
case mrtp.StreamModeUDP:
if d.gb.udpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.udpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "udp")
}
}
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.tcpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp")
}
}
}
case mrtp.StreamModeTCPPassive:
if d.gb.tcpPort == 0 { //多端口模式
// 回收端口,防止重复回收
if !d.gb.tcpPB.Release(d.MediaPort) {
d.Warn("port already released or not allocated", "port", d.MediaPort, "type", "tcp")
}
}
}
}()
// 2. 记录日志
d.gb.Info("download dialog dispose",

View File

@@ -78,7 +78,7 @@ func (d *ForwardDialog) Start() (err error) {
d.MediaPort = uint16(0)
d.Debug("ForwardDialog端口分配", "device.StreamMode", device.StreamMode, "StreamModeTCPActive", mrtp.StreamModeTCPActive)
if device.StreamMode != mrtp.StreamModeTCPActive {
if d.gb.MediaPort.Valid() {
d.Debug("ForwardDialog端口分配路径", "path", "tcpPB.Allocate()", "MediaPort.Valid", true)
@@ -223,13 +223,13 @@ func (d *ForwardDialog) Start() (err error) {
d.Info("ForwardDialog发送INVITE使用Transport", "transport", device.Transport, "via", viaHeader)
// Via头部必须是第一个参数
d.session, err = dialogClientCache.Invite(d.gb, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &fromHDR, &toHeader, subjectHeader, &contentTypeHeader)
d.session, err = dialogClientCache.Invite(d, recipient, []byte(strings.Join(sdpInfo, "\r\n")+"\r\n"), viaHeader, &fromHDR, &toHeader, subjectHeader, &contentTypeHeader)
return
}
// Run 运行会话
func (d *ForwardDialog) Run() (err error) {
err = d.session.WaitAnswer(d.gb, sipgo.AnswerOptions{})
err = d.session.WaitAnswer(d, sipgo.AnswerOptions{})
if err != nil {
return
}
@@ -273,7 +273,7 @@ func (d *ForwardDialog) Run() (err error) {
d.session.InviteResponse.Contact().Address = d.session.InviteRequest.Recipient
}
}
err = d.session.Ack(d.gb)
err = d.session.Ack(d)
if err != nil {
d.Error("ack session err", err)
d.Stop(errors.New("ack session err" + err.Error()))

View File

@@ -77,6 +77,7 @@ type GB28181Plugin struct {
channels util.Collection[string, *Channel]
singlePorts util.Collection[uint32, *gb28181.SinglePortReader]
downloadDialogs task.WorkCollection[string, *DownloadDialog]
completedDownloads util.Collection[string, *CompletedDownloadDialog]
}
// ClientWrapper 包装sipgo.Client以实现GetKey接口
@@ -104,7 +105,7 @@ var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
// RegisterHandler 注册自定义 HTTP 路由
func (gb *GB28181Plugin) RegisterHandler() map[string]http.HandlerFunc {
return map[string]http.HandlerFunc{
"/download/{deviceId}/{channelId}/{filename}": gb.handleDownloadFile,
"/download": gb.handleDownloadFile,
}
}
@@ -130,6 +131,7 @@ func (gb *GB28181Plugin) initDatabase() error {
&gb28181.GroupsModel{},
&gb28181.GroupsChannelModel{},
&gb28181.DevicePosition{},
&gb28181.GB28181Record{},
); err != nil {
return fmt.Errorf("auto migrate tables error: %v", err)
}
@@ -191,6 +193,7 @@ func (gb *GB28181Plugin) Start() (err error) {
gb.forwardDialogs.L = new(sync.RWMutex)
gb.singlePorts.L = new(sync.RWMutex)
gb.clients.L = new(sync.RWMutex)
gb.completedDownloads.L = new(sync.RWMutex)
gb.server, _ = sipgo.NewServer(gb.ua, sipgo.WithServerLogger(logger)) // Creating server handle for ua
gb.server.OnMessage(gb.OnMessage)
gb.server.OnRegister(gb.OnRegister)
@@ -303,18 +306,18 @@ func (gb *GB28181Plugin) Start() (err error) {
func (gb *GB28181Plugin) getOrCreateClient(hostname string, port int, transport string) (*sipgo.Client, error) {
// Key包含transport因为同一个IP:Port可能同时有TCP和UDP
key := fmt.Sprintf("%s:%d:%s", hostname, port, strings.ToUpper(transport))
// 尝试从缓存获取
if wrapper, ok := gb.clients.Get(key); ok {
return wrapper.Client, nil
}
// 创建新Client
logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelDebug,
AddSource: false,
}))
client, err := sipgo.NewClient(gb.ua,
sipgo.WithClientLogger(logger),
sipgo.WithClientHostname(hostname),
@@ -323,7 +326,7 @@ func (gb *GB28181Plugin) getOrCreateClient(hostname string, port int, transport
if err != nil {
return nil, fmt.Errorf("创建Client失败 %s: %v", key, err)
}
// 存入缓存需要包装成实现GetKey的类型
wrapper := &ClientWrapper{
Client: client,
@@ -331,7 +334,7 @@ func (gb *GB28181Plugin) getOrCreateClient(hostname string, port int, transport
}
gb.clients.Set(wrapper)
gb.Info("创建新Client", "hostname", hostname, "port", port, "key", key)
return client, nil
}
@@ -471,7 +474,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
// 加载设备的通道包括deviceId或parentId等于device.DeviceId的通道
var channels []gb28181.DeviceChannel
if err := gb.DB.Where("device_id = ? OR parent_id = ?", device.DeviceId, device.DeviceId).Find(&channels).Error; err != nil {
if err := gb.DB.Where("device_id = ?", device.DeviceId).Find(&channels).Error; err != nil {
gb.Error("加载通道失败", "error", err, "deviceId", device.DeviceId)
continue
}
@@ -479,7 +482,7 @@ func (gb *GB28181Plugin) checkDeviceExpire() (err error) {
if gb.SipIP != "" {
device.SipIp = gb.SipIP
}
if gb.MediaIP != "" {
if gb.MediaIP != "" && device.MediaIp == "" {
device.MediaIp = gb.MediaIP
}

View File

@@ -942,6 +942,7 @@ type Device struct {
Ip string `protobuf:"bytes,22,opt,name=ip,proto3" json:"ip,omitempty"`
Port int32 `protobuf:"varint,23,opt,name=port,proto3" json:"port,omitempty"`
BroadcastPushAfterAck bool `protobuf:"varint,24,opt,name=broadcastPushAfterAck,proto3" json:"broadcastPushAfterAck,omitempty"`
Charset string `protobuf:"bytes,25,opt,name=charset,proto3" json:"charset,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1144,6 +1145,13 @@ func (x *Device) GetBroadcastPushAfterAck() bool {
return false
}
func (x *Device) GetCharset() string {
if x != nil {
return x.Charset
}
return ""
}
type ResponseList struct {
state protoimpl.MessageState `protogen:"open.v1"`
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
@@ -7481,7 +7489,7 @@ const file_gb28181_proto_rawDesc = "" +
"\x06status\x18\x10 \x01(\tR\x06status\x124\n" +
"\agpsTime\x18\x11 \x01(\v2\x1a.google.protobuf.TimestampR\agpsTime\x12\x1c\n" +
"\tlongitude\x18\x12 \x01(\tR\tlongitude\x12\x1a\n" +
"\blatitude\x18\x13 \x01(\tR\blatitude\"\xd5\x06\n" +
"\blatitude\x18\x13 \x01(\tR\blatitude\"\xef\x06\n" +
"\x06Device\x12\x1a\n" +
"\bdeviceId\x18\x01 \x01(\tR\bdeviceId\x12\x12\n" +
"\x04name\x18\x02 \x01(\tR\x04name\x12\"\n" +
@@ -7511,7 +7519,8 @@ const file_gb28181_proto_rawDesc = "" +
"\ttransport\x18\x15 \x01(\tR\ttransport\x12\x0e\n" +
"\x02ip\x18\x16 \x01(\tR\x02ip\x12\x12\n" +
"\x04port\x18\x17 \x01(\x05R\x04port\x124\n" +
"\x15broadcastPushAfterAck\x18\x18 \x01(\bR\x15broadcastPushAfterAck\"d\n" +
"\x15broadcastPushAfterAck\x18\x18 \x01(\bR\x15broadcastPushAfterAck\x12\x18\n" +
"\acharset\x18\x19 \x01(\tR\acharset\"d\n" +
"\fResponseList\x12\x12\n" +
"\x04code\x18\x01 \x01(\x05R\x04code\x12\x18\n" +
"\amessage\x18\x02 \x01(\tR\amessage\x12&\n" +

View File

@@ -648,6 +648,7 @@ message Device {
string ip = 22;
int32 port = 23;
bool broadcastPushAfterAck =24;
string charset =25;
}
message ResponseList {
@@ -1300,7 +1301,7 @@ message StartDownloadRequest {
// StartDownloadData 下载任务数据
message StartDownloadData {
string downloadId = 1; // 下载任务ID格式startTime_endTime_deviceId_channelId
string downloadId = 1; // 下载任务ID格式deviceId_channelId_startTime_endTime
string status = 2; // 初始状态pending
string downloadUrl = 3; // 下载链接(完成后可直接访问)
}

View File

@@ -0,0 +1,22 @@
package gb28181
import "time"
// GB28181Record GB28181录像缓存表用于避免重复下载相同时间段的录像
type GB28181Record struct {
DownloadId string `gorm:"primaryKey"` // 格式:{deviceId}_{channelId}_{startTime}_{endTime}
FilePath string // MP4文件路径绝对路径
Status string // completed/failed
CreatedAt time.Time `gorm:"autoCreateTime"`
UpdatedAt time.Time `gorm:"autoUpdateTime"`
}
// TableName 指定表名
func (GB28181Record) TableName() string {
return "gb28181_record"
}
// GetKey 实现 Collection 接口
func (r *GB28181Record) GetKey() string {
return r.DownloadId
}

View File

@@ -43,6 +43,8 @@ func (task *registerHandlerTask) getDevicePassword(device *Device) string {
}
func (task *registerHandlerTask) Run() (err error) {
task.SetDescription("OnRegister,deviceid:", task.req.From().Address.User)
task.SetDescription("starttime", time.Now().Format("2006-01-02 15:04:05"))
var password string
var device *Device
var recover = false
@@ -311,7 +313,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
d.channels.L = new(sync.RWMutex)
d.catalogReqs.L = new(sync.RWMutex)
d.plugin = task.gb
d.plugin.Info("RecoverDevice", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort, "deviceid", d.DeviceId)
d.Info("RecoverDevice end before catalog", "source", source, "desc", desc, "device.SipIp", myLanIP, "device.WanIP", myWanIP, "recipient", req.Recipient, "myPort", myPort, "deviceid", d.DeviceId)
if task.gb.DB != nil {
//var existing Device
@@ -323,7 +325,7 @@ func (task *registerHandlerTask) RecoverDevice(d *Device, req *sip.Request) {
//}
task.gb.DB.Save(d)
}
d.catalog()
go d.catalog()
return
}

View File

@@ -83,9 +83,8 @@ func (r *RTPTCPReader) Read(packet *rtp.Packet) (err error) {
type RTPPayloadReader struct {
IRTPReader
rtp.Packet
SSRC uint32 // RTP SSRC
buffer gomem.MemoryReader
onTimestampUpdate func(uint32) // 时间戳更新回调
SSRC uint32 // RTP SSRC
buffer gomem.MemoryReader
}
// func NewTCPRTPPayloadReaderForFeed() *RTPPayloadReader {
@@ -126,11 +125,6 @@ func (r *RTPPayloadReader) Read(buf []byte) (n int, err error) {
continue
}
// 更新时间戳
if r.onTimestampUpdate != nil {
r.onTimestampUpdate(r.Timestamp)
}
// 检查序列号是否连续
if lastSeq == 0 || r.SequenceNumber == lastSeq+1 {
// 序列号连续,处理当前包的数据

View File

@@ -62,27 +62,42 @@ type Receiver struct {
RTPMouth chan []byte
SinglePort io.ReadCloser
rtpReader *RTPPayloadReader // 保存 RTP 读取器引用
AllowedIP string // 允许连接的IP地址为空则不限制
started bool // 标记是否已经启动过
}
type PSReceiver struct {
Receiver
mpegps.MpegPsDemuxer
firstRtpTimestamp uint32 // 第一个 RTP 包的时间戳
currentRtpTimestamp uint32 // 当前 RTP 包的时间戳
hasFirstTimestamp bool // 是否已记录第一个时间戳
lastTimestampUpdate time.Time // 最后一次时间戳更新的时间
firstVideoPts uint64 // 第一个视频帧的 PTS90kHz
currentVideoPts uint64 // 当前视频帧的 PTS90kHz
hasFirstPts bool // 是否已记录第一个 PTS
lastPtsUpdate time.Time // 最后一次 PTS 更新的时间
OnProgressUpdate func() // 进度更新回调(可选,导出供外部使用)
lastProgressUpdate time.Time // 最后一次进度更新时间
ProgressUpdatePeriod time.Duration // 进度更新周期默认1秒导出供外部配置
}
func (p *PSReceiver) Start() error {
p.Info("PSReceiver.Start called", "StreamMode", p.Receiver.StreamMode, "SinglePort", p.Receiver.SinglePort != nil, "started", p.Receiver.started, "ListenAddr", p.Receiver.ListenAddr)
// 多端口模式下始终打印启动日志
if p.Receiver.StreamMode == StreamModeTCPPassive && p.Receiver.SinglePort == nil {
if !p.Receiver.started {
p.Info("start new listener", "addr", p.Receiver.ListenAddr)
} else {
p.Info("listener already started", "addr", p.Receiver.ListenAddr)
}
}
err := p.Receiver.Start()
if err == nil {
p.Using(p.Publisher)
// 设置 RTP 时间戳更新回调
if p.rtpReader != nil {
p.rtpReader.onTimestampUpdate = p.UpdateRtpTimestamp
// 设置 PTS 更新回调到 MpegPsDemuxer
p.MpegPsDemuxer.OnVideoPtsUpdate = p.UpdateVideoPts
// 默认进度更新周期为1秒
if p.ProgressUpdatePeriod == 0 {
p.ProgressUpdatePeriod = time.Second
}
}
return err
@@ -93,30 +108,29 @@ func (p *PSReceiver) Run() error {
if err != nil {
return err
}
p.MpegPsDemuxer.Allocator = gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2)
p.Using(p.MpegPsDemuxer.Allocator)
// 确保回调已设置
p.MpegPsDemuxer.OnVideoPtsUpdate = p.UpdateVideoPts
return p.MpegPsDemuxer.Feed(p.BufReader)
}
// UpdateRtpTimestamp 更新 RTP 时间戳(从 RTP 包中调用)
func (p *PSReceiver) UpdateRtpTimestamp(timestamp uint32) {
// UpdateVideoPts 更新视频 PTS从 MpegPsDemuxer 中调用)
func (p *PSReceiver) UpdateVideoPts(pts uint64) {
now := time.Now()
if !p.hasFirstTimestamp {
p.firstRtpTimestamp = timestamp
p.hasFirstTimestamp = true
p.lastTimestampUpdate = now
if !p.hasFirstPts {
p.firstVideoPts = pts
p.hasFirstPts = true
p.lastPtsUpdate = now
p.lastProgressUpdate = now
// 默认进度更新周期为1秒
if p.ProgressUpdatePeriod == 0 {
p.ProgressUpdatePeriod = time.Second
}
p.Info("PSReceiver: 首帧视频PTS", "pts", pts)
}
// 检测时间戳是否变化
if timestamp != p.currentRtpTimestamp {
p.currentRtpTimestamp = timestamp
p.lastTimestampUpdate = now
// 检测 PTS 是否变化
if pts != p.currentVideoPts {
p.currentVideoPts = pts
p.lastPtsUpdate = now
// 定期触发进度更新回调(避免过于频繁)
if p.OnProgressUpdate != nil && now.Sub(p.lastProgressUpdate) >= p.ProgressUpdatePeriod {
@@ -126,34 +140,40 @@ func (p *PSReceiver) UpdateRtpTimestamp(timestamp uint32) {
}
}
// GetElapsedSeconds 获取已播放的时长(秒),基于 RTP 时间戳
// RTP 时间戳单位是 90kHz视频标准时钟频率)
// GetElapsedSeconds 获取已播放的时长(秒),基于视频 PTS
// PTS 时间戳单位是 90kHzMPEG标准时钟频率)
func (p *PSReceiver) GetElapsedSeconds() float64 {
if !p.hasFirstTimestamp {
if !p.hasFirstPts {
return 0
}
// 计算时间戳差值(处理回绕)
var diff uint32
if p.currentRtpTimestamp >= p.firstRtpTimestamp {
diff = p.currentRtpTimestamp - p.firstRtpTimestamp
// 计算 PTS 差值(处理回绕)
var diff uint64
if p.currentVideoPts >= p.firstVideoPts {
diff = p.currentVideoPts - p.firstVideoPts
} else {
// 32位回绕
diff = (0xFFFFFFFF - p.firstRtpTimestamp) + p.currentRtpTimestamp + 1
// 33位PTS回绕虽然极少发生
diff = (0x1FFFFFFFF - p.firstVideoPts) + p.currentVideoPts + 1
}
// 转换为秒:timestamp / 90000
// 转换为秒:pts / 90000
return float64(diff) / 90000.0
}
// IsTimestampStable 检查 RTP 时间戳是否已经稳定(停止增长)
// 如果时间戳超过 2 秒没有变化,认为已经稳定
func (p *PSReceiver) IsTimestampStable() bool {
if !p.hasFirstTimestamp {
// IsPtsStable 检查视频 PTS 是否已经稳定(停止增长)
// 如果 PTS 超过 2 秒没有变化,认为已经稳定
func (p *PSReceiver) IsPtsStable() bool {
if !p.hasFirstPts {
return false
}
return time.Since(p.lastTimestampUpdate) > 2*time.Second
return time.Since(p.lastPtsUpdate) > 2*time.Second
}
func (p *Receiver) Start() (err error) {
// 如果已经启动过,直接返回
if p.started {
return nil
}
p.started = true
var rtpReader *RTPPayloadReader
switch p.StreamMode {
case StreamModeTCPActive:
@@ -220,10 +240,14 @@ func (p *Receiver) Start() (err error) {
p.rtpReader = rtpReader
p.BufReader = util.NewBufReader(rtpReader)
case StreamModeManual:
p.Info("进入 StreamModeManual 分支")
p.RTPMouth = make(chan []byte)
rtpReader = NewRTPPayloadReader((RTPChanReader)(p.RTPMouth))
p.rtpReader = rtpReader
p.BufReader = util.NewBufReader(rtpReader)
default:
p.Error("未知的 StreamMode", "StreamMode", p.StreamMode)
return fmt.Errorf("unknown StreamMode: %s", p.StreamMode)
}
p.Using(rtpReader, p.BufReader)
return