rtsp source: support cameras that don't provide SPS and PPS inside the SDP (#411) (#707)

This commit is contained in:
aler9
2021-11-22 22:55:12 +01:00
parent 29ee78ce38
commit c7b2ae83df
2 changed files with 268 additions and 0 deletions

View File

@@ -12,6 +12,9 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/h264"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/pion/rtp"
"github.com/aler9/rtsp-simple-server/internal/conf" "github.com/aler9/rtsp-simple-server/internal/conf"
"github.com/aler9/rtsp-simple-server/internal/logger" "github.com/aler9/rtsp-simple-server/internal/logger"
@@ -183,6 +186,11 @@ func (s *rtspSource) runInner() bool {
} }
} }
err = s.handleMissingH264Params(c, tracks)
if err != nil {
return err
}
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{ res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s, Source: s,
Tracks: c.Tracks(), Tracks: c.Tracks(),
@@ -226,6 +234,150 @@ func (s *rtspSource) runInner() bool {
} }
} }
func (s *rtspSource) handleMissingH264Params(c *gortsplib.Client, tracks gortsplib.Tracks) error {
h264TrackID := func() int {
for i, t := range tracks {
if t.IsH264() {
return i
}
}
return -1
}()
if h264TrackID < 0 {
return nil
}
_, err := tracks[h264TrackID].ExtractConfigH264()
if err == nil {
return nil
}
s.log(logger.Info, "source has not provided H264 parameters (SPS and PPS)"+
" inside the SDP; extracting them from the stream...")
var streamMutex sync.RWMutex
var stream *stream
var payloadType uint8
decoder := rtph264.NewDecoder()
var sps []byte
var pps []byte
paramsReceived := make(chan struct{})
c.OnPacketRTP = func(trackID int, payload []byte) {
streamMutex.RLock()
defer streamMutex.RUnlock()
if stream == nil {
if trackID != h264TrackID {
return
}
select {
case <-paramsReceived:
return
default:
}
var pkt rtp.Packet
err := pkt.Unmarshal(payload)
if err != nil {
return
}
nalus, _, err := decoder.Decode(&pkt)
if err != nil {
return
}
payloadType = pkt.Header.PayloadType
for _, nalu := range nalus {
typ := h264.NALUType(nalu[0] & 0x1F)
switch typ {
case h264.NALUTypeSPS:
sps = nalu
if sps != nil && pps != nil {
close(paramsReceived)
}
case h264.NALUTypePPS:
pps = nalu
if sps != nil && pps != nil {
close(paramsReceived)
}
}
}
} else {
stream.onPacketRTP(trackID, payload)
}
}
c.OnPacketRTCP = func(trackID int, payload []byte) {
streamMutex.RLock()
defer streamMutex.RUnlock()
if stream != nil {
stream.onPacketRTCP(trackID, payload)
}
}
_, err = c.Play(nil)
if err != nil {
return err
}
waitError := make(chan error)
go func() {
waitError <- c.Wait()
}()
timeout := time.NewTimer(15 * time.Second)
defer timeout.Stop()
select {
case err := <-waitError:
return err
case <-timeout.C:
return fmt.Errorf("source did not send H264 parameters in time")
case <-paramsReceived:
s.log(logger.Info, "H264 parameters extracted")
track, err := gortsplib.NewTrackH264(payloadType, &gortsplib.TrackConfigH264{
SPS: sps,
PPS: pps,
})
if err != nil {
return err
}
tracks[h264TrackID] = track
res := s.parent.onSourceStaticSetReady(pathSourceStaticSetReadyReq{
Source: s,
Tracks: tracks,
})
if res.Err != nil {
return res.Err
}
func() {
streamMutex.Lock()
defer streamMutex.Unlock()
stream = res.Stream
}()
s.log(logger.Info, "ready")
defer func() {
s.parent.OnSourceStaticSetNotReady(pathSourceStaticSetNotReadyReq{Source: s})
}()
}
return <-waitError
}
// onSourceAPIDescribe implements source. // onSourceAPIDescribe implements source.
func (*rtspSource) onSourceAPIDescribe() interface{} { func (*rtspSource) onSourceAPIDescribe() interface{} {
return struct { return struct {

View File

@@ -9,6 +9,9 @@ import (
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
"github.com/aler9/gortsplib/pkg/auth" "github.com/aler9/gortsplib/pkg/auth"
"github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/rtph264"
"github.com/pion/rtp"
psdp "github.com/pion/sdp/v3"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
) )
@@ -205,3 +208,116 @@ func TestRTSPSourceNoPassword(t *testing.T) {
<-done <-done
} }
func TestRTSPSourceMissingH264Params(t *testing.T) {
track, _ := gortsplib.NewTrackH264(96,
&gortsplib.TrackConfigH264{SPS: []byte{0x01, 0x02, 0x03, 0x04}, PPS: []byte{0x05, 0x06}})
var newattrs []psdp.Attribute
for _, attr := range track.Media.Attributes {
if attr.Key != "fmtp" {
newattrs = append(newattrs, attr)
}
}
track.Media.Attributes = newattrs
stream := gortsplib.NewServerStream(gortsplib.Tracks{track})
s := gortsplib.Server{
Handler: &testServer{
onDescribe: func(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() {
time.Sleep(500 * time.Millisecond)
enc := rtph264.NewEncoder(96, nil, nil, nil)
pkts, err := enc.Encode([][]byte{{5}}, 0) // IDR
require.NoError(t, err)
byts, _ := pkts[0].Marshal()
stream.WritePacketRTP(0, byts)
pkts, err = enc.Encode([][]byte{{7, 1, 2, 3}}, 0) // SPS
require.NoError(t, err)
byts, _ = pkts[0].Marshal()
stream.WritePacketRTP(0, byts)
pkts, err = enc.Encode([][]byte{{8}}, 0) // PPS
require.NoError(t, err)
byts, _ = pkts[0].Marshal()
stream.WritePacketRTP(0, byts)
pkts, err = enc.Encode([][]byte{{5, 1}}, 0) // IDR
require.NoError(t, err)
byts, _ = pkts[0].Marshal()
stream.WritePacketRTP(0, byts)
time.Sleep(500 * time.Millisecond)
pkts, err = enc.Encode([][]byte{{5, 2}}, 0) // IDR
require.NoError(t, err)
byts, _ = pkts[0].Marshal()
stream.WritePacketRTP(0, byts)
}()
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8555",
}
err := s.Start()
require.NoError(t, err)
defer s.Wait()
defer s.Close()
p, ok := newInstance("rtmpDisable: yes\n" +
"hlsDisable: yes\n" +
"paths:\n" +
" proxied:\n" +
" source: rtsp://127.0.0.1:8555/teststream\n" +
" sourceOnDemand: yes\n")
require.Equal(t, true, ok)
defer p.close()
received := make(chan struct{})
decoder := rtph264.NewDecoder()
c := gortsplib.Client{
OnPacketRTP: func(trackID int, payload []byte) {
var pkt rtp.Packet
err := pkt.Unmarshal(payload)
if err != nil {
return
}
nalus, _, err := decoder.Decode(&pkt)
if err != nil {
return
}
require.Equal(t, [][]byte{{0x05, 0x02}}, nalus)
close(received)
},
}
err = c.StartReading("rtsp://127.0.0.1:8554/proxied")
require.NoError(t, err)
defer c.Close()
conf, err := c.Tracks()[0].ExtractConfigH264()
require.NoError(t, err)
require.Equal(t, []byte{7, 1, 2, 3}, conf.SPS)
require.Equal(t, []byte{8}, conf.PPS)
<-received
}