improve UDP performance

This commit is contained in:
aler9
2021-11-12 21:56:32 +01:00
parent b89bb8d08e
commit ed2f712db2
3 changed files with 106 additions and 110 deletions

View File

@@ -736,66 +736,52 @@ func (c *Client) runReader() error {
} }
} }
} else { } else {
var processFunc func(int, bool, []byte)
if c.state == clientStatePlay { if c.state == clientStatePlay {
for { processFunc = func(trackID int, isRTP bool, payload []byte) {
frame := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
return err
}
channel := frame.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
trackID, ok := c.tracksByChannel[channel]
if !ok {
continue
}
now := time.Now() now := time.Now()
atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix()) atomic.StoreInt64(&c.tcpLastFrameTime, now.Unix())
if isRTP { if isRTP {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, frame.Payload) c.tracks[trackID].rtcpReceiver.ProcessPacketRTP(now, payload)
c.OnPacketRTP(trackID, frame.Payload) c.OnPacketRTP(trackID, payload)
} else { } else {
c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, frame.Payload) c.tracks[trackID].rtcpReceiver.ProcessPacketRTCP(now, payload)
c.OnPacketRTCP(trackID, frame.Payload) c.OnPacketRTCP(trackID, payload)
} }
} }
} else { // Record } else {
for { processFunc = func(trackID int, isRTP bool, payload []byte) {
frame := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
return err
}
channel := frame.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
trackID, ok := c.tracksByChannel[channel]
if !ok {
continue
}
if !isRTP { if !isRTP {
c.OnPacketRTCP(trackID, frame.Payload) c.OnPacketRTCP(trackID, payload)
} }
} }
} }
for {
frame := base.InterleavedFrame{
Payload: c.tcpFrameBuffer.Next(),
}
err := frame.Read(c.br)
if err != nil {
return err
}
channel := frame.Channel
isRTP := true
if (channel % 2) != 0 {
channel--
isRTP = false
}
trackID, ok := c.tracksByChannel[channel]
if !ok {
continue
}
processFunc(trackID, isRTP, frame.Payload)
}
} }
} }

View File

@@ -41,6 +41,7 @@ type clientUDPListener struct {
frameBuffer *multibuffer.MultiBuffer frameBuffer *multibuffer.MultiBuffer
lastFrameTime *int64 lastFrameTime *int64
writeMutex sync.Mutex writeMutex sync.Mutex
processFunc func(time.Time, []byte)
// out // out
done chan struct{} done chan struct{}
@@ -136,6 +137,16 @@ func (l *clientUDPListener) port() int {
} }
func (l *clientUDPListener) start() { func (l *clientUDPListener) start() {
if l.c.state == clientStatePlay {
if l.isRTP {
l.processFunc = l.processPlayRTP
} else {
l.processFunc = l.processPlayRTCP
}
} else {
l.processFunc = l.processRecord
}
l.running = true l.running = true
l.pc.SetReadDeadline(time.Time{}) l.pc.SetReadDeadline(time.Time{})
l.done = make(chan struct{}) l.done = make(chan struct{})
@@ -150,52 +161,40 @@ func (l *clientUDPListener) stop() {
func (l *clientUDPListener) run() { func (l *clientUDPListener) run() {
defer close(l.done) defer close(l.done)
if l.c.state == clientStatePlay { for {
for { buf := l.frameBuffer.Next()
buf := l.frameBuffer.Next() n, addr, err := l.pc.ReadFrom(buf)
n, addr, err := l.pc.ReadFrom(buf) if err != nil {
if err != nil { return
return
}
uaddr := addr.(*net.UDPAddr)
if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) {
continue
}
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
if l.isRTP {
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n])
l.c.OnPacketRTP(l.trackID, buf[:n])
} else {
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n])
l.c.OnPacketRTCP(l.trackID, buf[:n])
}
} }
} else { // record
for {
buf := l.frameBuffer.Next()
n, addr, err := l.pc.ReadFrom(buf)
if err != nil {
return
}
uaddr := addr.(*net.UDPAddr) uaddr := addr.(*net.UDPAddr)
if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) { if !l.remoteReadIP.Equal(uaddr.IP) || (!isAnyPort(l.remotePort) && l.remotePort != uaddr.Port) {
continue continue
}
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
l.c.OnPacketRTCP(l.trackID, buf[:n])
} }
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
l.processFunc(now, buf[:n])
} }
} }
func (l *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTP(now, payload)
l.c.OnPacketRTP(l.trackID, payload)
}
func (l *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) {
l.c.tracks[l.trackID].rtcpReceiver.ProcessPacketRTCP(now, payload)
l.c.OnPacketRTCP(l.trackID, payload)
}
func (l *clientUDPListener) processRecord(now time.Time, payload []byte) {
l.c.OnPacketRTCP(l.trackID, payload)
}
func (l *clientUDPListener) write(buf []byte) error { func (l *clientUDPListener) write(buf []byte) error {
l.writeMutex.Lock() l.writeMutex.Lock()
defer l.writeMutex.Unlock() defer l.writeMutex.Unlock()

View File

@@ -60,6 +60,7 @@ type serverUDPListener struct {
clientsMutex sync.RWMutex clientsMutex sync.RWMutex
clients map[clientAddr]*clientData clients map[clientAddr]*clientData
ringBuffer *ringbuffer.RingBuffer ringBuffer *ringbuffer.RingBuffer
processFunc func(time.Time, *clientData, []byte)
} }
func newServerUDPListenerMulticastPair(s *Server) (*serverUDPListener, *serverUDPListener, error) { func newServerUDPListenerMulticastPair(s *Server) (*serverUDPListener, *serverUDPListener, error) {
@@ -157,6 +158,12 @@ func newServerUDPListener(
ringBuffer: ringbuffer.New(uint64(s.ReadBufferCount)), ringBuffer: ringbuffer.New(uint64(s.ReadBufferCount)),
} }
if isRTP {
u.processFunc = u.processRTP
} else {
u.processFunc = u.processRTCP
}
u.wg.Add(1) u.wg.Add(1)
go u.run() go u.run()
@@ -206,29 +213,7 @@ func (u *serverUDPListener) run() {
atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix()) atomic.StoreInt64(clientData.ss.udpLastFrameTime, now.Unix())
} }
if u.isRTP { u.processFunc(now, clientData, buf[:n])
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, buf[:n])
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok {
h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{
Session: clientData.ss,
TrackID: clientData.trackID,
Payload: buf[:n],
})
}
} else {
if clientData.isPublishing {
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, buf[:n])
}
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok {
h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{
Session: clientData.ss,
TrackID: clientData.trackID,
Payload: buf[:n],
})
}
}
}() }()
} }
}() }()
@@ -255,6 +240,32 @@ func (u *serverUDPListener) run() {
u.ringBuffer.Close() u.ringBuffer.Close()
} }
func (u *serverUDPListener) processRTP(now time.Time, clientData *clientData, payload []byte) {
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTP(now, payload)
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok {
h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{
Session: clientData.ss,
TrackID: clientData.trackID,
Payload: payload,
})
}
}
func (u *serverUDPListener) processRTCP(now time.Time, clientData *clientData, payload []byte) {
if clientData.isPublishing {
clientData.ss.announcedTracks[clientData.trackID].rtcpReceiver.ProcessPacketRTCP(now, payload)
}
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok {
h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{
Session: clientData.ss,
TrackID: clientData.trackID,
Payload: payload,
})
}
}
func (u *serverUDPListener) write(buf []byte, addr *net.UDPAddr) { func (u *serverUDPListener) write(buf []byte, addr *net.UDPAddr) {
u.ringBuffer.Push(bufAddrPair{buf, addr}) u.ringBuffer.Push(bufAddrPair{buf, addr})
} }