diff --git a/server_multicast_writer.go b/server_multicast_writer.go index b32d5a3c..b0b95763 100644 --- a/server_multicast_writer.go +++ b/server_multicast_writer.go @@ -2,21 +2,14 @@ package gortsplib import ( "net" - - "github.com/bluenviron/gortsplib/v3/pkg/ringbuffer" ) -type typeAndPayload struct { - isRTP bool - payload []byte -} - type serverMulticastWriter struct { - rtpl *serverUDPListener - rtcpl *serverUDPListener - writeBuffer *ringbuffer.RingBuffer - - writerDone chan struct{} + rtpl *serverUDPListener + rtcpl *serverUDPListener + writer asyncProcessor + rtpAddr *net.UDPAddr + rtcpAddr *net.UDPAddr } func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { @@ -36,16 +29,25 @@ func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { return nil, err } - wb, _ := ringbuffer.New(uint64(s.WriteBufferCount)) - - h := &serverMulticastWriter{ - rtpl: rtpl, - rtcpl: rtcpl, - writeBuffer: wb, - writerDone: make(chan struct{}), + rtpAddr := &net.UDPAddr{ + IP: rtpl.ip(), + Port: rtpl.port(), } - go h.runWriter() + rtcpAddr := &net.UDPAddr{ + IP: rtcpl.ip(), + Port: rtcpl.port(), + } + + h := &serverMulticastWriter{ + rtpl: rtpl, + rtcpl: rtcpl, + rtpAddr: rtpAddr, + rtcpAddr: rtcpAddr, + } + + h.writer.allocateBuffer(s.WriteBufferCount) + h.writer.start() return h, nil } @@ -53,52 +55,21 @@ func newServerMulticastWriter(s *Server) (*serverMulticastWriter, error) { func (h *serverMulticastWriter) close() { h.rtpl.close() h.rtcpl.close() - h.writeBuffer.Close() - <-h.writerDone + h.writer.stop() } func (h *serverMulticastWriter) ip() net.IP { return h.rtpl.ip() } -func (h *serverMulticastWriter) runWriter() { - defer close(h.writerDone) - - rtpAddr := &net.UDPAddr{ - IP: h.rtpl.ip(), - Port: h.rtpl.port(), - } - - rtcpAddr := &net.UDPAddr{ - IP: h.rtcpl.ip(), - Port: h.rtcpl.port(), - } - - for { - tmp, ok := h.writeBuffer.Pull() - if !ok { - return - } - data := tmp.(typeAndPayload) - - if data.isRTP { - h.rtpl.write(data.payload, rtpAddr) //nolint:errcheck - } else { - h.rtcpl.write(data.payload, rtcpAddr) //nolint:errcheck - } - } -} - func (h *serverMulticastWriter) writePacketRTP(payload []byte) { - h.writeBuffer.Push(typeAndPayload{ - isRTP: true, - payload: payload, + h.writer.queue(func() { + h.rtpl.write(payload, h.rtpAddr) //nolint:errcheck }) } func (h *serverMulticastWriter) writePacketRTCP(payload []byte) { - h.writeBuffer.Push(typeAndPayload{ - isRTP: false, - payload: payload, + h.writer.queue(func() { + h.rtcpl.write(payload, h.rtcpAddr) //nolint:errcheck }) }