support routing KLV metadata (#2693) (#4670)

Co-authored-by: aler9 <46489434+aler9@users.noreply.github.com>
This commit is contained in:
Yaroslav Molochko
2025-07-06 21:34:41 +03:00
committed by GitHub
parent 1083eea307
commit 0df5e2c81a
10 changed files with 301 additions and 6 deletions

4
go.mod
View File

@@ -10,8 +10,8 @@ require (
github.com/alecthomas/kong v1.12.0
github.com/asticode/go-astits v1.13.0
github.com/bluenviron/gohlslib/v2 v2.2.0
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97
github.com/bluenviron/mediacommon/v2 v2.2.0
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e
github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9
github.com/datarhei/gosrt v0.9.0
github.com/fsnotify/fsnotify v1.9.0
github.com/gin-contrib/pprof v1.5.3

8
go.sum
View File

@@ -35,10 +35,10 @@ github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c h1:8XZeJrs4+ZYh
github.com/benburkert/openpgp v0.0.0-20160410205803-c2471f86866c/go.mod h1:x1vxHcL/9AVzuk5HOloOEPrtJY0MaalYr78afXZ+pWI=
github.com/bluenviron/gohlslib/v2 v2.2.0 h1:eIsCai3IHP0F538h2tCPCRkhQ7XSOaxeceMyPns0o1k=
github.com/bluenviron/gohlslib/v2 v2.2.0/go.mod h1:sLyKB5iM6Su1kucNHuDUU9aeN/Hw4WxsV2y9k2IHMGs=
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97 h1:V8m1pyQOYVEJK5RBy1SLg/Y+hgXYFFiMZOd7NhWWLAE=
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250705110245-9c1011567a97/go.mod h1:rur2QGh1wRU6KINZn8LwU8qTPFt1XafJGtsfs0KYzRo=
github.com/bluenviron/mediacommon/v2 v2.2.0 h1:fGXEX0OEvv5VhGHOv3Q2ABzOtSkIpl9UbwOHrnKWNTk=
github.com/bluenviron/mediacommon/v2 v2.2.0/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0=
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e h1:XSCp4Q0DZcv2pQs2knAnB/mIr095QtPbshPSGRiPteg=
github.com/bluenviron/gortsplib/v4 v4.14.2-0.20250706181149-52489821375e/go.mod h1:rur2QGh1wRU6KINZn8LwU8qTPFt1XafJGtsfs0KYzRo=
github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9 h1:cleSKsYkXx8y36auvXw3zone9t9JSFTT/4Kr+VLKvGw=
github.com/bluenviron/mediacommon/v2 v2.2.1-0.20250706163316-d1fe0aa1b8d9/go.mod h1:a6MbPmXtYda9mKibKVMZlW20GYLLrX2R7ZkUE+1pwV0=
github.com/bytedance/sonic v1.13.2 h1:8/H1FempDZqC4VqjptGo14QQlJx8VdZJegxs6wwfqpQ=
github.com/bytedance/sonic v1.13.2/go.mod h1:o68xyaF9u2gvVBuGHPlUVCy+ZfmNNO5ETf1+KgkJhz4=
github.com/bytedance/sonic/loader v0.1.1/go.mod h1:ncP89zfokxS5LZrJxl5z0UJcsk4M4yY2JpfqGeCtNLU=

View File

@@ -0,0 +1,119 @@
package formatprocessor
import (
"fmt"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/format/rtpklv"
"github.com/pion/rtp"
"github.com/bluenviron/mediamtx/internal/logger"
"github.com/bluenviron/mediamtx/internal/unit"
)
type klv struct {
RTPMaxPayloadSize int
Format *format.KLV
GenerateRTPPackets bool
Parent logger.Writer
encoder *rtpklv.Encoder
decoder *rtpklv.Decoder
randomStart uint32
}
func (t *klv) initialize() error {
if t.GenerateRTPPackets {
err := t.createEncoder()
if err != nil {
return err
}
t.randomStart, err = randUint32()
if err != nil {
return err
}
}
return nil
}
func (t *klv) createEncoder() error {
t.encoder = &rtpklv.Encoder{
PayloadMaxSize: t.RTPMaxPayloadSize,
PayloadType: t.Format.PayloadTyp,
}
return t.encoder.Init()
}
func (t *klv) ProcessUnit(uu unit.Unit) error { //nolint:dupl
u := uu.(*unit.KLV)
if u.Unit != nil {
// ensure the format processor's encoder is initialized
if t.encoder == nil {
err := t.createEncoder()
if err != nil {
return err
}
}
pkts, err := t.encoder.Encode(u.Unit)
if err != nil {
return err
}
u.RTPPackets = pkts
for _, pkt := range u.RTPPackets {
pkt.Timestamp += t.randomStart + uint32(u.PTS)
}
}
return nil
}
func (t *klv) ProcessRTPPacket( //nolint:dupl
pkt *rtp.Packet,
ntp time.Time,
pts int64,
hasNonRTSPReaders bool,
) (unit.Unit, error) {
u := &unit.KLV{
Base: unit.Base{
RTPPackets: []*rtp.Packet{pkt},
NTP: ntp,
PTS: pts,
},
}
// remove padding
pkt.Padding = false
pkt.PaddingSize = 0
if len(pkt.Payload) > t.RTPMaxPayloadSize {
return nil, fmt.Errorf("RTP payload size (%d) is greater than maximum allowed (%d)",
len(pkt.Payload), t.RTPMaxPayloadSize)
}
// decode from RTP
if hasNonRTSPReaders || t.decoder != nil {
if t.decoder == nil {
var err error
t.decoder, err = t.Format.CreateDecoder()
if err != nil {
return nil, err
}
}
unit, err := t.decoder.Decode(pkt)
if err != nil {
return nil, err
}
u.Unit = unit
}
// route packet as is
return u, nil
}

View File

@@ -0,0 +1,84 @@
package formatprocessor
import (
"testing"
"time"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediamtx/internal/unit"
"github.com/pion/rtp"
"github.com/stretchr/testify/require"
)
func TestKlvCreateEncoder(t *testing.T) {
forma := &format.KLV{
PayloadTyp: 96,
}
p, err := New(1472, forma, false, nil)
require.NoError(t, err)
klvProc := p.(*klv)
err = klvProc.createEncoder()
require.NoError(t, err)
}
func TestKlvProcessUnit(t *testing.T) {
forma := &format.KLV{
PayloadTyp: 96,
}
p, err := New(1472, forma, true, nil)
require.NoError(t, err)
// create test Unit
theTime := time.Now()
when := int64(5000000000) // 5 seconds in nanoseconds
u := unit.KLV{
Base: unit.Base{
RTPPackets: nil,
NTP: theTime,
PTS: when,
},
Unit: []byte{1, 2, 3, 4},
}
uu := &u
// process the unit
err = p.ProcessUnit(uu)
require.NoError(t, err)
}
func TestKlvProcessRTPPacket(t *testing.T) {
forma := &format.KLV{
PayloadTyp: 96,
}
p, err := New(1472, forma, false, nil)
require.NoError(t, err)
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 3446,
Timestamp: 175349,
SSRC: 563423,
Padding: true,
},
Payload: []byte{1, 2, 3, 4},
PaddingSize: 20,
}
_, err = p.ProcessRTPPacket(pkt, time.Time{}, 0, false)
require.NoError(t, err)
require.Equal(t, &rtp.Packet{
Header: rtp.Header{
Version: 2,
Marker: true,
PayloadType: 96,
SequenceNumber: 3446,
Timestamp: 175349,
SSRC: 563423,
},
Payload: []byte{1, 2, 3, 4},
}, pkt)
}

View File

@@ -119,6 +119,14 @@ func New(
Parent: parent,
}
case *format.KLV:
proc = &klv{
RTPMaxPayloadSize: rtpMaxPayloadSize,
Format: forma,
GenerateRTPPackets: generateRTPPackets,
Parent: parent,
}
case *format.MPEG4Audio:
proc = &mpeg4Audio{
RTPMaxPayloadSize: rtpMaxPayloadSize,

View File

@@ -83,6 +83,13 @@ func TestNew(t *testing.T) {
&format.LPCM{},
&lpcm{},
},
{
"klv",
&format.KLV{
PayloadTyp: 96,
},
&klv{},
},
{
"generic",
&format.Generic{},

View File

@@ -229,6 +229,30 @@ func FromStream(
}
return bw.Flush()
})
case *format.KLV:
track := &mcmpegts.Track{
Codec: &mcmpegts.CodecKLV{
Synchronous: true,
},
}
addTrack(
media,
forma,
track,
func(u unit.Unit) error {
tunit := u.(*unit.KLV)
if tunit.Unit == nil {
return nil
}
sconn.SetWriteDeadline(time.Now().Add(writeTimeout))
err := (*w).WriteKLV(track, multiplyAndDivide(tunit.PTS, 90000, 90000), tunit.Unit)
if err != nil {
return err
}
return bw.Flush()
})
case *format.MPEG4Audio:
co := forma.GetConfig()

View File

@@ -139,6 +139,26 @@ func ToStream(
return nil
})
case *mpegts.CodecKLV:
medi = &description.Media{
Type: description.MediaTypeApplication,
Formats: []format.Format{&format.KLV{
PayloadTyp: 96,
}},
}
r.OnDataKLV(track, func(pts int64, uni []byte) error {
pts = td.Decode(pts)
(*stream).WriteUnit(medi, medi.Formats[0], &unit.KLV{
Base: unit.Base{
NTP: time.Now(),
PTS: pts,
},
Unit: uni,
})
return nil
})
case *mpegts.CodecMPEG4Audio:
medi = &description.Media{
Type: description.MediaTypeAudio,

View File

@@ -279,6 +279,32 @@ func (f *formatMPEGTS) initialize() bool {
)
})
case *rtspformat.KLV:
track := addTrack(forma, &mpegts.CodecKLV{
Synchronous: true,
})
f.ri.stream.AddReader(
f.ri,
media,
forma,
func(u unit.Unit) error {
tunit := u.(*unit.KLV)
if tunit.Unit == nil {
return nil
}
return f.write(
timestampToDuration(tunit.PTS, 90000),
tunit.NTP,
false,
true,
func() error {
return f.mw.WriteKLV(track, multiplyAndDivide(tunit.PTS, 90000, 90000), tunit.Unit)
},
)
})
case *rtspformat.MPEG4Audio:
co := forma.GetConfig()
if co == nil {

7
internal/unit/klv.go Normal file
View File

@@ -0,0 +1,7 @@
package unit
// KLV is a KLV data unit.
type KLV struct {
Base
Unit []byte
}