Update rtmp server, use DurationQueue pubsub

This commit is contained in:
Ingo Oppermann
2025-08-18 21:35:11 +02:00
parent c960227f1b
commit f8cf04ee20
7 changed files with 251 additions and 76 deletions

2
go.mod
View File

@@ -12,7 +12,7 @@ require (
github.com/atrox/haikunatorgo/v2 v2.0.1
github.com/caddyserver/certmagic v0.23.0
github.com/datarhei/gosrt v0.9.0
github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439
github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363
github.com/dolthub/swiss v0.2.1
github.com/fujiwara/shapeio v1.0.0
github.com/go-playground/validator/v10 v10.27.0

6
go.sum
View File

@@ -50,8 +50,10 @@ github.com/cpuguy83/go-md2man/v2 v2.0.7 h1:zbFlGlXEAKlwXpmvle3d8Oe3YnkKIK4xSRTd3
github.com/cpuguy83/go-md2man/v2 v2.0.7/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g=
github.com/datarhei/gosrt v0.9.0 h1:FW8A+F8tBiv7eIa57EBHjtTJKFX+OjvLogF/tFXoOiA=
github.com/datarhei/gosrt v0.9.0/go.mod h1:rqTRK8sDZdN2YBgp1EEICSV4297mQk0oglwvpXhaWdk=
github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439 h1:8HMGWtglP1I23Qd4PLLeDthkLY2Ye67ePW5zY61v+Hs=
github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw=
github.com/datarhei/joy4 v0.0.0-20250818192407-7b9c68697ad4 h1:coBWFnjU8DZpG4RHCCz2fQcdfuEVUfyeL7RHVXVflnM=
github.com/datarhei/joy4 v0.0.0-20250818192407-7b9c68697ad4/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw=
github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363 h1:tHC7HzHIgs4I+YTCs0IkIJjP1lmygtSBNr73DiNrY3I=
github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363/go.mod h1:Jcw/6jZDQQmPx8A7INEkXmuEF7E9jjBbSTfVSLwmiQw=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=

View File

@@ -72,7 +72,7 @@ func (c *client) Close() {
// channel represents a stream that is sent to the server
type channel struct {
// The packet queue for the stream
queue *pubsub.Queue
queue *pubsub.DurationQueue
// The metadata of the stream
streams []av.CodecData
@@ -102,7 +102,7 @@ func newChannel(conn connection, playPath, reference string, remote net.Addr, st
subscriber: make(map[string]*client),
collector: collector,
streams: streams,
queue: pubsub.NewQueue(),
queue: pubsub.NewDurationQueue(),
isProxy: isProxy,
}

View File

@@ -0,0 +1,174 @@
// Packege pubsub implements publisher-subscribers model used in multi-channel streaming.
package pubsub
import (
"io"
"sync"
"time"
"github.com/datarhei/joy4/av"
"github.com/datarhei/joy4/av/pktque"
)
// time
// ----------------->
//
// V-A-V-V-A-V-V-A-V-V
// | |
// 0 5 10
// head tail
// oldest latest
//
// One publisher and multiple subscribers thread-safe packet buffer queue.
type DurationQueue struct {
buf *pktque.Buf
lock *sync.RWMutex
cond *sync.Cond
mintime, maxtime, targettime time.Duration
streams []av.CodecData
closed bool
}
func NewDurationQueue() *DurationQueue {
q := &DurationQueue{}
q.buf = pktque.NewBuf()
q.targettime = 2 * time.Second
q.lock = &sync.RWMutex{}
q.cond = sync.NewCond(q.lock.RLocker())
return q
}
func (q *DurationQueue) SetTargetTime(t time.Duration) {
q.lock.Lock()
q.targettime = t
q.lock.Unlock()
}
func (q *DurationQueue) WriteHeader(streams []av.CodecData) error {
q.lock.Lock()
q.streams = streams
q.cond.Broadcast()
q.lock.Unlock()
return nil
}
func (q *DurationQueue) WriteTrailer() error {
return nil
}
// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
func (q *DurationQueue) Close() (err error) {
q.lock.Lock()
q.closed = true
q.cond.Broadcast()
q.lock.Unlock()
return
}
// Put packet into buffer, old packets will be discared.
func (q *DurationQueue) WritePacket(pkt av.Packet) (err error) {
q.lock.Lock()
q.buf.Push(pkt)
if q.buf.Count == 0 {
q.mintime = pkt.Time
}
q.maxtime = pkt.Time
for q.maxtime-q.mintime > q.targettime && q.buf.Count > 1 {
pkt := q.buf.Pop()
q.mintime = pkt.Time
if q.maxtime-q.mintime <= q.targettime {
break
}
}
q.cond.Broadcast()
q.lock.Unlock()
return
}
type DurationQueueCursor struct {
que *DurationQueue
pos pktque.BufPos
gotpos bool
init func(buf *pktque.Buf) pktque.BufPos
}
func (q *DurationQueue) newCursor() *DurationQueueCursor {
return &DurationQueueCursor{
que: q,
}
}
// Create cursor position at latest packet.
func (q *DurationQueue) Latest() *DurationQueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf) pktque.BufPos {
return buf.Tail
}
return cursor
}
// Create cursor position at oldest buffered packet.
func (q *DurationQueue) Oldest() *DurationQueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf) pktque.BufPos {
return buf.Head
}
return cursor
}
func (qc *DurationQueueCursor) Streams() (streams []av.CodecData, err error) {
qc.que.cond.L.Lock()
for qc.que.streams == nil && !qc.que.closed {
qc.que.cond.Wait()
}
if qc.que.streams != nil {
streams = qc.que.streams
} else {
err = io.EOF
}
qc.que.cond.L.Unlock()
return
}
// ReadPacket will not consume packets in Queue, it's just a cursor.
func (qc *DurationQueueCursor) ReadPacket() (pkt av.Packet, err error) {
qc.que.cond.L.Lock()
buf := qc.que.buf
if !qc.gotpos {
qc.pos = qc.init(buf)
qc.gotpos = true
}
for {
if qc.pos.LT(buf.Head) {
qc.pos = buf.Head
} else if qc.pos.GT(buf.Tail) {
qc.pos = buf.Tail
}
if buf.IsValidPos(qc.pos) {
pkt = buf.Get(qc.pos)
qc.pos++
break
}
if qc.que.closed {
err = io.EOF
break
}
qc.que.cond.Wait()
}
qc.que.cond.L.Unlock()
return
}

View File

@@ -23,7 +23,6 @@ import (
// One publisher and multiple subscribers thread-safe packet buffer queue.
type Queue struct {
buf *pktque.Buf
head, tail int
lock *sync.RWMutex
cond *sync.Cond
curgopcount, maxgopcount int
@@ -42,82 +41,82 @@ func NewQueue() *Queue {
return q
}
func (self *Queue) SetMaxGopCount(n int) {
self.lock.Lock()
self.maxgopcount = n
self.lock.Unlock()
func (q *Queue) SetMaxGopCount(n int) {
q.lock.Lock()
q.maxgopcount = n
q.lock.Unlock()
}
func (self *Queue) WriteHeader(streams []av.CodecData) error {
self.lock.Lock()
func (q *Queue) WriteHeader(streams []av.CodecData) error {
q.lock.Lock()
self.streams = streams
q.streams = streams
for i, stream := range streams {
if stream.Type().IsVideo() {
self.videoidx = i
q.videoidx = i
}
}
self.cond.Broadcast()
q.cond.Broadcast()
self.lock.Unlock()
q.lock.Unlock()
return nil
}
func (self *Queue) WriteTrailer() error {
func (q *Queue) WriteTrailer() error {
return nil
}
// After Close() called, all QueueCursor's ReadPacket will return io.EOF.
func (self *Queue) Close() (err error) {
self.lock.Lock()
func (q *Queue) Close() (err error) {
q.lock.Lock()
self.closed = true
self.cond.Broadcast()
q.closed = true
q.cond.Broadcast()
self.lock.Unlock()
q.lock.Unlock()
return
}
// Put packet into buffer, old packets will be discared.
func (self *Queue) WritePacket(pkt av.Packet) (err error) {
self.lock.Lock()
func (q *Queue) WritePacket(pkt av.Packet) (err error) {
q.lock.Lock()
self.buf.Push(pkt)
q.buf.Push(pkt)
if self.videoidx == -1 { // audio only stream
if q.videoidx == -1 { // audio only stream
if pkt.IsKeyFrame {
self.curgopcount++
q.curgopcount++
}
for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 {
pkt := self.buf.Pop()
for q.curgopcount >= q.maxgopcount && q.buf.Count > 1 {
pkt := q.buf.Pop()
if pkt.IsKeyFrame {
self.curgopcount--
q.curgopcount--
}
if self.curgopcount < self.maxgopcount {
if q.curgopcount < q.maxgopcount {
break
}
}
} else { // video only or video+audio stream
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
self.curgopcount++
if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame {
q.curgopcount++
}
for self.curgopcount >= self.maxgopcount && self.buf.Count > 1 {
pkt := self.buf.Pop()
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
self.curgopcount--
for q.curgopcount >= q.maxgopcount && q.buf.Count > 1 {
pkt := q.buf.Pop()
if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame {
q.curgopcount--
}
if self.curgopcount < self.maxgopcount {
if q.curgopcount < q.maxgopcount {
break
}
}
}
self.cond.Broadcast()
q.cond.Broadcast()
self.lock.Unlock()
q.lock.Unlock()
return
}
@@ -128,15 +127,15 @@ type QueueCursor struct {
init func(buf *pktque.Buf, videoidx int) pktque.BufPos
}
func (self *Queue) newCursor() *QueueCursor {
func (q *Queue) newCursor() *QueueCursor {
return &QueueCursor{
que: self,
que: q,
}
}
// Create cursor position at latest packet.
func (self *Queue) Latest() *QueueCursor {
cursor := self.newCursor()
func (q *Queue) Latest() *QueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
return buf.Tail
}
@@ -144,8 +143,8 @@ func (self *Queue) Latest() *QueueCursor {
}
// Create cursor position at oldest buffered packet.
func (self *Queue) Oldest() *QueueCursor {
cursor := self.newCursor()
func (q *Queue) Oldest() *QueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
return buf.Head
}
@@ -153,8 +152,8 @@ func (self *Queue) Oldest() *QueueCursor {
}
// Create cursor position at specific time in buffered packets.
func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
cursor := self.newCursor()
func (q *Queue) DelayedTime(dur time.Duration) *QueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := buf.Tail - 1
if buf.IsValidPos(i) {
@@ -172,14 +171,14 @@ func (self *Queue) DelayedTime(dur time.Duration) *QueueCursor {
}
// Create cursor position at specific delayed GOP count in buffered packets.
func (self *Queue) DelayedGopCount(n int) *QueueCursor {
cursor := self.newCursor()
func (q *Queue) DelayedGopCount(n int) *QueueCursor {
cursor := q.newCursor()
cursor.init = func(buf *pktque.Buf, videoidx int) pktque.BufPos {
i := buf.Tail - 1
if videoidx != -1 {
for gop := 0; buf.IsValidPos(i) && gop < n; i-- {
pkt := buf.Get(i)
if pkt.Idx == int8(self.videoidx) && pkt.IsKeyFrame {
if pkt.Idx == int8(q.videoidx) && pkt.IsKeyFrame {
gop++
}
}
@@ -189,45 +188,45 @@ func (self *Queue) DelayedGopCount(n int) *QueueCursor {
return cursor
}
func (self *QueueCursor) Streams() (streams []av.CodecData, err error) {
self.que.cond.L.Lock()
for self.que.streams == nil && !self.que.closed {
self.que.cond.Wait()
func (qc *QueueCursor) Streams() (streams []av.CodecData, err error) {
qc.que.cond.L.Lock()
for qc.que.streams == nil && !qc.que.closed {
qc.que.cond.Wait()
}
if self.que.streams != nil {
streams = self.que.streams
if qc.que.streams != nil {
streams = qc.que.streams
} else {
err = io.EOF
}
self.que.cond.L.Unlock()
qc.que.cond.L.Unlock()
return
}
// ReadPacket will not consume packets in Queue, it's just a cursor.
func (self *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
self.que.cond.L.Lock()
buf := self.que.buf
if !self.gotpos {
self.pos = self.init(buf, self.que.videoidx)
self.gotpos = true
func (qc *QueueCursor) ReadPacket() (pkt av.Packet, err error) {
qc.que.cond.L.Lock()
buf := qc.que.buf
if !qc.gotpos {
qc.pos = qc.init(buf, qc.que.videoidx)
qc.gotpos = true
}
for {
if self.pos.LT(buf.Head) {
self.pos = buf.Head
} else if self.pos.GT(buf.Tail) {
self.pos = buf.Tail
if qc.pos.LT(buf.Head) {
qc.pos = buf.Head
} else if qc.pos.GT(buf.Tail) {
qc.pos = buf.Tail
}
if buf.IsValidPos(self.pos) {
pkt = buf.Get(self.pos)
self.pos++
if buf.IsValidPos(qc.pos) {
pkt = buf.Get(qc.pos)
qc.pos++
break
}
if self.que.closed {
if qc.que.closed {
err = io.EOF
break
}
self.que.cond.Wait()
qc.que.cond.Wait()
}
self.que.cond.L.Unlock()
qc.que.cond.L.Unlock()
return
}

View File

@@ -1102,7 +1102,7 @@ func (conn *Conn) WritePacket(pkt av.Packet) (err error) {
fmt.Println("rtmp: WritePacket", pkt.Idx, pkt.Time, pkt.CompositionTime)
}
if err = conn.writeAVTag(tag, int32(timestamp)); err != nil {
if err = conn.writeAVTag(tag, timestamp); err != nil {
return
}

2
vendor/modules.txt vendored
View File

@@ -78,7 +78,7 @@ github.com/datarhei/gosrt/crypto
github.com/datarhei/gosrt/net
github.com/datarhei/gosrt/packet
github.com/datarhei/gosrt/rand
# github.com/datarhei/joy4 v0.0.0-20250806135534-393d6bb11439
# github.com/datarhei/joy4 v0.0.0-20250818192923-6dc77ee81363
## explicit; go 1.14
github.com/datarhei/joy4/av
github.com/datarhei/joy4/av/avutil