merge serverWriter and clientWriter

This commit is contained in:
aler9
2022-12-11 22:54:16 +01:00
parent a1396206b5
commit 256877086b
11 changed files with 80 additions and 147 deletions

View File

@@ -13,7 +13,6 @@ import (
"net" "net"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@@ -243,8 +242,7 @@ type Client struct {
tcpLastFrameTime *int64 tcpLastFrameTime *int64
keepaliveTimer *time.Timer keepaliveTimer *time.Timer
closeError error closeError error
writer clientWriter writer writer
writeMutex sync.RWMutex
// connCloser channels // connCloser channels
connCloserTerminate chan struct{} connCloserTerminate chan struct{}
@@ -649,14 +647,21 @@ func (c *Client) playRecordStart() {
} }
} }
if c.state == clientStatePlay {
// when reading, buffer is only used to send RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers.
c.writer.allocateBuffer(8)
} else {
c.writer.allocateBuffer(c.WriteBufferCount)
}
c.writer.start()
for _, cm := range c.medias { for _, cm := range c.medias {
cm.start() cm.start()
} }
c.writeMutex.Lock()
c.writer.start(c)
c.writeMutex.Unlock()
// for some reason, SetReadDeadline() must always be called in the same // for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes. // goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform a check with a ticker. // therefore, we disable the deadline and perform a check with a ticker.
@@ -721,9 +726,7 @@ func (c *Client) playRecordStop(isClosing bool) {
c.checkStreamTimer = emptyTimer() c.checkStreamTimer = emptyTimer()
c.keepaliveTimer = emptyTimer() c.keepaliveTimer = emptyTimer()
c.writeMutex.Lock()
c.writer.stop() c.writer.stop()
c.writeMutex.Unlock()
for _, cm := range c.medias { for _, cm := range c.medias {
cm.stop() cm.stop()

View File

@@ -170,22 +170,16 @@ func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error {
return err return err
} }
cm.c.writeMutex.RLock() select {
defer cm.c.writeMutex.RUnlock() case <-cm.c.done:
return cm.c.closeError
default:
}
ok := cm.c.writer.queue(func() { cm.c.writer.queue(func() {
cm.writePacketRTCPInQueue(byts) cm.writePacketRTCPInQueue(byts)
}) })
if !ok {
select {
case <-cm.c.done:
return cm.c.closeError
default:
return nil
}
}
return nil return nil
} }

View File

@@ -67,7 +67,6 @@ func (ct *clientFormat) stop() {
if ct.rtcpSender != nil { if ct.rtcpSender != nil {
ct.rtcpSender.Close() ct.rtcpSender.Close()
ct.rtcpSender = nil
} }
} }
@@ -79,22 +78,16 @@ func (ct *clientFormat) writePacketRTPWithNTP(pkt *rtp.Packet, ntp time.Time) er
} }
byts = byts[:n] byts = byts[:n]
ct.c.writeMutex.RLock() select {
defer ct.c.writeMutex.RUnlock() case <-ct.c.done:
return ct.c.closeError
default:
}
ok := ct.c.writer.queue(func() { ct.c.writer.queue(func() {
ct.cm.writePacketRTPInQueue(byts) ct.cm.writePacketRTPInQueue(byts)
}) })
if !ok {
select {
case <-ct.c.done:
return ct.c.closeError
default:
return nil
}
}
ct.rtcpSender.ProcessPacket(pkt, ntp, ct.format.PTSEqualsDTS(pkt)) ct.rtcpSender.ProcessPacket(pkt, ntp, ct.format.PTSEqualsDTS(pkt))
return nil return nil
} }

View File

@@ -1,60 +0,0 @@
package gortsplib
import (
"github.com/aler9/gortsplib/v2/pkg/ringbuffer"
)
// this struct contains a queue that allows to detach the routine that is reading a stream
// from the routine that is writing a stream.
type clientWriter struct {
allowWriting bool
buffer *ringbuffer.RingBuffer
done chan struct{}
}
func (cw *clientWriter) start(c *Client) {
if c.state == clientStatePlay {
// when reading, buffer is only used to send RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers.
cw.buffer, _ = ringbuffer.New(8)
} else {
cw.buffer, _ = ringbuffer.New(uint64(c.WriteBufferCount))
}
cw.done = make(chan struct{})
go cw.run()
cw.allowWriting = true
}
func (cw *clientWriter) stop() {
cw.allowWriting = false
cw.buffer.Close()
<-cw.done
cw.buffer = nil
}
func (cw *clientWriter) run() {
defer close(cw.done)
for {
tmp, ok := cw.buffer.Pull()
if !ok {
return
}
tmp.(func())()
}
}
func (cw *clientWriter) queue(cb func()) bool {
if !cw.allowWriting {
return false
}
cw.buffer.Push(cb)
return true
}

View File

@@ -12,7 +12,7 @@ import (
// This example shows how to // This example shows how to
// 1. generate RTP/VP8 packets with GStreamer // 1. generate RTP/VP8 packets with GStreamer
// 2. connect to a RTSP server, announce an VP8 media // 2. connect to a RTSP server, announce a VP8 media
// 3. route the packets from GStreamer to the server // 3. route the packets from GStreamer to the server
func main() { func main() {

View File

@@ -12,7 +12,7 @@ import (
// This example shows how to // This example shows how to
// 1. generate RTP/VP9 packets with GStreamer // 1. generate RTP/VP9 packets with GStreamer
// 2. connect to a RTSP server, announce an VP9 media // 2. connect to a RTSP server, announce a VP9 media
// 3. route the packets from GStreamer to the server // 3. route the packets from GStreamer to the server
func main() { func main() {

View File

@@ -11,7 +11,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an VP8 media // 2. check if there's a VP8 media
// 3. get access units of that media // 3. get access units of that media
func main() { func main() {

View File

@@ -11,7 +11,7 @@ import (
// This example shows how to // This example shows how to
// 1. connect to a RTSP server // 1. connect to a RTSP server
// 2. check if there's an VP9 media // 2. check if there's a VP9 media
// 3. get access units of that media // 3. get access units of that media
func main() { func main() {

View File

@@ -17,7 +17,6 @@ import (
"github.com/aler9/gortsplib/v2/pkg/headers" "github.com/aler9/gortsplib/v2/pkg/headers"
"github.com/aler9/gortsplib/v2/pkg/liberrors" "github.com/aler9/gortsplib/v2/pkg/liberrors"
"github.com/aler9/gortsplib/v2/pkg/media" "github.com/aler9/gortsplib/v2/pkg/media"
"github.com/aler9/gortsplib/v2/pkg/ringbuffer"
"github.com/aler9/gortsplib/v2/pkg/sdp" "github.com/aler9/gortsplib/v2/pkg/sdp"
"github.com/aler9/gortsplib/v2/pkg/url" "github.com/aler9/gortsplib/v2/pkg/url"
) )
@@ -173,7 +172,7 @@ type ServerSession struct {
announcedMedias media.Medias // publish announcedMedias media.Medias // publish
udpLastPacketTime *int64 // publish udpLastPacketTime *int64 // publish
udpCheckStreamTimer *time.Timer udpCheckStreamTimer *time.Timer
writer serverWriter writer writer
rtpPacketBuffer *rtpPacketMultiBuffer rtpPacketBuffer *rtpPacketMultiBuffer
// in // in
@@ -826,7 +825,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
// inside the callback. // inside the callback.
if ss.state != ServerSessionStatePlay && if ss.state != ServerSessionStatePlay &&
*ss.setuppedTransport != TransportUDPMulticast { *ss.setuppedTransport != TransportUDPMulticast {
ss.writer.buffer, _ = ringbuffer.New(uint64(ss.s.WriteBufferCount)) ss.writer.allocateBuffer(ss.s.WriteBufferCount)
} }
res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{ res, err := sc.s.Handler.(ServerHandlerOnPlay).OnPlay(&ServerHandlerOnPlayCtx{
@@ -923,7 +922,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
// when recording, writeBuffer is only used to send RTCP receiver reports, // when recording, writeBuffer is only used to send RTCP receiver reports,
// that are much smaller than RTP packets and are sent at a fixed interval. // that are much smaller than RTP packets and are sent at a fixed interval.
// decrease RAM consumption by allocating less buffers. // decrease RAM consumption by allocating less buffers.
ss.writer.buffer, _ = ringbuffer.New(uint64(8)) ss.writer.allocateBuffer(8)
res, err := ss.s.Handler.(ServerHandlerOnRecord).OnRecord(&ServerHandlerOnRecordCtx{ res, err := ss.s.Handler.(ServerHandlerOnRecord).OnRecord(&ServerHandlerOnRecordCtx{
Session: ss, Session: ss,

View File

@@ -1,45 +0,0 @@
package gortsplib
import (
"github.com/aler9/gortsplib/v2/pkg/ringbuffer"
)
type serverWriter struct {
running bool
buffer *ringbuffer.RingBuffer
done chan struct{}
}
func (sw *serverWriter) start() {
if !sw.running {
sw.running = true
sw.done = make(chan struct{})
go sw.run()
}
}
func (sw *serverWriter) stop() {
if sw.running {
sw.buffer.Close()
<-sw.done
sw.running = false
}
}
func (sw *serverWriter) run() {
defer close(sw.done)
for {
tmp, ok := sw.buffer.Pull()
if !ok {
return
}
tmp.(func())()
}
}
func (sw *serverWriter) queue(cb func()) {
sw.buffer.Push(cb)
}

49
writer.go Normal file
View File

@@ -0,0 +1,49 @@
package gortsplib
import (
"github.com/aler9/gortsplib/v2/pkg/ringbuffer"
)
// this struct contains a queue that allows to detach the routine that is reading a stream
// from the routine that is writing a stream.
type writer struct {
running bool
buffer *ringbuffer.RingBuffer
done chan struct{}
}
func (w *writer) allocateBuffer(size int) {
w.buffer, _ = ringbuffer.New(uint64(size))
}
func (w *writer) start() {
w.running = true
w.done = make(chan struct{})
go w.run()
}
func (w *writer) stop() {
if w.running {
w.buffer.Close()
<-w.done
w.running = false
}
}
func (w *writer) run() {
defer close(w.done)
for {
tmp, ok := w.buffer.Pull()
if !ok {
return
}
tmp.(func())()
}
}
func (w *writer) queue(cb func()) {
w.buffer.Push(cb)
}