Files
monibuca/plugin/rtp/pkg/transceiver.go
pggiroro 1780dde594 feat: listener before ack,download progress
1.listener before ack
2.use ps.pts to update download progress
2025-11-17 21:47:28 +08:00

269 lines
7.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package rtp
import (
"errors"
"fmt"
"io"
"net"
"strings"
"time"
"github.com/langhuihui/gomem"
task "github.com/langhuihui/gotask"
"github.com/pion/rtp"
mpegps "m7s.live/v5/pkg/format/ps"
"m7s.live/v5/pkg/util"
)
// ErrRTPReceiveLost is a sentinel error with the message "rtp receive lost".
// It is not returned by any code visible in this file; see its users for the
// exact conditions under which it is reported.
var ErrRTPReceiveLost = errors.New("rtp receive lost")
// StreamMode selects how the RTP stream is transported: UDP, TCP-ACTIVE
// (TCP active mode), TCP-PASSIVE (TCP passive mode) or MANUAL (manual mode).
type StreamMode string
const (
// StreamModeUDP: Receiver.Start reads from a UDP socket bound to ListenAddr
// (or from SinglePort when one is supplied).
StreamModeUDP StreamMode = "UDP"
// StreamModeTCPActive: Receiver.Start dials the address in ListenAddr.
StreamModeTCPActive StreamMode = "TCP-ACTIVE"
// StreamModeTCPPassive: Receiver.Start listens on ListenAddr and Receiver.Run
// accepts the connection (or SinglePort is used directly when supplied).
StreamModeTCPPassive StreamMode = "TCP-PASSIVE"
// StreamModeManual: Receiver.Start creates the RTPMouth channel and reads
// packets from it.
StreamModeManual StreamMode = "MANUAL"
)
// ChanReader adapts a channel of byte slices to io.Reader: every Read call
// consumes exactly one message from the channel.
type ChanReader chan []byte

// Read blocks until a message arrives on the channel and copies it into buf.
//
// It returns io.EOF once the channel has been closed and drained. When the
// message is larger than buf, only len(buf) bytes are copied, the remainder of
// that message is discarded, and io.ErrShortBuffer is returned together with
// the number of bytes actually written. The returned n therefore never exceeds
// len(buf), as the io.Reader contract requires.
func (r ChanReader) Read(buf []byte) (n int, err error) {
	b, ok := <-r
	if !ok {
		return 0, io.EOF
	}
	// Report what was really copied, not the size of the incoming message.
	n = copy(buf, b)
	if n < len(b) {
		return n, io.ErrShortBuffer
	}
	return n, nil
}
// RTPChanReader is a channel of raw byte slices from which RTP packets are
// decoded one message at a time.
type RTPChanReader chan []byte

// Read waits for the next message on the channel and hands it to
// packet.Unmarshal, returning that call's result. It returns io.EOF once the
// channel has been closed and drained.
func (r RTPChanReader) Read(packet *rtp.Packet) (err error) {
	if raw, open := <-r; open {
		return packet.Unmarshal(raw)
	}
	return io.EOF
}

// Close closes the channel so that pending and future Read calls end with
// io.EOF. It always returns nil. Closing an already-closed channel panics, so
// Close must be called at most once.
func (r RTPChanReader) Close() error {
	close(r)
	return nil
}
// Receiver obtains an RTP stream according to StreamMode (network socket,
// caller-supplied connection, or in-process channel) and exposes the payload
// through the embedded BufReader, which Start or Run assigns.
type Receiver struct {
task.Task
*util.BufReader
// ListenAddr is the address Start listens on (UDP, TCP-PASSIVE) or dials
// (TCP-ACTIVE).
ListenAddr string
// Listener is set by Start in TCP-PASSIVE mode when SinglePort is nil; Run
// accepts one connection from it.
net.Listener
StreamMode StreamMode
// RTPMouth is created by Start in MANUAL mode; packets are read from it.
RTPMouth chan []byte
// SinglePort, when non-nil, is used as the connection in UDP and TCP-PASSIVE
// modes instead of opening a new socket.
SinglePort io.ReadCloser
rtpReader *RTPPayloadReader // retained reference to the RTP payload reader
AllowedIP string // IP address allowed to connect; empty means no restriction
started bool // whether Start has already been run
}
// PSReceiver combines a Receiver with an MPEG-PS demuxer and records the video
// PTS values reported through UpdateVideoPts so that elapsed time and progress
// can be derived from them.
type PSReceiver struct {
Receiver
mpegps.MpegPsDemuxer
firstVideoPts uint64 // PTS of the first video frame (90 kHz clock)
currentVideoPts uint64 // PTS of the current video frame (90 kHz clock)
hasFirstPts bool // whether the first PTS has been recorded
lastPtsUpdate time.Time // time of the most recent PTS change
OnProgressUpdate func() // optional progress callback; exported for external use
lastProgressUpdate time.Time // time the progress callback was last triggered
ProgressUpdatePeriod time.Duration // progress callback period; Start defaults it to 1 second; exported for external configuration
}
// Start logs the current receiver configuration, starts the embedded Receiver
// and, when that succeeds, registers the publisher with Using, installs
// UpdateVideoPts as the demuxer's video-PTS callback and defaults
// ProgressUpdatePeriod to one second if it is still zero.
//
// It returns the error from Receiver.Start unchanged.
func (p *PSReceiver) Start() error {
	recv := &p.Receiver
	p.Info("PSReceiver.Start called", "StreamMode", recv.StreamMode, "SinglePort", recv.SinglePort != nil, "started", recv.started, "ListenAddr", recv.ListenAddr)

	// In multi-port TCP-passive mode always log whether the listener is new
	// or was already started by an earlier call.
	if recv.StreamMode == StreamModeTCPPassive && recv.SinglePort == nil {
		msg := "start new listener"
		if recv.started {
			msg = "listener already started"
		}
		p.Info(msg, "addr", recv.ListenAddr)
	}

	if err := recv.Start(); err != nil {
		return err
	}

	p.Using(p.Publisher)
	// Route video PTS updates from the demuxer to this receiver.
	p.MpegPsDemuxer.OnVideoPtsUpdate = p.UpdateVideoPts
	// Fall back to a one-second progress period when none was configured.
	if p.ProgressUpdatePeriod == 0 {
		p.ProgressUpdatePeriod = time.Second
	}
	return nil
}
// Run runs the embedded Receiver, gives the demuxer a fresh scalable memory
// allocator (registered with Using), makes sure the video-PTS callback is
// installed and then feeds the receiver's BufReader into the demuxer.
//
// It returns the Receiver.Run error if there is one, otherwise whatever
// MpegPsDemuxer.Feed returns.
func (p *PSReceiver) Run() error {
	if err := p.Receiver.Run(); err != nil {
		return err
	}

	demuxer := &p.MpegPsDemuxer
	demuxer.Allocator = gomem.NewScalableMemoryAllocator(1 << gomem.MinPowerOf2)
	p.Using(demuxer.Allocator)
	// Make sure the callback is set even if Start did not install it.
	demuxer.OnVideoPtsUpdate = p.UpdateVideoPts
	return demuxer.Feed(p.BufReader)
}
// UpdateVideoPts records a video PTS; it is installed as the demuxer's
// OnVideoPtsUpdate callback.
//
// The first call stores pts as the starting PTS. Whenever pts differs from the
// previously stored value, the current PTS and its update time are refreshed,
// and OnProgressUpdate (if set) is invoked provided at least
// ProgressUpdatePeriod has passed since the last invocation.
func (p *PSReceiver) UpdateVideoPts(pts uint64) {
	now := time.Now()

	if !p.hasFirstPts {
		p.firstVideoPts, p.hasFirstPts = pts, true
		p.lastPtsUpdate, p.lastProgressUpdate = now, now
		p.Info("PSReceiver: 首帧视频PTS", "pts", pts)
	}

	// Nothing to do when the PTS has not moved.
	if pts == p.currentVideoPts {
		return
	}
	p.currentVideoPts = pts
	p.lastPtsUpdate = now

	// Throttle the progress callback so it does not fire on every frame.
	if p.OnProgressUpdate == nil {
		return
	}
	if now.Sub(p.lastProgressUpdate) < p.ProgressUpdatePeriod {
		return
	}
	p.lastProgressUpdate = now
	p.OnProgressUpdate()
}
// GetElapsedSeconds returns the span between the first and the current video
// PTS in seconds, or 0 when no PTS has been recorded yet.
//
// PTS values are counted in 90 kHz ticks (the MPEG standard clock), so the
// tick difference is divided by 90000.
//
// NOTE(review): any current PTS lower than the first one is treated as a
// 33-bit wrap-around, so a PTS that merely steps backwards would yield a very
// large result.
func (p *PSReceiver) GetElapsedSeconds() float64 {
	const (
		ptsMax        = 0x1FFFFFFFF // largest value of a 33-bit PTS
		ticksPerSecond = 90000.0
	)

	if !p.hasFirstPts {
		return 0
	}

	first, current := p.firstVideoPts, p.currentVideoPts
	if current >= first {
		return float64(current-first) / ticksPerSecond
	}
	// Wrapped: ticks left until the counter overflowed plus ticks since then.
	return float64((ptsMax-first)+current+1) / ticksPerSecond
}
// IsPtsStable reports whether the video PTS has stopped advancing: it is true
// when a first PTS has been recorded and more than two seconds have passed
// since the PTS last changed.
func (p *PSReceiver) IsPtsStable() bool {
	const quietPeriod = 2 * time.Second
	return p.hasFirstPts && time.Since(p.lastPtsUpdate) > quietPeriod
}
// Start prepares the RTP input for the configured StreamMode.
//
//   - TCP-ACTIVE: dials ListenAddr (which must contain an IP and a port).
//   - TCP-PASSIVE: uses SinglePort when set; otherwise opens a tcp4 listener on
//     ListenAddr and returns, leaving the Accept to Run.
//   - UDP: uses SinglePort when set; otherwise binds a udp4 socket on
//     ListenAddr.
//   - MANUAL: creates the RTPMouth channel and reads packets from it.
//
// Except for the listener-only TCP-PASSIVE path, Start stores the payload
// reader, wraps it in the embedded BufReader and registers both with Using.
// Connections and listeners opened here are closed through OnStop.
//
// A second call after a successful start is a no-op that returns nil. If the
// start fails, the started flag is cleared again so that a later call retries
// instead of silently returning nil without a usable reader.
//
// It returns an error for an address without an IP in TCP-ACTIVE mode, for
// dial/listen/resolve failures, and for an unknown StreamMode.
func (p *Receiver) Start() (err error) {
	if p.started {
		return nil
	}
	p.started = true
	defer func() {
		// Do not stay "started" after a failure; see the doc comment.
		if err != nil {
			p.started = false
		}
	}()

	var rtpReader *RTPPayloadReader
	switch p.StreamMode {
	case StreamModeTCPActive:
		// TCP active mode: no listener is needed, we connect to the device.
		p.Info("TCP-ACTIVE mode, no need to listen")
		addr := p.ListenAddr
		if !strings.Contains(addr, ":") {
			addr = ":" + addr
		}
		// A leading ':' means only a port was given; dialing needs a host.
		if strings.HasPrefix(addr, ":") {
			p.Error("invalid address, missing IP", "addr", addr)
			return fmt.Errorf("invalid address %s, missing IP", addr)
		}
		p.Info("TCP-ACTIVE mode, connecting to device", "addr", addr)
		var conn net.Conn
		conn, err = net.Dial("tcp", addr)
		if err != nil {
			p.Error("connect to device failed", "err", err)
			return err
		}
		p.OnStop(conn.Close)
		rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn))
	case StreamModeTCPPassive:
		if p.SinglePort == nil {
			p.Info("start new listener", "addr", p.ListenAddr)
			p.Listener, err = net.Listen("tcp4", p.ListenAddr)
			if err != nil {
				p.Error("start listen", "err", err)
				// Same message text as before, but the cause stays
				// reachable through errors.Is / errors.As.
				return fmt.Errorf("start listen,err%w", err)
			}
			p.OnStop(p.Listener.Close)
			// The connection is accepted later, in Run.
			return nil
		}
		var conn io.ReadCloser = p.SinglePort
		p.OnStop(conn.Close)
		rtpReader = NewRTPPayloadReader(NewRTPTCPReader(conn))
	case StreamModeUDP:
		var conn io.ReadCloser
		if p.SinglePort == nil {
			var udpAddr *net.UDPAddr
			udpAddr, err = net.ResolveUDPAddr("udp4", p.ListenAddr)
			if err != nil {
				return err
			}
			conn, err = net.ListenUDP("udp4", udpAddr)
			if err != nil {
				return err
			}
		} else {
			conn = p.SinglePort
		}
		p.OnStop(conn.Close)
		rtpReader = NewRTPPayloadReader(NewRTPUDPReader(conn))
	case StreamModeManual:
		p.Info("进入 StreamModeManual 分支")
		p.RTPMouth = make(chan []byte)
		rtpReader = NewRTPPayloadReader((RTPChanReader)(p.RTPMouth))
	default:
		p.Error("未知的 StreamMode", "StreamMode", p.StreamMode)
		return fmt.Errorf("unknown StreamMode: %s", p.StreamMode)
	}

	// Common wiring for every mode that produced a reader.
	p.rtpReader = rtpReader
	p.BufReader = util.NewBufReader(rtpReader)
	p.Using(rtpReader, p.BufReader)
	return nil
}
// Run completes the TCP-PASSIVE setup: when Start opened a listener, Run
// blocks in Accept for one connection, arranges for it to be closed through
// OnStop and builds the payload reader and BufReader on top of it. When there
// is no listener, Run does nothing.
//
// The reader and buffer are registered with Using, matching what Start does
// for every other mode, so this path gets the same cleanup handling.
//
// NOTE(review): AllowedIP is not consulted here; the first connection accepted
// is used regardless of its remote address — confirm whether filtering is
// expected at this point.
//
// It returns the Accept error, if any.
func (p *Receiver) Run() error {
	if p.Listener == nil {
		return nil
	}

	conn, err := p.Accept()
	if err != nil {
		return err
	}
	p.OnStop(conn.Close)

	rtpReader := NewRTPPayloadReader(NewRTPTCPReader(conn))
	p.rtpReader = rtpReader
	p.BufReader = util.NewBufReader(rtpReader)
	p.Using(rtpReader, p.BufReader)
	return nil
}