client: fix support for ReadBufferCount > 1

This commit is contained in:
aler9
2022-03-07 22:36:19 +01:00
parent b399656856
commit 94aaa6719d
4 changed files with 67 additions and 64 deletions

View File

@@ -752,7 +752,6 @@ func (c *Client) runReader() {
if c.state == clientStatePlay { if c.state == clientStatePlay {
tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)) tcpReadBuffer = multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize))
tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)) tcpRTPPacketBuffer := newRTPPacketMultiBuffer(uint64(c.ReadBufferCount))
processFunc = func(trackID int, isRTP bool, payload []byte) { processFunc = func(trackID int, isRTP bool, payload []byte) {

View File

@@ -8,7 +8,6 @@ import (
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/multibuffer"
@@ -35,9 +34,9 @@ type clientUDPListener struct {
isRTP bool isRTP bool
running bool running bool
readBuffer *multibuffer.MultiBuffer readBuffer *multibuffer.MultiBuffer
rtpPacketBuffer *rtpPacketMultiBuffer
lastPacketTime *int64 lastPacketTime *int64
processFunc func(time.Time, []byte) processFunc func(time.Time, []byte)
rtpPkt rtp.Packet
readerDone chan struct{} readerDone chan struct{}
} }
@@ -113,6 +112,7 @@ func newClientUDPListener(c *Client, multicast bool, address string) (*clientUDP
c: c, c: c,
pc: pc, pc: pc,
readBuffer: multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)), readBuffer: multibuffer.New(uint64(c.ReadBufferCount), uint64(c.ReadBufferSize)),
rtpPacketBuffer: newRTPPacketMultiBuffer(uint64(c.ReadBufferCount)),
lastPacketTime: func() *int64 { lastPacketTime: func() *int64 {
v := int64(0) v := int64(0)
return &v return &v
@@ -120,100 +120,101 @@ func newClientUDPListener(c *Client, multicast bool, address string) (*clientUDP
}, nil }, nil
} }
func (l *clientUDPListener) close() { func (u *clientUDPListener) close() {
if l.running { if u.running {
l.stop() u.stop()
} }
l.pc.Close() u.pc.Close()
} }
func (l *clientUDPListener) port() int { func (u *clientUDPListener) port() int {
return l.pc.LocalAddr().(*net.UDPAddr).Port return u.pc.LocalAddr().(*net.UDPAddr).Port
} }
func (l *clientUDPListener) start(forPlay bool) { func (u *clientUDPListener) start(forPlay bool) {
if forPlay { if forPlay {
if l.isRTP { if u.isRTP {
l.processFunc = l.processPlayRTP u.processFunc = u.processPlayRTP
} else { } else {
l.processFunc = l.processPlayRTCP u.processFunc = u.processPlayRTCP
} }
} else { } else {
l.processFunc = l.processRecordRTCP u.processFunc = u.processRecordRTCP
} }
l.running = true u.running = true
l.pc.SetReadDeadline(time.Time{}) u.pc.SetReadDeadline(time.Time{})
l.readerDone = make(chan struct{}) u.readerDone = make(chan struct{})
go l.runReader() go u.runReader()
} }
func (l *clientUDPListener) stop() { func (u *clientUDPListener) stop() {
l.pc.SetReadDeadline(time.Now()) u.pc.SetReadDeadline(time.Now())
<-l.readerDone <-u.readerDone
} }
func (l *clientUDPListener) runReader() { func (u *clientUDPListener) runReader() {
defer close(l.readerDone) defer close(u.readerDone)
for { for {
buf := l.readBuffer.Next() buf := u.readBuffer.Next()
n, addr, err := l.pc.ReadFrom(buf) n, addr, err := u.pc.ReadFrom(buf)
if err != nil { if err != nil {
return return
} }
uaddr := addr.(*net.UDPAddr) uaddr := addr.(*net.UDPAddr)
if !l.remoteReadIP.Equal(uaddr.IP) || (!l.c.AnyPortEnable && l.remotePort != uaddr.Port) { if !u.remoteReadIP.Equal(uaddr.IP) || (!u.c.AnyPortEnable && u.remotePort != uaddr.Port) {
continue continue
} }
now := time.Now() now := time.Now()
atomic.StoreInt64(l.lastPacketTime, now.Unix()) atomic.StoreInt64(u.lastPacketTime, now.Unix())
l.processFunc(now, buf[:n]) u.processFunc(now, buf[:n])
} }
} }
func (l *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
err := l.rtpPkt.Unmarshal(payload) pkt := u.rtpPacketBuffer.next()
err := pkt.Unmarshal(payload)
if err != nil { if err != nil {
return return
} }
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, &l.rtpPkt) u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTP(now, pkt)
l.c.OnPacketRTP(l.trackID, &l.rtpPkt) u.c.OnPacketRTP(u.trackID, pkt)
} }
func (l *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) {
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
return return
} }
for _, pkt := range packets { for _, pkt := range packets {
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt) u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt)
l.c.OnPacketRTCP(l.trackID, pkt) u.c.OnPacketRTCP(u.trackID, pkt)
} }
} }
func (l *clientUDPListener) processRecordRTCP(now time.Time, payload []byte) { func (u *clientUDPListener) processRecordRTCP(now time.Time, payload []byte) {
packets, err := rtcp.Unmarshal(payload) packets, err := rtcp.Unmarshal(payload)
if err != nil { if err != nil {
return return
} }
for _, pkt := range packets { for _, pkt := range packets {
l.c.OnPacketRTCP(l.trackID, pkt) u.c.OnPacketRTCP(u.trackID, pkt)
} }
} }
func (l *clientUDPListener) write(payload []byte) error { func (u *clientUDPListener) write(payload []byte) error {
// no mutex is needed here since Write() has an internal lock. // no mutex is needed here since Write() has an internal lock.
// https://github.com/golang/go/issues/27203#issuecomment-534386117 // https://github.com/golang/go/issues/27203#issuecomment-534386117
l.pc.SetWriteDeadline(time.Now().Add(l.c.WriteTimeout)) u.pc.SetWriteDeadline(time.Now().Add(u.c.WriteTimeout))
_, err := l.pc.WriteTo(payload, l.remoteWriteAddr) _, err := u.pc.WriteTo(payload, u.remoteWriteAddr)
return err return err
} }

24
rtppacketmultibuffer.go Normal file
View File

@@ -0,0 +1,24 @@
package gortsplib
import (
"github.com/pion/rtp"
)
type rtpPacketMultiBuffer struct {
count uint64
buffers []rtp.Packet
cur uint64
}
func newRTPPacketMultiBuffer(count uint64) *rtpPacketMultiBuffer {
return &rtpPacketMultiBuffer{
count: count,
buffers: make([]rtp.Packet, count),
}
}
func (mb *rtpPacketMultiBuffer) next() *rtp.Packet {
ret := &mb.buffers[mb.cur%mb.count]
mb.cur++
return ret
}

View File

@@ -9,32 +9,11 @@ import (
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp"
"golang.org/x/net/ipv4" "golang.org/x/net/ipv4"
"github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/multibuffer"
) )
type rtpPacketMultiBuffer struct {
count uint64
buffers []rtp.Packet
cur uint64
}
func newRTPPacketMultiBuffer(count uint64) *rtpPacketMultiBuffer {
buffers := make([]rtp.Packet, count)
return &rtpPacketMultiBuffer{
count: count,
buffers: buffers,
}
}
func (mb *rtpPacketMultiBuffer) next() *rtp.Packet {
ret := &mb.buffers[mb.cur%mb.count]
mb.cur++
return ret
}
type clientData struct { type clientData struct {
ss *ServerSession ss *ServerSession
trackID int trackID int