diff --git a/plugin/gb28181/dialog.go b/plugin/gb28181/dialog.go index a4276b9..65d5a2d 100644 --- a/plugin/gb28181/dialog.go +++ b/plugin/gb28181/dialog.go @@ -104,17 +104,33 @@ func (d *Dialog) Start() (err error) { } //defer d.gb.dialogs.Remove(d) - if d.gb.tcpPort > 0 { - d.MediaPort = d.gb.tcpPort - } else { - if d.gb.MediaPort.Valid() { - select { - case d.MediaPort = <-d.gb.tcpPorts: - default: - return fmt.Errorf("no available tcp port") - } + if d.StreamMode == "TCP-PASSIVE" { + if d.gb.tcpPort > 0 { + d.MediaPort = d.gb.tcpPort } else { - d.MediaPort = d.gb.MediaPort[0] + if d.gb.MediaPort.Valid() { + select { + case d.MediaPort = <-d.gb.tcpPorts: + default: + return fmt.Errorf("no available tcp port") + } + } else { + d.MediaPort = d.gb.MediaPort[0] + } + } + } else if d.StreamMode == "UDP" { + if d.gb.udpPort > 0 { + d.MediaPort = d.gb.udpPort + } else { + if d.gb.MediaPort.Valid() { + select { + case d.MediaPort = <-d.gb.udpPorts: + default: + return fmt.Errorf("no available udp port") + } + } else { + d.MediaPort = d.gb.MediaPort[0] + } } } @@ -179,7 +195,9 @@ func (d *Dialog) Start() (err error) { "a=connection:new", ) case "UDP": + /* 支持udp收流 yjx return errors.New("do not support udp mode") + */ default: sdpInfo = append(sdpInfo, "a=setup:passive", @@ -310,9 +328,9 @@ func (d *Dialog) Run() (err error) { pub := gb28181.NewPSPublisher(d.pullCtx.Publisher) if d.StreamMode == "TCP-ACTIVE" { pub.Receiver.ListenAddr = fmt.Sprintf("%s:%d", d.targetIP, d.targetPort) - } else { + } else if d.StreamMode == "TCP-PASSIVE" { if d.gb.tcpPort > 0 { - d.Info("into single port mode,use gb.tcpPort", d.gb.tcpPort) + d.Info("into single port mode, use gb.tcpPort", d.gb.tcpPort) if d.gb.netListener != nil { d.Info("use gb.netListener", d.gb.netListener.Addr()) pub.Receiver.Listener = d.gb.netListener @@ -324,14 +342,44 @@ func (d *Dialog) Run() (err error) { pub.Receiver.SSRC = d.SSRC } pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) + } else if d.StreamMode == "UDP" { + pub.Receiver.IsSinglePort = false + if d.gb.udpPort > 0 { + d.Info("into single port mode, use gb.udpPort", d.gb.udpPort) + if d.gb.netUDPListener != nil { + d.Info("use gb.netUDPListener", d.gb.netUDPListener.LocalAddr()) + pub.Receiver.ListenerUdp = d.gb.netUDPListener + } else { + d.Info("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort)) + pub.Receiver.ListenerUdp, err = util.ListenUDP(fmt.Sprintf(":%d", d.gb.udpPort), 1024*1024*4) + if err != nil { + d.Error("listen udp4", fmt.Sprintf(":%d", d.gb.udpPort), "err", err) + return errors.New("start udp listen, err" + err.Error()) + } + + d.gb.netUDPListener = pub.Receiver.ListenerUdp + } + + pub.Receiver.IsSinglePort = true + + } + pub.Receiver.SSRC = d.SSRC + pub.Receiver.ListenAddr = fmt.Sprintf(":%d", d.MediaPort) + pub.Receiver.UdpCacheSize = 10 } + pub.Receiver.StreamMode = d.StreamMode d.AddTask(&pub.Receiver) startResult := pub.Receiver.WaitStarted() if startResult != nil { return fmt.Errorf("pub.Receiver.WaitStarted %s", startResult) } + + d.gb.udpPubs.Set(pub) + pub.Demux() + + d.gb.udpPubs.Remove(pub) return } @@ -340,9 +388,17 @@ func (d *Dialog) GetKey() string { } func (d *Dialog) Dispose() { - if d.gb.tcpPort == 0 { - // 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用 - d.gb.tcpPorts <- d.MediaPort + + if d.StreamMode == "UDP" { + if d.gb.udpPort == 0 { + // 如果没有设置udp端口,则将MediaPort设置为0,表示不再使用 + d.gb.udpPorts <- d.MediaPort + } + } else { + if d.gb.tcpPort == 0 { + // 如果没有设置tcp端口,则将MediaPort设置为0,表示不再使用 + d.gb.tcpPorts <- d.MediaPort + } } d.Info("dialog dispose", "ssrc", d.SSRC, "mediaPort", d.MediaPort, "streamMode", d.StreamMode, "deviceId", d.Channel.DeviceId, "channelId", d.Channel.ChannelId) if d.session != nil { diff --git a/plugin/gb28181/index.go b/plugin/gb28181/index.go index d8afeef..a8e2606 100644 --- a/plugin/gb28181/index.go +++ b/plugin/gb28181/index.go @@ -17,6 +17,7 @@ import ( "github.com/emiago/sipgo" "github.com/emiago/sipgo/sip" + "github.com/pion/rtp" "github.com/rs/zerolog" m7s "m7s.live/v5" "m7s.live/v5/pkg/config" @@ -64,6 +65,10 @@ type GB28181Plugin struct { Platforms []*gb28181.PlatformModel channels util.Collection[string, *Channel] netListener net.Listener + udpPorts chan uint16 + udpPort uint16 + netUDPListener *net.UDPConn + udpPubs task.Manager[uint32, *gb28181.PSPublisher] } var _ = m7s.InstallPlugin[GB28181Plugin](m7s.PluginMeta{ @@ -177,15 +182,42 @@ func (gb *GB28181Plugin) OnInit() (err error) { if gb.MediaPort.Valid() { gb.SetDescription("tcp", fmt.Sprintf("%d-%d", gb.MediaPort[0], gb.MediaPort[1])) gb.tcpPorts = make(chan uint16, gb.MediaPort.Size()) + gb.udpPorts = make(chan uint16, gb.MediaPort.Size()) if gb.MediaPort.Size() == 0 { gb.tcpPort = gb.MediaPort[0] gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) + + //support udp + { + gb.udpPort = gb.MediaPort[0] + gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) + + if err != nil { + gb.Error("start listen", "err", err) + return errors.New("start udp listen, err" + err.Error()) + } + go gb.ReadUdpInsinglePort() + } } else if gb.MediaPort.Size() == 1 { gb.tcpPort = gb.MediaPort[0] + 1 gb.netListener, _ = net.Listen("tcp4", fmt.Sprintf(":%d", gb.tcpPort)) + + //support udp + { + gb.udpPort = gb.MediaPort[0] + 1 + gb.netUDPListener, err = util.ListenUDP(fmt.Sprintf(":%d", gb.udpPort), 1024*1024*4) + + if err != nil { + gb.Error("start listen", "err", err) + return errors.New("start udp listen, err" + err.Error()) + } + + go gb.ReadUdpInsinglePort() + } } else { for i := range gb.MediaPort.Size() { gb.tcpPorts <- gb.MediaPort[0] + i + gb.udpPorts <- gb.MediaPort[0] + i } } } else { @@ -1049,3 +1081,24 @@ func (gb *GB28181Plugin) OnAck(req *sip.Request, tx sip.ServerTransaction) { return } } + +func (gb *GB28181Plugin) ReadUdpInsinglePort() (err error) { + buffer := make(util.Buffer, 1024*1024) + var rtpPacket rtp.Packet + for { + n, _, err := gb.netUDPListener.ReadFromUDP(buffer) + if err != nil { + return err + } + + ps := buffer[:n] + if err := rtpPacket.Unmarshal(ps); err != nil { + continue + } + + pub, ret := gb.udpPubs.Get(rtpPacket.SSRC) + if ret { + pub.Receiver.ReadUdpRTP(buffer[:n]) + } + } +} diff --git a/plugin/gb28181/pkg/transceiver.go b/plugin/gb28181/pkg/transceiver.go index b95320e..4d41a92 100644 --- a/plugin/gb28181/pkg/transceiver.go +++ b/plugin/gb28181/pkg/transceiver.go @@ -12,6 +12,7 @@ import ( "m7s.live/v5/pkg" "m7s.live/v5/pkg/task" "m7s.live/v5/pkg/util" + "m7s.live/v5/plugin/gb28181/udputils" rtp2 "m7s.live/v5/plugin/rtp/pkg" ) @@ -36,17 +37,24 @@ var ErrRTPReceiveLost = errors.New("rtp receive lost") type Receiver struct { task.Task rtp.Packet - FeedChan chan []byte - psm util.Memory - dump *os.File - dumpLen []byte - psVideo PSVideo - psAudio PSAudio - RTPReader *rtp2.TCP - ListenAddr string - Listener net.Listener - StreamMode string // 数据流传输模式(UDP:udp传输/TCP-ACTIVE:tcp主动模式/TCP-PASSIVE:tcp被动模式) - SSRC uint32 // RTP SSRC + FeedChan chan []byte + psm util.Memory + dump *os.File + dumpLen []byte + psVideo PSVideo + psAudio PSAudio + RTPReader *rtp2.TCP + ListenAddr string + Listener net.Listener + StreamMode string // 数据流传输模式(UDP:udp传输/TCP-ACTIVE:tcp主动模式/TCP-PASSIVE:tcp被动模式) + SSRC uint32 // RTP SSRC + ListenerUdp *net.UDPConn + RTPReaderUdp *rtp2.UDP + IsSinglePort bool + SingleStop chan struct{} + udpCache *udputils.PriorityQueueRtp + UdpCacheSize int + lastSeq uint16 } func NewPSPublisher(puber *m7s.Publisher) *PSPublisher { @@ -142,6 +150,10 @@ func (dec *PSPublisher) decProgramStreamMap() (err error) { return nil } +func (p *PSPublisher) GetKey() uint32 { + return p.Receiver.SSRC +} + func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) { lastSeq := p.SequenceNumber if err = p.Unmarshal(rtp); err != nil { @@ -172,8 +184,63 @@ func (p *Receiver) ReadRTP(rtp util.Buffer) (err error) { return task.ErrTaskComplete } return + } else { + p.Error("rtp seq mismatch,", "lastSeq", lastSeq, "seq", p.SequenceNumber) + return ErrRTPReceiveLost } - return ErrRTPReceiveLost + +} + +func (p *Receiver) ReadUdpRTP(rtp util.Buffer) (err error) { + //解析rtp + if err = p.Unmarshal(rtp); err != nil { + p.Error("unmarshal error", "err", err) + return nil + } + //判断ssrc + if p.SSRC != 0 && p.SSRC != p.Packet.SSRC { + p.Info("ReadUdpRTP, ssrc mismatch", "expected", p.SSRC, "actual", p.Packet.SSRC) + if p.TraceEnabled() { + p.Trace("rtp ssrc mismatch, skip", "expected", p.SSRC, "actual", p.Packet.SSRC) + } + return nil + } + + if p.UdpCacheSize > 0 && p.udpCache == nil { + p.udpCache = udputils.NewPqRtp() + } + //p.Info("ReadUdpRTP, seq", "rtpSeq", p.SequenceNumber) + //加入缓存,自动排序 + rtpTmpCache := p.Packet + rtpTmpCache.Payload = make([]byte, len(p.Payload)) + copy(rtpTmpCache.Payload, p.Payload) + p.udpCache.Push(rtpTmpCache) + + rtpTmp := p.Packet + if p.udpCache.Len() < p.UdpCacheSize-1 { + return nil + } else { + rtpTmp, _ = p.udpCache.Pop() + } + + //p.Info("ReadUdpRTP, seq", "rtpTmpSeq", rtpTmp.SequenceNumber) + + p.lastSeq = rtpTmp.SequenceNumber + + if p.TraceEnabled() { + p.Trace("rtp", "len", rtp.Len(), "seq", p.SequenceNumber, "payloadType", p.PayloadType, "ssrc", p.Packet.SSRC) + } + + copyData := make([]byte, len(rtpTmp.Payload)) + copy(copyData, rtpTmp.Payload) + select { + case p.FeedChan <- copyData: + // 成功发送数据 + case <-p.Done(): + // 任务已停止,返回错误 + return task.ErrTaskComplete + } + return nil } func (p *Receiver) Start() (err error) { @@ -181,17 +248,28 @@ func (p *Receiver) Start() (err error) { // TCP主动模式不需要监听,直接返回 p.Info("TCP-ACTIVE mode, no need to listen") return nil - } - // TCP被动模式 - if p.Listener == nil { - p.Info("start new listener", "addr", p.ListenAddr) - p.Listener, err = net.Listen("tcp4", p.ListenAddr) - if err != nil { - p.Error("start listen", "err", err) - return errors.New("start listen,err" + err.Error()) + } else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" { + // TCP被动模式 + if p.Listener == nil { + p.Info("start new listener", "addr", p.ListenAddr) + p.Listener, err = net.Listen("tcp4", p.ListenAddr) + if err != nil { + p.Error("start listen", "err", err) + return errors.New("start listen,err" + err.Error()) + } + } + p.Info("start listen", "addr", p.ListenAddr) + } else { + if p.ListenerUdp == nil { + p.Info("start new listener", "addr", p.ListenAddr) + + p.ListenerUdp, err = util.ListenUDP(p.ListenAddr, 1024*1024*10) + if err != nil { + p.Error("start listen", "err", err) + return errors.New("start listen,err" + err.Error()) + } } } - p.Info("start listen", "addr", p.ListenAddr) return } @@ -205,6 +283,13 @@ func (p *Receiver) Dispose() { if p.RTPReader != nil { p.RTPReader.Close() } + if p.ListenerUdp != nil && !p.IsSinglePort { + p.ListenerUdp.Close() + } + if p.IsSinglePort { + close(p.SingleStop) + } + if p.FeedChan != nil { close(p.FeedChan) } @@ -230,15 +315,27 @@ func (p *Receiver) Go() error { p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn)) p.Info("connected to device", "addr", conn.RemoteAddr()) return p.RTPReader.Read(p.ReadRTP) + } else if strings.ToUpper(p.StreamMode) == "TCP-PASSIVE" { // TCP被动模式 + p.Info("start accept") + conn, err := p.Listener.Accept() + if err != nil { + p.Error("accept", "err", err) + return err + } + p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn)) + p.Info("accept", "addr", conn.RemoteAddr()) + return p.RTPReader.Read(p.ReadRTP) + } else { //UDP模式 + if p.IsSinglePort { + p.SingleStop = make(chan struct{}) + <-p.SingleStop + p.Info("stop udp accept", "ssrc", p.SSRC) + return nil + + } else { + p.Info("start udp accept") + p.RTPReaderUdp = (*rtp2.UDP)(p.ListenerUdp) + return p.RTPReaderUdp.Read(p.ReadUdpRTP) + } } - // TCP被动模式 - p.Info("start accept") - conn, err := p.Listener.Accept() - if err != nil { - p.Error("accept", "err", err) - return err - } - p.RTPReader = (*rtp2.TCP)(conn.(*net.TCPConn)) - p.Info("accept", "addr", conn.RemoteAddr()) - return p.RTPReader.Read(p.ReadRTP) } diff --git a/plugin/gb28181/udputils/rtp_sort.go b/plugin/gb28181/udputils/rtp_sort.go new file mode 100644 index 0000000..7b90872 --- /dev/null +++ b/plugin/gb28181/udputils/rtp_sort.go @@ -0,0 +1,98 @@ +package udputils + +import ( + "container/heap" + "errors" + + "github.com/pion/rtp" +) + +const MaxRtpDiff = 65000 //相邻两个包之间的最大差值 + +type PriorityQueueRtp struct { + itemHeap *packets + current *rtp.Packet + priorityMap map[uint16]bool + lastPacket *rtp.Packet +} + +func NewPqRtp() *PriorityQueueRtp { + return &PriorityQueueRtp{ + itemHeap: &packets{}, + priorityMap: make(map[uint16]bool), + } +} + +func (p *PriorityQueueRtp) Len() int { + return p.itemHeap.Len() +} + +func (p *PriorityQueueRtp) Push(v rtp.Packet) { + if p.priorityMap[v.SequenceNumber] { + return + } + newItem := &packet{ + value: v, + priority: v.SequenceNumber, + } + heap.Push(p.itemHeap, newItem) +} + +func (p *PriorityQueueRtp) Pop() (rtp.Packet, error) { + if len(*p.itemHeap) == 0 { + return rtp.Packet{}, errors.New("empty queue") + } + + item := heap.Pop(p.itemHeap).(*packet) + return item.value, nil +} + +func (p *PriorityQueueRtp) Empty() { + old := *p.itemHeap + *p.itemHeap = old[:0] +} + +type packets []*packet + +type packet struct { + value rtp.Packet + priority uint16 + index int +} + +func (p *packets) Len() int { + return len(*p) +} + +func (p *packets) Less(i, j int) bool { + a, b := (*p)[i].priority, (*p)[j].priority + if int(a)-int(b) > MaxRtpDiff || int(b)-int(a) > MaxRtpDiff { + if a < b { + return false + } + return true + } + return a < b +} + +func (p *packets) Swap(i, j int) { + (*p)[i], (*p)[j] = (*p)[j], (*p)[i] + (*p)[i].index = i + (*p)[j].index = j +} + +func (p *packets) Push(x interface{}) { + it := x.(*packet) + it.index = len(*p) + *p = append(*p, it) +} + +func (p *packets) Pop() interface{} { + old := *p + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *p = old[0 : n-1] + return item +} diff --git a/plugin/rtp/pkg/udp.go b/plugin/rtp/pkg/udp.go new file mode 100644 index 0000000..217b020 --- /dev/null +++ b/plugin/rtp/pkg/udp.go @@ -0,0 +1,25 @@ +package rtp + +import ( + "net" + + "m7s.live/v5/pkg/util" +) + +type UDP net.UDPConn + +func (t *UDP) Read(onRTP func(util.Buffer) error) (err error) { + buffer := make(util.Buffer, 1024*1024) + + for { + n, _, err := (*net.UDPConn)(t).ReadFromUDP(buffer) + if err != nil { + return err + } + + err = onRTP(buffer[:n]) + if err != nil { + //return err + } + } +}