feat:support receive stream via UDP (#317)

Co-authored-by: yjx <yjx>
This commit is contained in:
yangjinxing123
2025-08-25 15:59:26 +08:00
committed by GitHub
parent b6ee2843b0
commit db265e0ef0
5 changed files with 375 additions and 46 deletions

View File

@@ -104,6 +104,7 @@ func (d *Dialog) Start() (err error) {
}
//defer d.gb.dialogs.Remove(d)
if d.StreamMode == "TCP-PASSIVE" {
if d.gb.tcpPort > 0 {
d.MediaPort = d.gb.tcpPort
} else {
@@ -117,6 +118,21 @@ func (d *Dialog) Start() (err error) {
d.MediaPort = d.gb.MediaPort[0]
}
}
} else if d.StreamMode == "UDP" {
if d.gb.udpPort > 0 {
d.MediaPort = d.gb.udpPort
} else {
if d.gb.MediaPort.Valid() {
select {
case d.MediaPort = <-d.gb.udpPorts:
default:
return fmt.Errorf("no available udp port")
}
} else {
d.MediaPort = d.gb.MediaPort[0]
}
}
}
ssrc := d.CreateSSRC(d.gb.Serial)
d.Info("MediaIp is ", device.MediaIp)
@@ -179,7 +195,9 @@ func (d *Dialog) Start() (err error) {
"a=connection:new",
)
case "UDP":
/* 支持udp收流 yjx
return errors.New("do not support udp mode")
*/
default:
sdpInfo = append(sdpInfo,
"a=setup:passive",
@@ -310,7 +328,7 @@ func (d *Dialog) Run() (err error) {
pub := gb28181.NewPSPublisher(d.pullCtx.Publisher)
if d.StreamMode == "TCP-ACTIVE" {
pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort)
} else {
} else if d.StreamMode == "TCP-PASSIVE" {
if d.gb.tcpPort > 0 {
d.Info("into single port mode, use gb.tcpPort", d.gb.tcpPort)
if d.gb.netListener != nil {
@@ -324,14 +342,44 @@ func (d *Dialog) Run() (err error) {
pub.Receiver.SSRC = d.SSRC
}
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
} else if d.StreamMode == "UDP" {
pub.Receiver.IsSinglePort = false
if d.gb.udpPort > 0 {
d.Info("into single port mode, use gb.udpPort", d.gb.udpPort)
if d.gb.netUDPListener != nil {
d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr())
pub.Receiver.ListenerUdp = d.gb.netUDPListener
} else {
d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort))
pub.Receiver.ListenerUdp, err = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4)
if err != nil {
d.Error("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort), "err", err)
return errors.New("start udp listen, err" + err.Error())
}
d.gb.netUDPListener = pub.Receiver.ListenerUdp
}
pub.Receiver.IsSinglePort = true
}
pub.Receiver.SSRC = d.SSRC
pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort)
pub.Receiver.UdpCacheSize = 10
}
pub.Receiver.StreamMode = d.StreamMode
d.AddTask(&pub.Receiver)
startResult := pub.Receiver.WaitStarted()
if startResult != nil {
return fmt.Errorf("pub.Receiver.WaitStarted %s", startResult)
}
d.gb.udpPubs.Set(pub)
pub.Demux()
d.gb.udpPubs.Remove(pub)
return
}
@@ -340,10 +388,18 @@ func (d *Dialog) GetKey() string {
}
func (d *Dialog) Dispose() {
if d.StreamMode == "UDP" {
if d.gb.udpPort == 0 {
// 如果没有设置udp端口则将MediaPort设置为0表示不再使用
d.gb.udpPorts <- d.MediaPort
}
} else {
if d.gb.tcpPort == 0 {
// 如果没有设置tcp端口则将MediaPort设置为0表示不再使用
d.gb.tcpPorts <- d.MediaPort
}
}
d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId)
if d.session != nil {
err := d.session.Bye(d)

View File

@@ -17,6 +17,7 @@ import (
"github.com/emiago/sipgo"
"github.com/emiago/sipgo/sip"
"github.com/pion/rtp"
"github.com/rs/zerolog"
m7s "m7s.live/v5"
"m7s.live/v5/pkg/config"
@@ -64,6 +65,10 @@ type GB28181Plugin struct {
Platforms []*gb28181.PlatformModel
channels util.Collection[string, *Channel]
netListener net.Listener
udpPorts chan uint16
udpPort uint16
netUDPListener *net.UDPConn
udpPubs task.Manager[uint32, *gb28181.PSPublisher]
}
var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{
@@ -177,15 +182,42 @@ func (gb *GB28181Plugin) OnInit() (err error) {
if gb.MediaPort.Valid() {
gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1]))
gb.tcpPorts = make(chan uint16, gb.MediaPort.Size())
gb.udpPorts = make(chan uint16, gb.MediaPort.Size())
if gb.MediaPort.Size() == 0 {
gb.tcpPort = gb.MediaPort[0]
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.udpPort = gb.MediaPort[0]
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
} else if gb.MediaPort.Size() == 1 {
gb.tcpPort = gb.MediaPort[0] + 1
gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort))
//support udp
{
gb.udpPort = gb.MediaPort[0] + 1
gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4)
if err != nil {
gb.Error("start listen", "err", err)
return errors.New("start udp listen, err" + err.Error())
}
go gb.ReadUdpInsinglePort()
}
} else {
for i := range gb.MediaPort.Size() {
gb.tcpPorts <- gb.MediaPort[0] + i
gb.udpPorts <- gb.MediaPort[0] + i
}
}
} else {
@@ -1049,3 +1081,24 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) {
return
}
}
func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) {
buffer := make(util.Buffer, 1024*1024)
var rtpPacket rtp.Packet
for {
n, _, err := gb.netUDPListener.ReadFromUDP(buffer)
if err != nil {
return err
}
ps := buffer[:n]
if err := rtpPacket.Unmarshal(ps); err != nil {
continue
}
pub, ret := gb.udpPubs.Get(rtpPacket.SSRC)
if ret {
pub.Receiver.ReadUdpRTP(buffer[:n])
}
}
}

View File

@@ -12,6 +12,7 @@ import (
"m7s.live/v5/pkg"
"m7s.live/v5/pkg/task"
"m7s.live/v5/pkg/util"
"m7s.live/v5/plugin/gb28181/udputils"
rtp2 "m7s.live/v5/plugin/rtp/pkg"
)
@@ -47,6 +48,13 @@ type Receiver struct {
Listener net.Listener
StreamMode string // 数据流传输模式UDP:udp传输/TCP-ACTIVEtcp主动模式/TCP-PASSIVEtcp被动模式
SSRC uint32 // RTP SSRC
ListenerUdp *net.UDPConn
RTPReaderUdp *rtp2.UDP
IsSinglePort bool
SingleStop chan struct{}
udpCache *udputils.PriorityQueueRtp
UdpCacheSize int
lastSeq uint16
}
func NewPSPublisher(puber *m7s.Publisher) *PSPublisher {
@@ -142,6 +150,10 @@ func (dec *PSPublisher) decProgramStreamMap() (err error) {
return nil
}
func (p *PSPublisher) GetKey() uint32 {
return p.Receiver.SSRC
}
func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
lastSeq := p.SequenceNumber
if err = p.Unmarshal(rtp); err != nil {
@@ -172,16 +184,71 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) {
return task.ErrTaskComplete
}
return
}
} else {
p.Error("rtp seq mismatch,", "lastSeq", lastSeq, "seq", p.SequenceNumber)
return ErrRTPReceiveLost
}
}
func (p *Receiver) ReadUdpRTP(rtp util.Buffer) (err error) {
//解析rtp
if err = p.Unmarshal(rtp); err != nil {
p.Error("unmarshal error", "err", err)
return nil
}
//判断ssrc
if p.SSRC != 0 && p.SSRC != p.Packet.SSRC {
p.Info("ReadUdpRTP, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC)
if p.TraceEnabled() {
p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC)
}
return nil
}
if p.UdpCacheSize > 0 && p.udpCache == nil {
p.udpCache = udputils.NewPqRtp()
}
//p.Info("ReadUdpRTP, seq", "rtpSeq", p.SequenceNumber)
//加入缓存,自动排序
rtpTmpCache := p.Packet
rtpTmpCache.Payload = make([]byte, len(p.Payload))
copy(rtpTmpCache.Payload, p.Payload)
p.udpCache.Push(rtpTmpCache)
rtpTmp := p.Packet
if p.udpCache.Len() < p.UdpCacheSize-1 {
return nil
} else {
rtpTmp, _ = p.udpCache.Pop()
}
//p.Info("ReadUdpRTP, seq", "rtpTmpSeq", rtpTmp.SequenceNumber)
p.lastSeq = rtpTmp.SequenceNumber
if p.TraceEnabled() {
p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC)
}
copyData := make([]byte, len(rtpTmp.Payload))
copy(copyData, rtpTmp.Payload)
select {
case p.FeedChan <- copyData:
// 成功发送数据
case <-p.Done():
// 任务已停止,返回错误
return task.ErrTaskComplete
}
return nil
}
func (p *Receiver) Start() (err error) {
if strings.ToUpper(p.StreamMode) == "TCP-ACTIVE" {
// TCP主动模式不需要监听直接返回
p.Info("TCP-ACTIVE mode, no need to listen")
return nil
}
} else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" {
// TCP被动模式
if p.Listener == nil {
p.Info("start new listener", "addr", p.ListenAddr)
@@ -192,6 +259,17 @@ func (p *Receiver) Start() (err error) {
}
}
p.Info("start listen", "addr", p.ListenAddr)
} else {
if p.ListenerUdp == nil {
p.Info("start new listener", "addr", p.ListenAddr)
p.ListenerUdp, err = util.ListenUDP(p.ListenAddr, 1024*1024*10)
if err != nil {
p.Error("start listen", "err", err)
return errors.New("start listen,err" + err.Error())
}
}
}
return
}
@@ -205,6 +283,13 @@ func (p *Receiver) Dispose() {
if p.RTPReader != nil {
p.RTPReader.Close()
}
if p.ListenerUdp != nil && !p.IsSinglePort {
p.ListenerUdp.Close()
}
if p.IsSinglePort {
close(p.SingleStop)
}
if p.FeedChan != nil {
close(p.FeedChan)
}
@@ -230,8 +315,7 @@ func (p *Receiver) Go() error {
p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn))
p.Info("connected to device", "addr", conn.RemoteAddr())
return p.RTPReader.Read(p.ReadRTP)
}
// TCP被动模式
} else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" { // TCP被动模式
p.Info("start accept")
conn, err := p.Listener.Accept()
if err != nil {
@@ -241,4 +325,17 @@ func (p *Receiver) Go() error {
p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn))
p.Info("accept", "addr", conn.RemoteAddr())
return p.RTPReader.Read(p.ReadRTP)
} else { //UDP模式
if p.IsSinglePort {
p.SingleStop = make(chan struct{})
<-p.SingleStop
p.Info("stop udp accept", "ssrc", p.SSRC)
return nil
} else {
p.Info("start udp accept")
p.RTPReaderUdp = (*rtp2.UDP)(p.ListenerUdp)
return p.RTPReaderUdp.Read(p.ReadUdpRTP)
}
}
}

View File

@@ -0,0 +1,98 @@
package udputils
import (
"container/heap"
"errors"
"github.com/pion/rtp"
)
const MaxRtpDiff = 65000 //相邻两个包之间的最大差值
type PriorityQueueRtp struct {
itemHeap *packets
current *rtp.Packet
priorityMap map[uint16]bool
lastPacket *rtp.Packet
}
func NewPqRtp() *PriorityQueueRtp {
return &PriorityQueueRtp{
itemHeap: &packets{},
priorityMap: make(map[uint16]bool),
}
}
func (p *PriorityQueueRtp) Len() int {
return p.itemHeap.Len()
}
func (p *PriorityQueueRtp) Push(v rtp.Packet) {
if p.priorityMap[v.SequenceNumber] {
return
}
newItem := &packet{
value: v,
priority: v.SequenceNumber,
}
heap.Push(p.itemHeap, newItem)
}
func (p *PriorityQueueRtp) Pop() (rtp.Packet, error) {
if len(*p.itemHeap) == 0 {
return rtp.Packet{}, errors.New("empty queue")
}
item := heap.Pop(p.itemHeap).(*packet)
return item.value, nil
}
func (p *PriorityQueueRtp) Empty() {
old := *p.itemHeap
*p.itemHeap = old[:0]
}
type packets []*packet
type packet struct {
value rtp.Packet
priority uint16
index int
}
func (p *packets) Len() int {
return len(*p)
}
func (p *packets) Less(i, j int) bool {
a, b := (*p)[i].priority, (*p)[j].priority
if int(a)-int(b) > MaxRtpDiff || int(b)-int(a) > MaxRtpDiff {
if a < b {
return false
}
return true
}
return a < b
}
func (p *packets) Swap(i, j int) {
(*p)[i], (*p)[j] = (*p)[j], (*p)[i]
(*p)[i].index = i
(*p)[j].index = j
}
func (p *packets) Push(x interface{}) {
it := x.(*packet)
it.index = len(*p)
*p = append(*p, it)
}
func (p *packets) Pop() interface{} {
old := *p
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*p = old[0 : n-1]
return item
}

25
plugin/rtp/pkg/udp.go Normal file
View File

@@ -0,0 +1,25 @@
package rtp
import (
"net"
"m7s.live/v5/pkg/util"
)
type UDP net.UDPConn
func (t *UDP) Read(onRTP func(util.Buffer) error) (err error) {
buffer := make(util.Buffer, 1024*1024)
for {
n, _, err := (*net.UDPConn)(t).ReadFromUDP(buffer)
if err != nil {
return err
}
err = onRTP(buffer[:n])
if err != nil {
//return err
}
}
}