From 94aaa6719dbaace1b5ac0855536bf8a0b8c83f23 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 7 Mar 2022 22:36:19 +0100 Subject: [PATCH] client: fix support for ReadBufferCount > 1 --- client.go | 1 - clientudpl.go | 85 +++++++++++++++++++++-------------------- rtppacketmultibuffer.go | 24 ++++++++++++ serverudpl.go | 21 ---------- 4 files changed, 67 insertions(+), 64 deletions(-) create mode 100644 rtppacketmultibuffer.go diff --git a/client.go b/client.go index 695ec38d..1f505426 100644 --- a/client.go +++ b/client.go @@ -752,7 +752,6 @@ func (c *Client) runReader() { if c.state == clientStatePlay { tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) - tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) processFunc = func(trackID int, isRTP bool, payload []byte) { diff --git a/clientudpl.go b/clientudpl.go index 448e9f57..4dabc332 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -8,7 +8,6 @@ import ( "time" "github.com/pion/rtcp" - "github.com/pion/rtp" "golang.org/x/net/ipv4" "github.com/aler9/gortsplib/pkg/multibuffer" @@ -35,9 +34,9 @@ type clientUDPListener struct { isRTP bool running bool readBuffer *multibuffer.MultiBuffer + rtpPacketBuffer *rtpPacketMultiBuffer lastPacketTime *int64 processFunc func(time.Time, []byte) - rtpPkt rtp.Packet readerDone chan struct{} } @@ -110,9 +109,10 @@ func newClientUDPListener(c *Client, multicast bool, address string) (*clientUDP } return &clientUDPListener{ - c: c, - pc: pc, - readBuffer: multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)), + c: c, + pc: pc, + readBuffer: multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)), + rtpPacketBuffer: newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)), lastPacketTime: func() *int64 { v := int64(0) return &v @@ -120,100 +120,101 @@ func newClientUDPListener(c *Client, multicast bool, address string) (*clientUDP }, nil } -func (l *clientUDPListener) close() { - if l.running { - l.stop() +func (u *clientUDPListener) close() { + if u.running { + u.stop() } - l.pc.Close() + u.pc.Close() } -func (l *clientUDPListener) port() int { - return l.pc.LocalAddr().(*net.UDPAddr).Port +func (u *clientUDPListener) port() int { + return u.pc.LocalAddr().(*net.UDPAddr).Port } -func (l *clientUDPListener) start(forPlay bool) { +func (u *clientUDPListener) start(forPlay bool) { if forPlay { - if l.isRTP { - l.processFunc = l.processPlayRTP + if u.isRTP { + u.processFunc = u.processPlayRTP } else { - l.processFunc = l.processPlayRTCP + u.processFunc = u.processPlayRTCP } } else { - l.processFunc = l.processRecordRTCP + u.processFunc = u.processRecordRTCP } - l.running = true - l.pc.SetReadDeadline(time.Time{}) - l.readerDone = make(chan struct{}) - go l.runReader() + u.running = true + u.pc.SetReadDeadline(time.Time{}) + u.readerDone = make(chan struct{}) + go u.runReader() } -func (l *clientUDPListener) stop() { - l.pc.SetReadDeadline(time.Now()) - <-l.readerDone +func (u *clientUDPListener) stop() { + u.pc.SetReadDeadline(time.Now()) + <-u.readerDone } -func (l *clientUDPListener) runReader() { - defer close(l.readerDone) +func (u *clientUDPListener) runReader() { + defer close(u.readerDone) for { - buf := l.readBuffer.Next() - n, addr, err := l.pc.ReadFrom(buf) + buf := u.readBuffer.Next() + n, addr, err := u.pc.ReadFrom(buf) if err != nil { return } uaddr := addr.(*net.UDPAddr) - if !l.remoteReadIP.Equal(uaddr.IP) || (!l.c.AnyPortEnable && l.remotePort != uaddr.Port) { + if !u.remoteReadIP.Equal(uaddr.IP) || (!u.c.AnyPortEnable && u.remotePort != uaddr.Port) { continue } now := time.Now() - atomic.StoreInt64(l.lastPacketTime, now.Unix()) + atomic.StoreInt64(u.lastPacketTime, now.Unix()) - l.processFunc(now, buf[:n]) + u.processFunc(now, buf[:n]) } } -func (l *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { - err := l.rtpPkt.Unmarshal(payload) +func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { + pkt := u.rtpPacketBuffer.next() + err := pkt.Unmarshal(payload) if err != nil { return } - l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, &l.rtpPkt) - l.c.OnPacketRTP(l.trackID, &l.rtpPkt) + u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTP(now, pkt) + u.c.OnPacketRTP(u.trackID, pkt) } -func (l *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { +func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { packets, err := rtcp.Unmarshal(payload) if err != nil { return } for _, pkt := range packets { - l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt) - l.c.OnPacketRTCP(l.trackID, pkt) + u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt) + u.c.OnPacketRTCP(u.trackID, pkt) } } -func (l *clientUDPListener) processRecordRTCP(now time.Time, payload []byte) { +func (u *clientUDPListener) processRecordRTCP(now time.Time, payload []byte) { packets, err := rtcp.Unmarshal(payload) if err != nil { return } for _, pkt := range packets { - l.c.OnPacketRTCP(l.trackID, pkt) + u.c.OnPacketRTCP(u.trackID, pkt) } } -func (l *clientUDPListener) write(payload []byte) error { +func (u *clientUDPListener) write(payload []byte) error { // no mutex is needed here since Write() has an internal lock. // https://github.com/golang/go/issues/27203#issuecomment-534386117 - l.pc.SetWriteDeadline(time.Now().Add(l.c.WriteTimeout)) - _, err := l.pc.WriteTo(payload, l.remoteWriteAddr) + u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout)) + _, err := u.pc.WriteTo(payload, u.remoteWriteAddr) return err } diff --git a/rtppacketmultibuffer.go b/rtppacketmultibuffer.go new file mode 100644 index 00000000..a36f2233 --- /dev/null +++ b/rtppacketmultibuffer.go @@ -0,0 +1,24 @@ +package gortsplib + +import ( + "github.com/pion/rtp" +) + +type rtpPacketMultiBuffer struct { + count uint64 + buffers []rtp.Packet + cur uint64 +} + +func newRTPPacketMultiBuffer(count uint64) *rtpPacketMultiBuffer { + return &rtpPacketMultiBuffer{ + count: count, + buffers: make([]rtp.Packet, count), + } +} + +func (mb *rtpPacketMultiBuffer) next() *rtp.Packet { + ret := &mb.buffers[mb.cur%mb.count] + mb.cur++ + return ret +} diff --git a/serverudpl.go b/serverudpl.go index 3531647d..e7fbe7b7 100644 --- a/serverudpl.go +++ b/serverudpl.go @@ -9,32 +9,11 @@ import ( "time" "github.com/pion/rtcp" - "github.com/pion/rtp" "golang.org/x/net/ipv4" "github.com/aler9/gortsplib/pkg/multibuffer" ) -type rtpPacketMultiBuffer struct { - count uint64 - buffers []rtp.Packet - cur uint64 -} - -func newRTPPacketMultiBuffer(count uint64) *rtpPacketMultiBuffer { - buffers := make([]rtp.Packet, count) - return &rtpPacketMultiBuffer{ - count: count, - buffers: buffers, - } -} - -func (mb *rtpPacketMultiBuffer) next() *rtp.Packet { - ret := &mb.buffers[mb.cur%mb.count] - mb.cur++ - return ret -} - type clientData struct { ss *ServerSession trackID int