diff --git a/go.mod b/go.mod index 7ed5cb0a..2ca32f5f 100644 --- a/go.mod +++ b/go.mod @@ -12,7 +12,7 @@ require ( github.com/atrox/haikunatorgo/v2 v2.0.1 github.com/caddyserver/certmagic v0.23.0 github.com/datarhei/gosrt v0.9.0 - github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439 + github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363 github.com/dolthub/swiss v0.2.1 github.com/fujiwara/shapeio v1.0.0 github.com/go-playground/validator/v10 v10.27.0 diff --git a/go.sum b/go.sum index 493d07dd..a213112d 100644 --- a/go.sum +++ b/go.sum @@ -50,8 +50,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3 github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/datarhei/gosrt v0.9.0 h1:FW8A+F8tBiv7eIa57EBHjtTJKFX+OjvLogF/tFXoOiA= github.com/datarhei/gosrt v0.9.0/go.mod h1:rqTRK8sDZdN2YBgp1EEICSV4297mQk0oglwvpXhaWdk= -github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439 h1:8HMGWtglP1I23Qd4PLLeDthkLY2Ye67ePW5zY61v+Hs= -github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= +github.com/datarhei/joy4 v0.0.0-20250818192407-7b9c68697ad4 h1:coBWFnjU8DZpG4RHCCz2fQcdfuEVUfyeL7RHVXVflnM= +github.com/datarhei/joy4 v0.0.0-20250818192407-7b9c68697ad4/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= +github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363 h1:tHC7HzHIgs4I+YTCs0IkIJjP1lmygtSBNr73DiNrY3I= +github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= diff --git a/rtmp/channel.go b/rtmp/channel.go index fa34448e..7a1123b3 100644 --- a/rtmp/channel.go +++ b/rtmp/channel.go @@ -72,7 +72,7 @@ func (c *client) Close() { // channel represents a stream that is sent to the server type channel struct { // The packet queue for the stream - queue *pubsub.Queue + queue *pubsub.DurationQueue // The metadata of the stream streams []av.CodecData @@ -102,7 +102,7 @@ func newChannel(conn connection, playPath, reference string, remote net.Addr, st subscriber: make(map[string]*client), collector: collector, streams: streams, - queue: pubsub.NewQueue(), + queue: pubsub.NewDurationQueue(), isProxy: isProxy, } diff --git a/vendor/github.com/datarhei/joy4/av/pubsub/durationqueue.go b/vendor/github.com/datarhei/joy4/av/pubsub/durationqueue.go new file mode 100644 index 00000000..8bebc64c --- /dev/null +++ b/vendor/github.com/datarhei/joy4/av/pubsub/durationqueue.go @@ -0,0 +1,174 @@ +// Packege pubsub implements publisher-subscribers model used in multi-channel streaming. +package pubsub + +import ( + "io" + "sync" + "time" + + "github.com/datarhei/joy4/av" + "github.com/datarhei/joy4/av/pktque" +) + +// time +// -----------------> +// +// V-A-V-V-A-V-V-A-V-V +// | | +// 0 5 10 +// head tail +// oldest latest +// + +// One publisher and multiple subscribers thread-safe packet buffer queue. +type DurationQueue struct { + buf *pktque.Buf + lock *sync.RWMutex + cond *sync.Cond + mintime, maxtime, targettime time.Duration + streams []av.CodecData + closed bool +} + +func NewDurationQueue() *DurationQueue { + q := &DurationQueue{} + q.buf = pktque.NewBuf() + q.targettime = 2 * time.Second + q.lock = &sync.RWMutex{} + q.cond = sync.NewCond(q.lock.RLocker()) + return q +} + +func (q *DurationQueue) SetTargetTime(t time.Duration) { + q.lock.Lock() + q.targettime = t + q.lock.Unlock() +} + +func (q *DurationQueue) WriteHeader(streams []av.CodecData) error { + q.lock.Lock() + + q.streams = streams + + q.cond.Broadcast() + + q.lock.Unlock() + + return nil +} + +func (q *DurationQueue) WriteTrailer() error { + return nil +} + +// After Close() called, all QueueCursor's ReadPacket will return io.EOF. +func (q *DurationQueue) Close() (err error) { + q.lock.Lock() + + q.closed = true + q.cond.Broadcast() + + q.lock.Unlock() + return +} + +// Put packet into buffer, old packets will be discared. +func (q *DurationQueue) WritePacket(pkt av.Packet) (err error) { + q.lock.Lock() + + q.buf.Push(pkt) + + if q.buf.Count == 0 { + q.mintime = pkt.Time + } + + q.maxtime = pkt.Time + + for q.maxtime-q.mintime > q.targettime && q.buf.Count > 1 { + pkt := q.buf.Pop() + q.mintime = pkt.Time + + if q.maxtime-q.mintime <= q.targettime { + break + } + } + + q.cond.Broadcast() + + q.lock.Unlock() + return +} + +type DurationQueueCursor struct { + que *DurationQueue + pos pktque.BufPos + gotpos bool + init func(buf *pktque.Buf) pktque.BufPos +} + +func (q *DurationQueue) newCursor() *DurationQueueCursor { + return &DurationQueueCursor{ + que: q, + } +} + +// Create cursor position at latest packet. +func (q *DurationQueue) Latest() *DurationQueueCursor { + cursor := q.newCursor() + cursor.init = func(buf *pktque.Buf) pktque.BufPos { + return buf.Tail + } + return cursor +} + +// Create cursor position at oldest buffered packet. +func (q *DurationQueue) Oldest() *DurationQueueCursor { + cursor := q.newCursor() + cursor.init = func(buf *pktque.Buf) pktque.BufPos { + return buf.Head + } + return cursor +} + +func (qc *DurationQueueCursor) Streams() (streams []av.CodecData, err error) { + qc.que.cond.L.Lock() + for qc.que.streams == nil && !qc.que.closed { + qc.que.cond.Wait() + } + if qc.que.streams != nil { + streams = qc.que.streams + } else { + err = io.EOF + } + qc.que.cond.L.Unlock() + return +} + +// ReadPacket will not consume packets in Queue, it's just a cursor. +func (qc *DurationQueueCursor) ReadPacket() (pkt av.Packet, err error) { + qc.que.cond.L.Lock() + buf := qc.que.buf + if !qc.gotpos { + qc.pos = qc.init(buf) + qc.gotpos = true + } + for { + if qc.pos.LT(buf.Head) { + qc.pos = buf.Head + } else if qc.pos.GT(buf.Tail) { + qc.pos = buf.Tail + } + if buf.IsValidPos(qc.pos) { + pkt = buf.Get(qc.pos) + qc.pos++ + break + } + if qc.que.closed { + err = io.EOF + break + } + qc.que.cond.Wait() + } + qc.que.cond.L.Unlock() + return +} diff --git a/vendor/github.com/datarhei/joy4/av/pubsub/queue.go b/vendor/github.com/datarhei/joy4/av/pubsub/qopqueue.go similarity index 53% rename from vendor/github.com/datarhei/joy4/av/pubsub/queue.go rename to vendor/github.com/datarhei/joy4/av/pubsub/qopqueue.go index 37510c78..862983b6 100644 --- a/vendor/github.com/datarhei/joy4/av/pubsub/queue.go +++ b/vendor/github.com/datarhei/joy4/av/pubsub/qopqueue.go @@ -23,7 +23,6 @@ import ( // One publisher and multiple subscribers thread-safe packet buffer queue. type Queue struct { buf *pktque.Buf - head, tail int lock *sync.RWMutex cond *sync.Cond curgopcount, maxgopcount int @@ -42,82 +41,82 @@ func NewQueue() *Queue { return q } -func (self *Queue) SetMaxGopCount(n int) { - self.lock.Lock() - self.maxgopcount = n - self.lock.Unlock() +func (q *Queue) SetMaxGopCount(n int) { + q.lock.Lock() + q.maxgopcount = n + q.lock.Unlock() } -func (self *Queue) WriteHeader(streams []av.CodecData) error { - self.lock.Lock() +func (q *Queue) WriteHeader(streams []av.CodecData) error { + q.lock.Lock() - self.streams = streams + q.streams = streams for i, stream := range streams { if stream.Type().IsVideo() { - self.videoidx = i + q.videoidx = i } } - self.cond.Broadcast() + q.cond.Broadcast() - self.lock.Unlock() + q.lock.Unlock() return nil } -func (self *Queue) WriteTrailer() error { +func (q *Queue) WriteTrailer() error { return nil } // After Close() called, all QueueCursor's ReadPacket will return io.EOF. -func (self *Queue) Close() (err error) { - self.lock.Lock() +func (q *Queue) Close() (err error) { + q.lock.Lock() - self.closed = true - self.cond.Broadcast() + q.closed = true + q.cond.Broadcast() - self.lock.Unlock() + q.lock.Unlock() return } // Put packet into buffer, old packets will be discared. -func (self *Queue) WritePacket(pkt av.Packet) (err error) { - self.lock.Lock() +func (q *Queue) WritePacket(pkt av.Packet) (err error) { + q.lock.Lock() - self.buf.Push(pkt) + q.buf.Push(pkt) - if self.videoidx == -1 { // audio only stream + if q.videoidx == -1 { // audio only stream if pkt.IsKeyFrame { - self.curgopcount++ + q.curgopcount++ } - for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 { - pkt := self.buf.Pop() + for q.curgopcount >= q.maxgopcount && q.buf.Count > 1 { + pkt := q.buf.Pop() if pkt.IsKeyFrame { - self.curgopcount-- + q.curgopcount-- } - if self.curgopcount < self.maxgopcount { + if q.curgopcount < q.maxgopcount { break } } } else { // video only or video+audio stream - if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { - self.curgopcount++ + if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame { + q.curgopcount++ } - for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 { - pkt := self.buf.Pop() - if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { - self.curgopcount-- + for q.curgopcount >= q.maxgopcount && q.buf.Count > 1 { + pkt := q.buf.Pop() + if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame { + q.curgopcount-- } - if self.curgopcount < self.maxgopcount { + if q.curgopcount < q.maxgopcount { break } } } - self.cond.Broadcast() + q.cond.Broadcast() - self.lock.Unlock() + q.lock.Unlock() return } @@ -128,15 +127,15 @@ type QueueCursor struct { init func(buf *pktque.Buf, videoidx int) pktque.BufPos } -func (self *Queue) newCursor() *QueueCursor { +func (q *Queue) newCursor() *QueueCursor { return &QueueCursor{ - que: self, + que: q, } } // Create cursor position at latest packet. -func (self *Queue) Latest() *QueueCursor { - cursor := self.newCursor() +func (q *Queue) Latest() *QueueCursor { + cursor := q.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { return buf.Tail } @@ -144,8 +143,8 @@ func (self *Queue) Latest() *QueueCursor { } // Create cursor position at oldest buffered packet. -func (self *Queue) Oldest() *QueueCursor { - cursor := self.newCursor() +func (q *Queue) Oldest() *QueueCursor { + cursor := q.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { return buf.Head } @@ -153,8 +152,8 @@ func (self *Queue) Oldest() *QueueCursor { } // Create cursor position at specific time in buffered packets. -func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { - cursor := self.newCursor() +func (q *Queue) DelayedTime(dur time.Duration) *QueueCursor { + cursor := q.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { i := buf.Tail - 1 if buf.IsValidPos(i) { @@ -172,14 +171,14 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor { } // Create cursor position at specific delayed GOP count in buffered packets. -func (self *Queue) DelayedGopCount(n int) *QueueCursor { - cursor := self.newCursor() +func (q *Queue) DelayedGopCount(n int) *QueueCursor { + cursor := q.newCursor() cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos { i := buf.Tail - 1 if videoidx != -1 { for gop := 0; buf.IsValidPos(i) && gop < n; i-- { pkt := buf.Get(i) - if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame { + if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame { gop++ } } @@ -189,45 +188,45 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor { return cursor } -func (self *QueueCursor) Streams() (streams []av.CodecData, err error) { - self.que.cond.L.Lock() - for self.que.streams == nil && !self.que.closed { - self.que.cond.Wait() +func (qc *QueueCursor) Streams() (streams []av.CodecData, err error) { + qc.que.cond.L.Lock() + for qc.que.streams == nil && !qc.que.closed { + qc.que.cond.Wait() } - if self.que.streams != nil { - streams = self.que.streams + if qc.que.streams != nil { + streams = qc.que.streams } else { err = io.EOF } - self.que.cond.L.Unlock() + qc.que.cond.L.Unlock() return } // ReadPacket will not consume packets in Queue, it's just a cursor. -func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) { - self.que.cond.L.Lock() - buf := self.que.buf - if !self.gotpos { - self.pos = self.init(buf, self.que.videoidx) - self.gotpos = true +func (qc *QueueCursor) ReadPacket() (pkt av.Packet, err error) { + qc.que.cond.L.Lock() + buf := qc.que.buf + if !qc.gotpos { + qc.pos = qc.init(buf, qc.que.videoidx) + qc.gotpos = true } for { - if self.pos.LT(buf.Head) { - self.pos = buf.Head - } else if self.pos.GT(buf.Tail) { - self.pos = buf.Tail + if qc.pos.LT(buf.Head) { + qc.pos = buf.Head + } else if qc.pos.GT(buf.Tail) { + qc.pos = buf.Tail } - if buf.IsValidPos(self.pos) { - pkt = buf.Get(self.pos) - self.pos++ + if buf.IsValidPos(qc.pos) { + pkt = buf.Get(qc.pos) + qc.pos++ break } - if self.que.closed { + if qc.que.closed { err = io.EOF break } - self.que.cond.Wait() + qc.que.cond.Wait() } - self.que.cond.L.Unlock() + qc.que.cond.L.Unlock() return } diff --git a/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go b/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go index 48f94285..ce6ae693 100644 --- a/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go +++ b/vendor/github.com/datarhei/joy4/format/rtmp/rtmp.go @@ -1102,7 +1102,7 @@ func (conn *Conn) WritePacket(pkt av.Packet) (err error) { fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime) } - if err = conn.writeAVTag(tag, int32(timestamp)); err != nil { + if err = conn.writeAVTag(tag, timestamp); err != nil { return } diff --git a/vendor/modules.txt b/vendor/modules.txt index 8e22e093..f00c6e5c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -78,7 +78,7 @@ github.com/datarhei/gosrt/crypto github.com/datarhei/gosrt/net github.com/datarhei/gosrt/packet github.com/datarhei/gosrt/rand -# github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439 +# github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363 ## explicit; go 1.14 github.com/datarhei/joy4/av github.com/datarhei/joy4/av/avutil