support KLV tracks (#808)

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
This commit is contained in:
Yaroslav Molochko
2025-07-06 20:56:52 +03:00
committed by GitHub
parent ff94dc956b
commit e02c447868
17 changed files with 705 additions and 0 deletions

View File

@@ -141,6 +141,7 @@ In RTSP, media streams are transmitted by using RTP packets, which are encoded i
|codec|documentation|encoder and decoder available| |codec|documentation|encoder and decoder available|
|------|-------------|-----------------------------| |------|-------------|-----------------------------|
|MPEG-TS|[link](https://pkg.go.dev/github.com/bluenviron/gortsplib/v4/pkg/format#MPEGTS)|| |MPEG-TS|[link](https://pkg.go.dev/github.com/bluenviron/gortsplib/v4/pkg/format#MPEGTS)||
|KLV|[link](https://pkg.go.dev/github.com/bluenviron/gortsplib/v4/pkg/format#KLV)|:heavy_check_mark:|
## Specifications ## Specifications
@@ -168,6 +169,7 @@ In RTSP, media streams are transmitted by using RTP packets, which are encoded i
|[RFC5574, RTP Payload Format for the Speex Codec](https://datatracker.ietf.org/doc/html/rfc5574)|payload formats / Speex| |[RFC5574, RTP Payload Format for the Speex Codec](https://datatracker.ietf.org/doc/html/rfc5574)|payload formats / Speex|
|[RFC3551, RTP Profile for Audio and Video Conferences with Minimal Control](https://datatracker.ietf.org/doc/html/rfc3551)|payload formats / G726, G722, G711, LPCM| |[RFC3551, RTP Profile for Audio and Video Conferences with Minimal Control](https://datatracker.ietf.org/doc/html/rfc3551)|payload formats / G726, G722, G711, LPCM|
|[RFC3190, RTP Payload Format for 12-bit DAT Audio and 20- and 24-bit Linear Sampled Audio](https://datatracker.ietf.org/doc/html/rfc3190)|payload formats / LPCM| |[RFC3190, RTP Payload Format for 12-bit DAT Audio and 20- and 24-bit Linear Sampled Audio](https://datatracker.ietf.org/doc/html/rfc3190)|payload formats / LPCM|
|[RFC6597, RTP Payload Format for Society of Motion Picture and Television Engineers (SMPTE) ST 336 Encoded Data](https://datatracker.ietf.org/doc/html/rfc6597)|payload formats / KLV|
|[Codec specifications](https://github.com/bluenviron/mediacommon#specifications)|codecs| |[Codec specifications](https://github.com/bluenviron/mediacommon#specifications)|codecs|
|[Golang project layout](https://github.com/golang-standards/project-layout)|project layout| |[Golang project layout](https://github.com/golang-standards/project-layout)|project layout|

View File

@@ -185,6 +185,11 @@ func Unmarshal(md *psdp.MediaDescription, payloadTypeStr string) (Format, error)
case codec == "l8", codec == "l16", codec == "l24" && payloadType >= 96 && payloadType <= 127: case codec == "l8", codec == "l16", codec == "l24" && payloadType >= 96 && payloadType <= 127:
return &LPCM{} return &LPCM{}
// application
case codec == "smtpe336m" && payloadType >= 96 && payloadType <= 127:
return &KLV{}
/* /*
* static payload types * static payload types
**/ **/

View File

@@ -1194,6 +1194,19 @@ var casesFormat = []struct {
"TP-LINK/90000", "TP-LINK/90000",
nil, nil,
}, },
{
"application klv",
"v=0\n" +
"s=\n" +
"m=application 0 RTP/AVP 97\n" +
"a=rtpmap:97 smtpe336m/90000\n",
&KLV{
PayloadTyp: 97,
},
97,
"smtpe336m/90000",
nil,
},
{ {
"audio aac from AVOIP (issue mediamtx/4183)", "audio aac from AVOIP (issue mediamtx/4183)",
"v=0\r\n" + "v=0\r\n" +

74
pkg/format/klv.go Normal file
View File

@@ -0,0 +1,74 @@
package format
import (
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpklv"
)
// KLV is the RTP format for KLV data.
// Specification: https://datatracker.ietf.org/doc/html/rfc6597
type KLV struct {
// PayloadTyp is the RTP payload type of the format. It is filled by
// unmarshal, returned by PayloadType and passed to encoders created with
// CreateEncoder.
PayloadTyp uint8
}
// unmarshal fills the format from the unmarshal context.
// Only the payload type is read from ctx; the returned error is always nil.
func (f *KLV) unmarshal(ctx *unmarshalContext) error {
f.PayloadTyp = ctx.payloadType
return nil
}
// Codec implements Format.
// It always returns "KLV".
func (f *KLV) Codec() string {
return "KLV"
}
// ClockRate implements Format.
// The clock rate is fixed at 90000, the same value advertised by RTPMap.
func (f *KLV) ClockRate() int {
return 90000
}
// PayloadType implements Format.
// It returns the value stored in PayloadTyp.
func (f *KLV) PayloadType() uint8 {
return f.PayloadTyp
}
// RTPMap implements Format.
// It returns the fixed encoding name and clock rate of the format.
//
// NOTE(review): RFC 6597 appears to register the media subtype as
// "smpte336m", while this string is spelled "smtpe336m". Any change has to be
// mirrored in the codec name matched by Unmarshal -- confirm which spelling
// is intended.
func (f *KLV) RTPMap() string {
return "smtpe336m/90000"
}
// FMTP implements Format.
// It returns nil: this format exposes no fmtp parameters.
func (f *KLV) FMTP() map[string]string {
return nil
}
// PTSEqualsDTS implements Format.
// It always returns true; the packet argument is not inspected.
func (f *KLV) PTSEqualsDTS(*rtp.Packet) bool {
return true
}
// CreateDecoder allocates an RTP/KLV decoder for this format and initializes
// it. The decoder is returned only when its initialization succeeds.
func (f *KLV) CreateDecoder() (*rtpklv.Decoder, error) {
	dec := new(rtpklv.Decoder)

	if err := dec.Init(); err != nil {
		return nil, err
	}

	return dec, nil
}
// CreateEncoder allocates an RTP/KLV encoder that stamps packets with the
// payload type of this format, and initializes it. The encoder is returned
// only when its initialization succeeds.
func (f *KLV) CreateEncoder() (*rtpklv.Encoder, error) {
	enc := new(rtpklv.Encoder)
	enc.PayloadType = f.PayloadTyp

	if err := enc.Init(); err != nil {
		return nil, err
	}

	return enc, nil
}

17
pkg/format/klv_test.go Normal file
View File

@@ -0,0 +1,17 @@
package format
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// TestKLVAttributes checks the fixed attributes reported by the KLV format.
func TestKLVAttributes(t *testing.T) {
	klv := &KLV{PayloadTyp: 96}

	codec := klv.Codec()
	require.Equal(t, "KLV", codec)

	clockRate := klv.ClockRate()
	require.Equal(t, 90000, clockRate)

	ptsEqualsDTS := klv.PTSEqualsDTS(&rtp.Packet{})
	require.Equal(t, true, ptsEqualsDTS)
}

View File

@@ -0,0 +1,195 @@
package rtpklv
import (
"errors"
"fmt"
"github.com/pion/rtp"
)
// ErrMorePacketsNeeded is returned by Decode when the packet was accepted but
// the KLV unit it belongs to is not complete yet.
var ErrMorePacketsNeeded = errors.New("need more packets")
// ErrNonStartingPacketAndNoPrevious is returned by Decode when no KLV unit is
// being assembled and the received payload does not begin with the KLV
// Universal Label prefix, i.e. it is a non-starting fragment.
// It's normal to receive this when decoding a stream that has been already
// running for some time.
var ErrNonStartingPacketAndNoPrevious = errors.New(
"received a non-starting fragment without any previous starting fragment")
// Decoder is an RTP/KLV decoder.
// It reassembles KLV units that may be fragmented across several RTP packets.
// Specification: https://datatracker.ietf.org/doc/html/rfc6597
type Decoder struct {
// buffer for accumulating KLV unit data across multiple packets
buffer []byte
// expected total size (key + length field + value) of the first KLV item
// of the unit being assembled; 0 when it could not be determined
expectedSize int
// timestamp of the current KLV unit being assembled
currentTimestamp uint32
// whether we're currently assembling a KLV unit
assembling bool
// sequence number of the last processed packet
lastSeqNum uint16
// whether a packet has been processed since the last reset; sequence
// number continuity is checked only while this is true
firstPacketReceived bool
}
// Init initializes the decoder by clearing its reassembly state.
// The returned error is always nil.
func (d *Decoder) Init() error {
d.reset()
return nil
}
// reset drops any partially assembled KLV unit and returns the decoder to its
// idle state. The buffer is truncated rather than released, and lastSeqNum is
// left untouched.
func (d *Decoder) reset() {
	d.assembling, d.firstPacketReceived = false, false
	d.expectedSize, d.currentTimestamp = 0, 0
	d.buffer = d.buffer[:0]
}
// parseKLVLength parses the length field of a KLV item according to
// SMPTE ST 336.
//
// data must start at the first byte of the length field. The function returns
// the length of the value and the number of bytes occupied by the length field
// itself.
//
// An error is returned when data is empty, when the long form declares zero or
// more than 8 length bytes, when data ends before the declared length bytes,
// or when the decoded length does not fit in an int.
func parseKLVLength(data []byte) (uint, uint, error) {
	if len(data) < 1 {
		return 0, 0, fmt.Errorf("buffer is too short")
	}

	firstByte := data[0]

	// Short form: if bit 7 is 0, the length is in the lower 7 bits
	if (firstByte & 0x80) == 0 {
		return uint(firstByte & 0x7f), 1, nil
	}

	// Long form: bit 7 is 1, lower 7 bits indicate number of subsequent length bytes
	lengthBytes := uint(firstByte & 0x7f)
	if lengthBytes == 0 || lengthBytes > 8 {
		return 0, 0, fmt.Errorf("invalid length field: %d bytes", lengthBytes)
	}

	totalLengthSize := 1 + lengthBytes
	if int(totalLengthSize) > len(data) {
		return 0, 0, fmt.Errorf("insufficient data for length field")
	}

	// Accumulate in 64 bits: up to 8 length bytes are accepted, and a plain
	// uint would silently drop the high bytes where uint is 32 bits wide.
	var lengthValue uint64
	for _, b := range data[1:totalLengthSize] {
		lengthValue = (lengthValue << 8) | uint64(b)
	}

	// Callers convert the length to int and add it to buffer offsets. A value
	// that does not fit in an int can never be satisfied by a real buffer and
	// would overflow those computations, so it is rejected here.
	const maxInt = int(^uint(0) >> 1)
	if lengthValue > uint64(maxInt) {
		return 0, 0, fmt.Errorf("length is too big: %d", lengthValue)
	}

	return uint(lengthValue), totalLengthSize, nil
}
// isKLVStart reports whether payload begins with the 4-byte prefix shared by
// KLV Universal Label Keys (0x06 0x0e 0x2b 0x34). Payloads shorter than the
// prefix never match.
func isKLVStart(payload []byte) bool {
	prefix := [4]byte{0x06, 0x0e, 0x2b, 0x34}

	if len(payload) < len(prefix) {
		return false
	}

	for i, want := range prefix {
		if payload[i] != want {
			return false
		}
	}

	return true
}
// splitKLVUnit splits a reassembled buffer into its consecutive KLV items.
// Every item is made of a 16-byte key, a length field and a value. The
// returned slices are sub-slices of buf. An error is returned when buf is
// empty or ends in the middle of an item.
func splitKLVUnit(buf []byte) ([][]byte, error) {
	const keySize = 16

	var items [][]byte

	for {
		if len(buf) < keySize {
			return nil, fmt.Errorf("buffer is too short")
		}

		valueSize, lengthSize, err := parseKLVLength(buf[keySize:])
		if err != nil {
			return nil, err
		}

		// parseKLVLength never reports a length field larger than its input,
		// so headerSize cannot exceed len(buf).
		headerSize := keySize + lengthSize
		if remaining := uint(len(buf)) - headerSize; remaining < valueSize {
			return nil, fmt.Errorf("buffer is too short")
		}

		itemSize := headerSize + valueSize
		items = append(items, buf[:itemSize])
		buf = buf[itemSize:]

		if len(buf) == 0 {
			return items, nil
		}
	}
}
// Decode decodes a KLV unit from RTP packets.
//
// Packets must be passed in order, one at a time. The payload of every packet
// is copied into an internal buffer, so pkt is never modified nor retained.
//
// It returns:
//   - the KLV items of the unit, when the packet completes it. A unit is
//     complete when the packet carries the marker bit, or when the size
//     announced by the length field of the first item has been reached;
//   - ErrMorePacketsNeeded when the packet was accepted but the unit is not
//     complete yet;
//   - ErrNonStartingPacketAndNoPrevious when no unit is being assembled and
//     the payload does not begin with a KLV Universal Label prefix;
//   - another error when a sequence number gap is detected, when the timestamp
//     changes in the middle of a unit, or when the reassembled data cannot be
//     split into KLV items. In the first two cases the partial unit is dropped.
//
// The returned slices do not share memory with the decoder: they stay valid
// after further calls to Decode.
func (d *Decoder) Decode(pkt *rtp.Packet) ([][]byte, error) {
	payload := pkt.Payload
	marker := pkt.Marker
	timestamp := pkt.Timestamp
	seqNum := pkt.SequenceNumber

	// Check for sequence number gaps (packet loss)
	if d.firstPacketReceived {
		expectedSeq := d.lastSeqNum + 1
		if seqNum != expectedSeq {
			// Packet loss detected, reset state
			d.reset()
			return nil, fmt.Errorf("packet loss detected: expected seq %d, got %d", expectedSeq, seqNum)
		}
	}

	d.lastSeqNum = seqNum
	d.firstPacketReceived = true

	if !d.assembling {
		// No unit in progress: the packet must start a new one
		if !isKLVStart(payload) {
			return nil, ErrNonStartingPacketAndNoPrevious
		}

		d.currentTimestamp = timestamp
		d.assembling = true
		d.buffer = append(d.buffer[:0], payload...)

		// Try to determine the expected size if we have enough data
		if len(payload) >= 17 { // 16 bytes for Universal Label Key + at least 1 byte for length
			valueLength, lengthSize, err := parseKLVLength(payload[16:])
			if err == nil {
				d.expectedSize = 16 + int(lengthSize) + int(valueLength)
			}
		}
	} else {
		if timestamp != d.currentTimestamp {
			// Timestamp changed, this is a new KLV unit and the previous one
			// was incomplete. Save the previous timestamp before reset()
			// clears it, otherwise the error would always report 0.
			previousTimestamp := d.currentTimestamp
			d.reset()
			return nil, fmt.Errorf("incomplete KLV unit: timestamp changed from %d to %d", previousTimestamp, timestamp)
		}

		// Append this packet's payload to the buffer
		d.buffer = append(d.buffer, payload...)
	}

	// Check if we have a complete KLV unit
	var unitSize int
	switch {
	case marker:
		unitSize = len(d.buffer)

	case d.expectedSize > 0 && len(d.buffer) >= d.expectedSize:
		// We know the expected size and have reached it
		unitSize = d.expectedSize

	default:
		// Need more packets
		return nil, ErrMorePacketsNeeded
	}

	result := d.buffer[:unitSize]

	// Hand the buffer over to the caller. reset() only truncates d.buffer, so
	// without this the next unit would be written into the same backing array
	// and overwrite the items returned here.
	d.buffer = nil
	d.reset()

	return splitKLVUnit(result)
}

View File

@@ -0,0 +1,77 @@
package rtpklv
import (
"errors"
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// TestDecode feeds the packets of every shared test case to a fresh decoder
// and checks that the reassembled KLV items match the expected ones, and that
// the input packets are left untouched.
func TestDecode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			dec := &Decoder{}
			require.NoError(t, dec.Init())

			var decoded [][]byte

			for _, pkt := range ca.pkts {
				before := pkt.Clone()

				units, decErr := dec.Decode(pkt)

				// test input integrity
				require.Equal(t, before, pkt)

				if errors.Is(decErr, ErrMorePacketsNeeded) {
					continue
				}

				require.NoError(t, decErr)
				decoded = append(decoded, units...)
			}

			require.Equal(t, ca.klvUnit, decoded)
		})
	}
}
// FuzzDecoder feeds one or two arbitrary packets with consecutive sequence
// numbers to a fresh decoder. The second packet is sent only when the first
// one asks for more data. Whenever decoding succeeds, the result must contain
// at least one item and no empty item.
func FuzzDecoder(f *testing.F) {
	f.Fuzz(func(t *testing.T, a []byte, am bool, b []byte, bm bool) {
		dec := &Decoder{}
		require.NoError(t, dec.Init())

		decode := func(seq uint16, marker bool, payload []byte) ([][]byte, error) {
			return dec.Decode(&rtp.Packet{
				Header: rtp.Header{
					Marker:         marker,
					SequenceNumber: seq,
				},
				Payload: payload,
			})
		}

		units, err := decode(17645, am, a)
		if errors.Is(err, ErrMorePacketsNeeded) {
			units, err = decode(17646, bm, b)
		}

		if err != nil {
			return
		}

		if len(units) == 0 {
			t.Errorf("should not happen")
		}

		for _, item := range units {
			if len(item) == 0 {
				t.Errorf("should not happen")
			}
		}
	})
}

View File

@@ -0,0 +1,129 @@
package rtpklv
import (
"crypto/rand"
"github.com/pion/rtp"
)
const (
// rtpVersion is the value written in the Version field of generated packets.
rtpVersion = 2
// defaultPayloadMaxSize is the payload size limit applied by Init when
// PayloadMaxSize is left to zero.
defaultPayloadMaxSize = 1450 // 1500 (UDP MTU) - 20 (IP header) - 8 (UDP header) - 12 (RTP header) - 10 (SRTP overhead)
)
// randUint32 returns a random 32-bit value built from 4 bytes read from
// crypto/rand, the first byte being the most significant one.
// The error of the random source is returned as-is.
func randUint32() (uint32, error) {
	var raw [4]byte

	if _, err := rand.Read(raw[:]); err != nil {
		return 0, err
	}

	var v uint32
	for _, octet := range raw {
		v = v<<8 | uint32(octet)
	}

	return v, nil
}
// Encoder is an RTP/KLV encoder.
// It packs KLV units into RTP packets, fragmenting the units that exceed the
// maximum payload size. Init must be called before Encode.
// Specification: https://datatracker.ietf.org/doc/html/rfc6597
type Encoder struct {
// payload type of packets.
PayloadType uint8
// SSRC of packets (optional).
// It defaults to a random value.
SSRC *uint32
// initial sequence number of packets (optional).
// It defaults to a random value.
InitialSequenceNumber *uint16
// maximum size of packet payloads (optional).
// It defaults to 1450.
PayloadMaxSize int
// sequence number of the next generated packet. It is set by Init to the
// initial sequence number and incremented by Encode after every packet.
sequenceNumber uint16
}
// Init initializes the encoder.
// Optional fields that were left unset are filled in: SSRC and
// InitialSequenceNumber get random values, PayloadMaxSize gets the default
// size. The sequence counter is then positioned on InitialSequenceNumber.
// An error is returned only when the random source fails.
func (e *Encoder) Init() error {
	if e.SSRC == nil {
		ssrc, err := randUint32()
		if err != nil {
			return err
		}
		e.SSRC = &ssrc
	}

	if e.InitialSequenceNumber == nil {
		random, err := randUint32()
		if err != nil {
			return err
		}
		// keep the 16 least significant bits only
		seq := uint16(random)
		e.InitialSequenceNumber = &seq
	}

	if e.PayloadMaxSize == 0 {
		e.PayloadMaxSize = defaultPayloadMaxSize
	}

	e.sequenceNumber = *e.InitialSequenceNumber

	return nil
}
// Encode encodes a KLV unit into RTP packets.
//
// The items of the unit are concatenated. When the result fits in
// PayloadMaxSize, a single packet with the marker bit set is returned.
// Otherwise the data is cut into consecutive chunks of at most PayloadMaxSize
// bytes, and only the last packet carries the marker bit. Every packet
// consumes one sequence number. The timestamp of packets is not set.
// The returned error is always nil.
func (e *Encoder) Encode(unit [][]byte) ([]*rtp.Packet, error) {
	total := 0
	for _, item := range unit {
		total += len(item)
	}

	data := make([]byte, 0, total)
	for _, item := range unit {
		data = append(data, item...)
	}

	// newPacket builds a packet around payload and consumes a sequence number.
	newPacket := func(payload []byte, last bool) *rtp.Packet {
		pkt := &rtp.Packet{
			Header: rtp.Header{
				Version:        rtpVersion,
				PayloadType:    e.PayloadType,
				SequenceNumber: e.sequenceNumber,
				SSRC:           *e.SSRC,
				Marker:         last,
			},
			Payload: payload,
		}
		e.sequenceNumber++
		return pkt
	}

	// The unit fits in a single packet, which is also the last one
	if len(data) <= e.PayloadMaxSize {
		return []*rtp.Packet{newPacket(data, true)}, nil
	}

	// The unit has to be fragmented across multiple packets
	var packets []*rtp.Packet

	for start := 0; start < len(data); {
		end := start + e.PayloadMaxSize
		if end > len(data) {
			end = len(data)
		}

		packets = append(packets, newPacket(data[start:end], end >= len(data)))
		start = end
	}

	return packets, nil
}

View File

@@ -0,0 +1,156 @@
package rtpklv
import (
"bytes"
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
// uint32Ptr returns a pointer to a fresh copy of v.
func uint32Ptr(v uint32) *uint32 {
	p := new(uint32)
	*p = v
	return p
}
// uint16Ptr returns a pointer to a fresh copy of v.
func uint16Ptr(v uint16) *uint16 {
	p := new(uint16)
	*p = v
	return p
}
// mergeBytes concatenates vals, in order, into a newly allocated slice.
// The result is never nil and does not share memory with the inputs.
func mergeBytes(vals ...[]byte) []byte {
	total := 0
	for i := range vals {
		total += len(vals[i])
	}

	merged := make([]byte, 0, total)
	for i := range vals {
		merged = append(merged, vals[i]...)
	}

	return merged
}
// cases is the table shared by TestEncode and TestDecode. Every entry pairs a
// KLV unit (a list of KLV items) with the RTP packets that carry it:
// TestEncode expects klvUnit to be encoded into pkts, TestDecode expects pkts
// to be decoded into klvUnit.
var cases = []struct {
name string
klvUnit [][]byte
pkts []*rtp.Packet
}{
// two short items that fit together in one packet
{
"single",
[][]byte{
{
0x06, 0x0e, 0x2b, 0x34, 0x01, 0x01, 0x01, 0x01,
0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
0x03, // Length = 3
0x41, 0x42, 0x43, // "ABC"
},
{
0x06, 0x0e, 0x2b, 0x34, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x03, // Length = 3
0x44, 0x45, 0x46, // "DEF"
},
},
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17645,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x06, 0x0e, 0x2b, 0x34, 0x01, 0x01, 0x01, 0x01,
0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
0x03, 0x41, 0x42, 0x43, 0x06, 0x0e, 0x2b, 0x34,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x03, 0x44, 0x45, 0x46,
},
},
},
},
// one 258-byte item (16-byte key, 2-byte long-form length, 240-byte value)
// split into a 200-byte packet and a 58-byte packet
{
"fragmented",
[][]byte{
append(
[]byte{
0x06, 0x0e, 0x2b, 0x34, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0b10000001, 240, // Length = 240
},
bytes.Repeat([]byte{1, 2, 3, 4}, 240/4)...,
),
},
[]*rtp.Packet{
{
Header: rtp.Header{
Version: 2,
Marker: false,
PayloadType: 96,
SequenceNumber: 17645,
SSRC: 0x9dbb7812,
},
Payload: mergeBytes(
[]byte{
0x06, 0x0e, 0x2b, 0x34, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x81, 0xf0,
},
bytes.Repeat([]byte{1, 2, 3, 4}, 182/4),
[]byte{1, 2},
),
},
{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 17646,
SSRC: 0x9dbb7812,
},
Payload: []byte{
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04, 0x01, 0x02, 0x03, 0x04, 0x01, 0x02,
0x03, 0x04,
},
},
},
},
}
// TestEncode encodes the KLV unit of every shared test case with a
// deterministic encoder (fixed SSRC and initial sequence number, 200-byte
// payload limit) and compares the generated packets with the expected ones.
func TestEncode(t *testing.T) {
	for _, ca := range cases {
		t.Run(ca.name, func(t *testing.T) {
			enc := &Encoder{
				PayloadType:           96,
				SSRC:                  uint32Ptr(0x9dbb7812),
				InitialSequenceNumber: uint16Ptr(0x44ed),
				PayloadMaxSize:        200,
			}
			require.NoError(t, enc.Init())

			encoded, encErr := enc.Encode(ca.klvUnit)
			require.NoError(t, encErr)
			require.Equal(t, ca.pkts, encoded)
		})
	}
}
// TestEncodeRandomInitialState checks that Init fills the optional SSRC and
// InitialSequenceNumber fields when they are left unset.
func TestEncodeRandomInitialState(t *testing.T) {
	e := &Encoder{
		PayloadType: 96,
	}
	err := e.Init()
	require.NoError(t, err)

	// NotEqual(t, nil, ptr) would pass even with a nil pointer, since an
	// untyped nil is never equal to a typed nil pointer. NotNil actually
	// fails when the field has not been filled.
	require.NotNil(t, e.SSRC)
	require.NotNil(t, e.InitialSequenceNumber)
}

View File

@@ -0,0 +1,2 @@
// Package rtpklv contains an RTP decoder and encoder for KLV data.
package rtpklv

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+4000000000000\x88%%%%\x88000000")
bool(true)
[]byte("\x06")
bool(true)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+4")
bool(true)
[]byte("0")
bool(true)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+4000000000000\x00")
bool(false)
[]byte("0")
bool(false)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+400000000000")
bool(false)
[]byte("0")
bool(true)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("0")
bool(false)
[]byte("")
bool(false)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+4000000000000\x88\x88000000")
bool(false)
[]byte("0")
bool(true)

View File

@@ -0,0 +1,5 @@
go test fuzz v1
[]byte("\x06\x0e+4")
bool(false)
[]byte("000000000000\xc5")
bool(true)