diff --git a/client.go b/client.go index 2d7df98b..653b2a8d 100644 --- a/client.go +++ b/client.go @@ -13,7 +13,6 @@ import ( "net" "strconv" "strings" - "sync" "sync/atomic" "time" @@ -243,8 +242,7 @@ type Client struct { tcpLastFrameTime *int64 keepaliveTimer *time.Timer closeError error - writer clientWriter - writeMutex sync.RWMutex + writer writer // connCloser channels connCloserTerminate chan struct{} @@ -649,14 +647,21 @@ func (c *Client) playRecordStart() { } } + if c.state == clientStatePlay { + // when reading, buffer is only used to send RTCP receiver reports, + // that are much smaller than RTP packets and are sent at a fixed interval. + // decrease RAM consumption by allocating less buffers. + c.writer.allocateBuffer(8) + } else { + c.writer.allocateBuffer(c.WriteBufferCount) + } + + c.writer.start() + for _, cm := range c.medias { cm.start() } - c.writeMutex.Lock() - c.writer.start(c) - c.writeMutex.Unlock() - // for some reason, SetReadDeadline() must always be called in the same // goroutine, otherwise Read() freezes. // therefore, we disable the deadline and perform a check with a ticker. @@ -721,9 +726,7 @@ func (c *Client) playRecordStop(isClosing bool) { c.checkStreamTimer = emptyTimer() c.keepaliveTimer = emptyTimer() - c.writeMutex.Lock() c.writer.stop() - c.writeMutex.Unlock() for _, cm := range c.medias { cm.stop() diff --git a/clientmedia.go b/clientmedia.go index 8cf04926..a998008c 100644 --- a/clientmedia.go +++ b/clientmedia.go @@ -170,22 +170,16 @@ func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error { return err } - cm.c.writeMutex.RLock() - defer cm.c.writeMutex.RUnlock() + select { + case <-cm.c.done: + return cm.c.closeError + default: + } - ok := cm.c.writer.queue(func() { + cm.c.writer.queue(func() { cm.writePacketRTCPInQueue(byts) }) - if !ok { - select { - case <-cm.c.done: - return cm.c.closeError - default: - return nil - } - } - return nil } diff --git a/clienttrack.go b/clienttrack.go index 89f2ada8..602ec909 100644 --- a/clienttrack.go +++ b/clienttrack.go @@ -67,7 +67,6 @@ func (ct *clientFormat) stop() { if ct.rtcpSender != nil { ct.rtcpSender.Close() - ct.rtcpSender = nil } } @@ -79,22 +78,16 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er } byts = byts[:n] - ct.c.writeMutex.RLock() - defer ct.c.writeMutex.RUnlock() + select { + case <-ct.c.done: + return ct.c.closeError + default: + } - ok := ct.c.writer.queue(func() { + ct.c.writer.queue(func() { ct.cm.writePacketRTPInQueue(byts) }) - if !ok { - select { - case <-ct.c.done: - return ct.c.closeError - default: - return nil - } - } - ct.rtcpSender.ProcessPacket(pkt, ntp, ct.format.PTSEqualsDTS(pkt)) return nil } diff --git a/clientwriter.go b/clientwriter.go deleted file mode 100644 index a6723ade..00000000 --- a/clientwriter.go +++ /dev/null @@ -1,60 +0,0 @@ -package gortsplib - -import ( - "github.com/aler9/gortsplib/v2/pkg/ringbuffer" -) - -// this struct contains a queue that allows to detach the routine that is reading a stream -// from the routine that is writing a stream. -type clientWriter struct { - allowWriting bool - buffer *ringbuffer.RingBuffer - - done chan struct{} -} - -func (cw *clientWriter) start(c *Client) { - if c.state == clientStatePlay { - // when reading, buffer is only used to send RTCP receiver reports, - // that are much smaller than RTP packets and are sent at a fixed interval. - // decrease RAM consumption by allocating less buffers. - cw.buffer, _ = ringbuffer.New(8) - } else { - cw.buffer, _ = ringbuffer.New(uint64(c.WriteBufferCount)) - } - - cw.done = make(chan struct{}) - go cw.run() - - cw.allowWriting = true -} - -func (cw *clientWriter) stop() { - cw.allowWriting = false - - cw.buffer.Close() - <-cw.done - cw.buffer = nil -} - -func (cw *clientWriter) run() { - defer close(cw.done) - - for { - tmp, ok := cw.buffer.Pull() - if !ok { - return - } - - tmp.(func())() - } -} - -func (cw *clientWriter) queue(cb func()) bool { - if !cw.allowWriting { - return false - } - - cw.buffer.Push(cb) - return true -} diff --git a/examples/client-publish-format-vp8/main.go b/examples/client-publish-format-vp8/main.go index 27a95fb1..84c0b018 100644 --- a/examples/client-publish-format-vp8/main.go +++ b/examples/client-publish-format-vp8/main.go @@ -12,7 +12,7 @@ import ( // This example shows how to // 1. generate RTP/VP8 packets with GStreamer -// 2. connect to a RTSP server, announce an VP8 media +// 2. connect to a RTSP server, announce a VP8 media // 3. route the packets from GStreamer to the server func main() { diff --git a/examples/client-publish-format-vp9/main.go b/examples/client-publish-format-vp9/main.go index 5ad8798c..b720540b 100644 --- a/examples/client-publish-format-vp9/main.go +++ b/examples/client-publish-format-vp9/main.go @@ -12,7 +12,7 @@ import ( // This example shows how to // 1. generate RTP/VP9 packets with GStreamer -// 2. connect to a RTSP server, announce an VP9 media +// 2. connect to a RTSP server, announce a VP9 media // 3. route the packets from GStreamer to the server func main() { diff --git a/examples/client-read-format-vp8/main.go b/examples/client-read-format-vp8/main.go index 8a2f0dd4..08331a80 100644 --- a/examples/client-read-format-vp8/main.go +++ b/examples/client-read-format-vp8/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an VP8 media +// 2. check if there's a VP8 media // 3. get access units of that media func main() { diff --git a/examples/client-read-format-vp9/main.go b/examples/client-read-format-vp9/main.go index e19eebd4..ad04a38f 100644 --- a/examples/client-read-format-vp9/main.go +++ b/examples/client-read-format-vp9/main.go @@ -11,7 +11,7 @@ import ( // This example shows how to // 1. connect to a RTSP server -// 2. check if there's an VP9 media +// 2. check if there's a VP9 media // 3. get access units of that media func main() { diff --git a/serversession.go b/serversession.go index ba08be14..9322db6f 100644 --- a/serversession.go +++ b/serversession.go @@ -17,7 +17,6 @@ import ( "github.com/aler9/gortsplib/v2/pkg/headers" "github.com/aler9/gortsplib/v2/pkg/liberrors" "github.com/aler9/gortsplib/v2/pkg/media" - "github.com/aler9/gortsplib/v2/pkg/ringbuffer" "github.com/aler9/gortsplib/v2/pkg/sdp" "github.com/aler9/gortsplib/v2/pkg/url" ) @@ -173,7 +172,7 @@ type ServerSession struct { announcedMedias media.Medias // publish udpLastPacketTime *int64 // publish udpCheckStreamTimer *time.Timer - writer serverWriter + writer writer rtpPacketBuffer *rtpPacketMultiBuffer // in @@ -826,7 +825,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // inside the callback. if ss.state != ServerSessionStatePlay && *ss.setuppedTransport != TransportUDPMulticast { - ss.writer.buffer, _ = ringbuffer.New(uint64(ss.s.WriteBufferCount)) + ss.writer.allocateBuffer(ss.s.WriteBufferCount) } res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{ @@ -923,7 +922,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base // when recording, writeBuffer is only used to send RTCP receiver reports, // that are much smaller than RTP packets and are sent at a fixed interval. // decrease RAM consumption by allocating less buffers. - ss.writer.buffer, _ = ringbuffer.New(uint64(8)) + ss.writer.allocateBuffer(8) res, err := ss.s.Handler.(ServerHandlerOnRecord).OnRecord(&ServerHandlerOnRecordCtx{ Session: ss, diff --git a/serverwriter.go b/serverwriter.go deleted file mode 100644 index 8417280d..00000000 --- a/serverwriter.go +++ /dev/null @@ -1,45 +0,0 @@ -package gortsplib - -import ( - "github.com/aler9/gortsplib/v2/pkg/ringbuffer" -) - -type serverWriter struct { - running bool - buffer *ringbuffer.RingBuffer - - done chan struct{} -} - -func (sw *serverWriter) start() { - if !sw.running { - sw.running = true - sw.done = make(chan struct{}) - go sw.run() - } -} - -func (sw *serverWriter) stop() { - if sw.running { - sw.buffer.Close() - <-sw.done - sw.running = false - } -} - -func (sw *serverWriter) run() { - defer close(sw.done) - - for { - tmp, ok := sw.buffer.Pull() - if !ok { - return - } - - tmp.(func())() - } -} - -func (sw *serverWriter) queue(cb func()) { - sw.buffer.Push(cb) -} diff --git a/writer.go b/writer.go new file mode 100644 index 00000000..45480355 --- /dev/null +++ b/writer.go @@ -0,0 +1,49 @@ +package gortsplib + +import ( + "github.com/aler9/gortsplib/v2/pkg/ringbuffer" +) + +// this struct contains a queue that allows to detach the routine that is reading a stream +// from the routine that is writing a stream. +type writer struct { + running bool + buffer *ringbuffer.RingBuffer + + done chan struct{} +} + +func (w *writer) allocateBuffer(size int) { + w.buffer, _ = ringbuffer.New(uint64(size)) +} + +func (w *writer) start() { + w.running = true + w.done = make(chan struct{}) + go w.run() +} + +func (w *writer) stop() { + if w.running { + w.buffer.Close() + <-w.done + w.running = false + } +} + +func (w *writer) run() { + defer close(w.done) + + for { + tmp, ok := w.buffer.Pull() + if !ok { + return + } + + tmp.(func())() + } +} + +func (w *writer) queue(cb func()) { + w.buffer.Push(cb) +}