rtmp: fix compatibility with GStreamer rtmp2src / rtmp2sink (#1433) (#4718)

This commit is contained in:
Alessandro Ros
2025-07-12 14:17:04 +02:00
committed by GitHub
parent 5ac2342914
commit 82956a4abb
4 changed files with 111 additions and 23 deletions

View File

@@ -2260,7 +2260,7 @@ go tool pprof -text http://localhost:9999/debug/pprof/profile?seconds=30
#### Standard stream ID syntax
In SRT, the stream ID is a string that is sent to the counterpart in order to advertise what action the caller is gonna do (publish or read), the path and the credentials. All these informations have to be encoded into a single string. This server supports two stream ID syntaxes, a custom one (that is the one reported in rest of the README) and also a [standard one](https://github.com/Haivision/srt/blob/master/docs/features/access-control.md) proposed by the authors of the protocol and sometimes enforced by some hardware. The standard syntax can be used in this way:
In SRT, the stream ID is a string that is sent to the remote part in order to advertise what action the caller is gonna do (publish or read), the path and the credentials. All these informations have to be encoded into a single string. This server supports two stream ID syntaxes, a custom one (that is the one reported in rest of the README) and also a [standard one](https://github.com/Haivision/srt/blob/master/docs/features/access-control.md) proposed by the authors of the protocol and enforced by some hardware. The standard syntax can be used in this way:
```
srt://localhost:8890?streamid=#!::m=publish,r=mypath,u=myuser,s=mypass&pkt_size=1316

View File

@@ -13,3 +13,13 @@ type Message struct {
MessageStreamID uint32
Body []byte
}
func (m *Message) clone() *Message {
return &Message{
ChunkStreamID: m.ChunkStreamID,
Timestamp: m.Timestamp,
Type: m.Type,
MessageStreamID: m.MessageStreamID,
Body: m.Body,
}
}

View File

@@ -27,17 +27,16 @@ func joinFragments(fragments [][]byte, size uint32) []byte {
}
type readerChunkStream struct {
mr *Reader
curTimestamp uint32
curTimestampAvailable bool
curType uint8
curMessageStreamID uint32
curBodyLen uint32
curBodyFragments [][]byte
curBodyRecv uint32
curTimestampDelta uint32
curTimestampDeltaAvailable bool
hasExtendedTimestamp bool
mr *Reader
curTimestamp uint32
curTimestampAvailable bool
curType uint8
curMessageStreamID uint32
curBodyLen uint32
curBodyFragments [][]byte
curBodyRecv uint32
curTimestampDelta uint32
hasExtendedTimestamp bool
}
func (rc *readerChunkStream) readChunk(c chunk.Chunk, bodySize uint32, hasExtendedTimestamp bool) error {
@@ -80,7 +79,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.curType = rc.mr.c0.Type
rc.curTimestamp = rc.mr.c0.Timestamp
rc.curTimestampAvailable = true
rc.curTimestampDeltaAvailable = false
rc.curTimestampDelta = 0
rc.curBodyLen = rc.mr.c0.BodyLen
rc.hasExtendedTimestamp = rc.mr.c0.Timestamp >= 0xFFFFFF
@@ -100,7 +99,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.mr.msg.Type = rc.mr.c0.Type
rc.mr.msg.MessageStreamID = rc.mr.c0.MessageStreamID
rc.mr.msg.Body = rc.mr.c0.Body
return &rc.mr.msg, nil
return rc.mr.msg.clone(), nil
case 1:
if !rc.curTimestampAvailable {
@@ -119,7 +118,6 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.curType = rc.mr.c1.Type
rc.curTimestamp += rc.mr.c1.TimestampDelta
rc.curTimestampDelta = rc.mr.c1.TimestampDelta
rc.curTimestampDeltaAvailable = true
rc.curBodyLen = rc.mr.c1.BodyLen
rc.hasExtendedTimestamp = rc.mr.c1.TimestampDelta >= 0xFFFFFF
@@ -139,7 +137,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.mr.msg.Type = rc.mr.c1.Type
rc.mr.msg.MessageStreamID = rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c1.Body
return &rc.mr.msg, nil
return rc.mr.msg.clone(), nil
case 2:
if !rc.curTimestampAvailable {
@@ -162,7 +160,6 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.curTimestamp += rc.mr.c2.TimestampDelta
rc.curTimestampDelta = rc.mr.c2.TimestampDelta
rc.curTimestampDeltaAvailable = true
rc.hasExtendedTimestamp = rc.mr.c2.TimestampDelta >= 0xFFFFFF
le := uint32(len(rc.mr.c2.Body))
@@ -177,7 +174,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.mr.msg.Type = rc.curType
rc.mr.msg.MessageStreamID = rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c2.Body
return &rc.mr.msg, nil
return rc.mr.msg.clone(), nil
default: // 3
if rc.curBodyRecv != 0 {
@@ -204,10 +201,10 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.mr.msg.Body = joinFragments(rc.curBodyFragments, rc.curBodyRecv)
rc.curBodyFragments = rc.curBodyFragments[:0]
rc.curBodyRecv = 0
return &rc.mr.msg, nil
return rc.mr.msg.clone(), nil
}
if !rc.curTimestampDeltaAvailable {
if !rc.curTimestampAvailable {
return nil, fmt.Errorf("received type 3 chunk without previous chunk")
}
@@ -235,7 +232,7 @@ func (rc *readerChunkStream) readMessage(typ byte) (*Message, error) {
rc.mr.msg.Type = rc.curType
rc.mr.msg.MessageStreamID = rc.curMessageStreamID
rc.mr.msg.Body = rc.mr.c3.Body
return &rc.mr.msg, nil
return rc.mr.msg.clone(), nil
}
}

View File

@@ -2,6 +2,8 @@ package rawmessage
import (
"bytes"
"errors"
"io"
"testing"
"time"
@@ -218,11 +220,90 @@ func TestReader(t *testing.T) {
hasExtendedTimestamp = chunkHasExtendedTimestamp(cach)
}
for _, camsg := range ca.messages {
var msgs []*Message
for {
msg, err := r.Read()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
require.Equal(t, camsg, msg)
msgs = append(msgs, msg)
}
require.Equal(t, ca.messages, msgs)
})
}
}
func TestReaderAdditional(t *testing.T) {
for _, ca := range []struct {
name string
messages []*Message
chunks []chunk.Chunk
}{
{
"(chunk0) + (chunk3)",
[]*Message{
{
ChunkStreamID: 27,
Timestamp: 4279383126000000,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{5}, 15),
},
{
ChunkStreamID: 27,
Timestamp: 4279383126000000,
Type: 6,
MessageStreamID: 3123,
Body: bytes.Repeat([]byte{6}, 15),
},
},
[]chunk.Chunk{
&chunk.Chunk0{
ChunkStreamID: 27,
Timestamp: 4279383126,
Type: 6,
MessageStreamID: 3123,
BodyLen: 15,
Body: bytes.Repeat([]byte{5}, 15),
},
&chunk.Chunk3{
ChunkStreamID: 27,
Body: bytes.Repeat([]byte{6}, 15),
},
},
},
} {
t.Run(ca.name, func(t *testing.T) {
var buf bytes.Buffer
br := bytecounter.NewReader(&buf)
r := NewReader(br, br, func(_ uint32) error {
return nil
})
hasExtendedTimestamp := false
for _, cach := range ca.chunks {
buf2, err := cach.Marshal(hasExtendedTimestamp)
require.NoError(t, err)
buf.Write(buf2)
hasExtendedTimestamp = chunkHasExtendedTimestamp(cach)
}
var msgs []*Message
for {
msg, err := r.Read()
if errors.Is(err, io.EOF) {
break
}
require.NoError(t, err)
msgs = append(msgs, msg)
}
require.Equal(t, ca.messages, msgs)
})
}
}