improve performance of multicast writer (#385)

This commit is contained in:
Alessandro Ros
2023-08-26 12:32:09 +02:00
committed by GitHub
parent dffc241e63
commit e87c6d66e5

View File

@@ -2,21 +2,14 @@ package gortsplib
import ( import (
"net" "net"
"github.com/bluenviron/gortsplib/v3/pkg/ringbuffer"
) )
type typeAndPayload struct {
isRTP bool
payload []byte
}
type serverMulticastWriter struct { type serverMulticastWriter struct {
rtpl *serverUDPListener rtpl *serverUDPListener
rtcpl *serverUDPListener rtcpl *serverUDPListener
writeBuffer *ringbuffer.RingBuffer writer asyncProcessor
rtpAddr *net.UDPAddr
writerDone chan struct{} rtcpAddr *net.UDPAddr
} }
func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) {
@@ -36,16 +29,25 @@ func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) {
return nil, err return nil, err
} }
wb, _ := ringbuffer.New(uint64(s.WriteBufferCount)) rtpAddr := &net.UDPAddr{
IP: rtpl.ip(),
Port: rtpl.port(),
}
rtcpAddr := &net.UDPAddr{
IP: rtcpl.ip(),
Port: rtcpl.port(),
}
h := &serverMulticastWriter{ h := &serverMulticastWriter{
rtpl: rtpl, rtpl: rtpl,
rtcpl: rtcpl, rtcpl: rtcpl,
writeBuffer: wb, rtpAddr: rtpAddr,
writerDone: make(chan struct{}), rtcpAddr: rtcpAddr,
} }
go h.runWriter() h.writer.allocateBuffer(s.WriteBufferCount)
h.writer.start()
return h, nil return h, nil
} }
@@ -53,52 +55,21 @@ func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) {
func (h *serverMulticastWriter) close() { func (h *serverMulticastWriter) close() {
h.rtpl.close() h.rtpl.close()
h.rtcpl.close() h.rtcpl.close()
h.writeBuffer.Close() h.writer.stop()
<-h.writerDone
} }
func (h *serverMulticastWriter) ip() net.IP { func (h *serverMulticastWriter) ip() net.IP {
return h.rtpl.ip() return h.rtpl.ip()
} }
func (h *serverMulticastWriter) runWriter() {
defer close(h.writerDone)
rtpAddr := &net.UDPAddr{
IP: h.rtpl.ip(),
Port: h.rtpl.port(),
}
rtcpAddr := &net.UDPAddr{
IP: h.rtcpl.ip(),
Port: h.rtcpl.port(),
}
for {
tmp, ok := h.writeBuffer.Pull()
if !ok {
return
}
data := tmp.(typeAndPayload)
if data.isRTP {
h.rtpl.write(data.payload, rtpAddr) //nolint:errcheck
} else {
h.rtcpl.write(data.payload, rtcpAddr) //nolint:errcheck
}
}
}
func (h *serverMulticastWriter) writePacketRTP(payload []byte) { func (h *serverMulticastWriter) writePacketRTP(payload []byte) {
h.writeBuffer.Push(typeAndPayload{ h.writer.queue(func() {
isRTP: true, h.rtpl.write(payload, h.rtpAddr) //nolint:errcheck
payload: payload,
}) })
} }
func (h *serverMulticastWriter) writePacketRTCP(payload []byte) { func (h *serverMulticastWriter) writePacketRTCP(payload []byte) {
h.writeBuffer.Push(typeAndPayload{ h.writer.queue(func() {
isRTP: false, h.rtcpl.write(payload, h.rtcpAddr) //nolint:errcheck
payload: payload,
}) })
} }