server: support reading back channels (#597) (#777)

This commit is contained in:
Alessandro Ros
2025-05-04 17:04:15 +02:00
committed by GitHub
parent cc0c96626c
commit b407cb1dd0
14 changed files with 894 additions and 88 deletions

View File

@@ -40,6 +40,7 @@ Features:
* Write streams with the UDP, UDP-multicast or TCP transport protocol
* Write TLS-encrypted streams (TCP only)
* Compute and provide SSRC, RTP-Info to clients
* Read ONVIF back channels
* Utilities
* Parse RTSP elements
* Encode/decode RTP packets into/from codec-specific frames
@@ -97,7 +98,9 @@ Features:
* [server-auth](examples/server-auth/main.go)
* [server-record-format-h264-to-disk](examples/server-record-format-h264-to-disk/main.go)
* [server-play-format-h264-from-disk](examples/server-play-format-h264-from-disk/main.go)
* [server-play-backchannel](examples/server-play-backchannel/main.go)
* [proxy](examples/proxy/main.go)
* [proxy-backchannel](examples/proxy-backchannel/main.go)
## API Documentation

View File

@@ -0,0 +1,112 @@
package main
import (
"log"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
)
const (
existingStream = "rtsp://127.0.0.1:8554/mystream"
reconnectPause = 2 * time.Second
)
func findG711BackChannel(desc *description.Session) (*description.Media, *format.G711) {
for _, media := range desc.Medias {
if media.IsBackChannel {
for _, forma := range media.Formats {
if g711, ok := forma.(*format.G711); ok {
return media, g711
}
}
}
}
return nil, nil
}
type client struct {
server *server
}
func (c *client) initialize() {
// start a separated routine
go c.run()
}
func (c *client) run() {
for {
err := c.read()
log.Printf("ERR: %s\n", err)
time.Sleep(reconnectPause)
}
}
func (c *client) read() error {
rc := gortsplib.Client{
RequestBackChannels: true,
}
// parse URL
u, err := base.ParseURL(existingStream)
if err != nil {
return err
}
// connect to the server
err = rc.Start(u.Scheme, u.Host)
if err != nil {
return err
}
defer rc.Close()
// find available medias
desc, _, err := rc.Describe(u)
if err != nil {
return err
}
// find the back channel
backChannelMedia, _ := findG711BackChannel(desc)
if backChannelMedia == nil {
panic("back channel not found")
}
writeToClient := func(pkt *rtp.Packet) {
rc.WritePacketRTP(backChannelMedia, pkt)
}
// setup all medias
err = rc.SetupAll(desc.BaseURL, desc.Medias)
if err != nil {
return err
}
// notify the server that we are ready
stream := c.server.setStreamReady(desc, writeToClient)
defer c.server.setStreamUnready()
log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n")
// called when a RTP packet arrives
rc.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) {
log.Printf("received RTP packet from the client, routing to readers")
// route incoming packets to the server stream
stream.WritePacketRTP(medi, pkt)
})
// start playing
_, err = rc.Play(nil)
if err != nil {
return err
}
// wait until a fatal error
return rc.Wait()
}

View File

@@ -0,0 +1,24 @@
package main
import "log"
// This example shows how to
// 1. create a server that serves a single stream.
// 2. create a client, that reads an existing stream from another server or camera, containing a back channel.
// 3. route the stream from the client to the server, and from the server to all connected readers.
// 4. route the back channel from connected readers to the server, and from the server to the client.
func main() {
// allocate the server.
s := &server{}
s.initialize()
// allocate the client.
// allow client to use the server.
c := &client{server: s}
c.initialize()
// start server and wait until a fatal error
log.Printf("server is ready on %s", s.server.RTSPAddress)
panic(s.server.StartAndWait())
}

View File

@@ -0,0 +1,134 @@
package main
import (
"log"
"sync"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
)
type server struct {
server *gortsplib.Server
mutex sync.RWMutex
stream *gortsplib.ServerStream
writeToClient func(*rtp.Packet)
}
func (s *server) initialize() {
// configure the server
s.server = &gortsplib.Server{
Handler: s,
RTSPAddress: ":8556",
UDPRTPAddress: ":8002",
UDPRTCPAddress: ":8003",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8002,
MulticastRTCPPort: 8003,
}
}
// called when a connection is opened.
func (s *server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("conn opened")
}
// called when a connection is closed.
func (s *server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("conn closed (%v)", ctx.Error)
}
// called when a session is opened.
func (s *server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
log.Printf("session opened")
}
// called when a session is closed.
func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
}
// called when receiving a DESCRIBE request.
func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("DESCRIBE request")
s.mutex.RLock()
defer s.mutex.RUnlock()
// stream is not available yet
if s.stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, s.stream, nil
}
// called when receiving a SETUP request.
func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("SETUP request")
s.mutex.RLock()
defer s.mutex.RUnlock()
// stream is not available yet
if s.stream == nil {
return &base.Response{
StatusCode: base.StatusNotFound,
}, nil, nil
}
return &base.Response{
StatusCode: base.StatusOK,
}, s.stream, nil
}
// called when receiving a PLAY request.
func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("PLAY request")
ctx.Session.OnPacketRTPAny(func(m *description.Media, f format.Format, pkt *rtp.Packet) {
log.Printf("received RTP packet from readers, routing to the client")
s.writeToClient(pkt)
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
func (s *server) setStreamReady(
desc *description.Session,
writeToClient func(*rtp.Packet),
) *gortsplib.ServerStream {
s.mutex.Lock()
defer s.mutex.Unlock()
s.stream = &gortsplib.ServerStream{
Server: s.server,
Desc: desc,
}
err := s.stream.Initialize()
if err != nil {
panic(err)
}
s.writeToClient = writeToClient
return s.stream
}
func (s *server) setStreamUnready() {
s.mutex.Lock()
defer s.mutex.Unlock()
s.stream.Close()
s.stream = nil
}

View File

@@ -0,0 +1,103 @@
package main
import (
"crypto/rand"
"fmt"
"log"
"time"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/mediacommon/v2/pkg/codecs/g711"
"github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts"
)
func multiplyAndDivide(v, m, d int64) int64 {
secs := v / d
dec := v % d
return (secs*m + dec*m/d)
}
func randUint32() (uint32, error) {
var b [4]byte
_, err := rand.Read(b[:])
if err != nil {
return 0, err
}
return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil
}
func findTrack(r *mpegts.Reader) (*mpegts.Track, error) {
for _, track := range r.Tracks() {
if _, ok := track.Codec.(*mpegts.CodecH264); ok {
return track, nil
}
}
return nil, fmt.Errorf("H264 track not found")
}
type audioStreamer struct {
stream *gortsplib.ServerStream
}
func (r *audioStreamer) initialize() {
go r.run()
}
func (r *audioStreamer) close() {
}
func (r *audioStreamer) run() {
// setup G711 -> RTP encoder
rtpEnc, err := r.stream.Desc.Medias[0].Formats[0].(*format.G711).CreateEncoder()
if err != nil {
panic(err)
}
start := time.Now()
prevPTS := int64(0)
randomStart, err := randUint32()
if err != nil {
panic(err)
}
// setup a ticker to sleep between writings
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
// get current timestamp
pts := multiplyAndDivide(int64(time.Since(start)), int64(r.stream.Desc.Medias[0].Formats[0].ClockRate()), int64(time.Second))
// generate dummy LPCM audio samples
samples := createDummyAudio(pts, prevPTS)
// encode samples with G711
samples, err = g711.Mulaw(samples).Marshal()
if err != nil {
panic(err)
}
// generate RTP packets from G711 samples
pkts, err := rtpEnc.Encode(samples)
if err != nil {
panic(err)
}
log.Printf("writing RTP packets with PTS=%d, sample size=%d, pkt count=%d", prevPTS, len(samples), len(pkts))
// write RTP packets to the server
for _, pkt := range pkts {
pkt.Timestamp += uint32(int64(randomStart) + prevPTS)
err = r.stream.WritePacketRTP(r.stream.Desc.Medias[0], pkt)
if err != nil {
panic(err)
}
}
prevPTS = pts
}
}

View File

@@ -0,0 +1,24 @@
package main
import "math"
const (
sampleRate = 8000
frequency = 400
amplitude = (1 << 14) - 1
)
func createDummyAudio(pts int64, prevPTS int64) []byte {
sampleCount := (pts - prevPTS)
n := 0
ret := make([]byte, sampleCount*2)
for i := int64(0); i < sampleCount; i++ {
v := int16(amplitude * math.Sin((float64(prevPTS+i)*frequency*math.Pi*2)/sampleRate))
ret[n] = byte(v >> 8)
ret[n+1] = byte(v)
n += 2
}
return ret
}

View File

@@ -0,0 +1,162 @@
package main
import (
"log"
"sync"
"github.com/bluenviron/gortsplib/v4"
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/pion/rtp"
)
// This example shows how to
// 1. create a RTSP server which accepts plain connections.
// 2. create a stream with an audio direct channel and an audio back channel.
// 3. write the audio direct channel to readers, read the back channel from readers.
type serverHandler struct {
server *gortsplib.Server
stream *gortsplib.ServerStream
mutex sync.RWMutex
}
// called when a connection is opened.
func (sh *serverHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) {
log.Printf("conn opened")
}
// called when a connection is closed.
func (sh *serverHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) {
log.Printf("conn closed (%v)", ctx.Error)
}
// called when a session is opened.
func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) {
log.Printf("session opened")
}
// called when a session is closed.
func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) {
log.Printf("session closed")
}
// called when receiving a DESCRIBE request.
func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("DESCRIBE request")
sh.mutex.RLock()
defer sh.mutex.RUnlock()
return &base.Response{
StatusCode: base.StatusOK,
}, sh.stream, nil
}
// called when receiving a SETUP request.
func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) {
log.Printf("SETUP request")
sh.mutex.RLock()
defer sh.mutex.RUnlock()
return &base.Response{
StatusCode: base.StatusOK,
}, sh.stream, nil
}
// called when receiving a PLAY request.
func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) {
log.Printf("PLAY request")
// called when receiving a RTP packet
ctx.Session.OnPacketRTPAny(func(m *description.Media, f format.Format, pkt *rtp.Packet) {
// decode timestamp
pts, ok := ctx.Session.PacketPTS2(m, pkt)
if !ok {
return
}
log.Printf("incoming RTP packet with PTS=%v size=%v", pts, len(pkt.Payload))
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
}
func main() {
h := &serverHandler{}
// prevent clients from connecting to the server until the stream is properly set up
h.mutex.Lock()
// create the server
h.server = &gortsplib.Server{
Handler: h,
RTSPAddress: ":8554",
UDPRTPAddress: ":8000",
UDPRTCPAddress: ":8001",
MulticastIPRange: "224.1.0.0/16",
MulticastRTPPort: 8002,
MulticastRTCPPort: 8003,
}
// start the server
err := h.server.Start()
if err != nil {
panic(err)
}
defer h.server.Close()
// create a RTSP description
desc := &description.Session{
Medias: []*description.Media{
// direct channel
{
Type: description.MediaTypeAudio,
Formats: []format.Format{&format.G711{
PayloadTyp: 8,
MULaw: false,
SampleRate: 8000,
ChannelCount: 1,
}},
},
// back channel
{
Type: description.MediaTypeAudio,
IsBackChannel: true,
Formats: []format.Format{&format.G711{
PayloadTyp: 8,
MULaw: false,
SampleRate: 8000,
ChannelCount: 1,
}},
},
},
}
// create a server stream
h.stream = &gortsplib.ServerStream{
Server: h.server,
Desc: desc,
}
err = h.stream.Initialize()
if err != nil {
panic(err)
}
defer h.stream.Close()
// create audio streamer
r := &audioStreamer{stream: h.stream}
r.initialize()
defer r.close()
// allow clients to connect
h.mutex.Unlock()
// wait until a fatal error
log.Printf("server is ready on %s", h.server.RTSPAddress)
panic(h.server.Wait())
}

View File

@@ -48,6 +48,9 @@ type Session struct {
// Title of the stream (optional).
Title string
// Whether to use multicast.
Multicast bool
// FEC groups (RFC5109).
FECGroups []SessionFECGroup
@@ -115,8 +118,9 @@ func (d *Session) Unmarshal(ssd *sdp.SessionDescription) error {
return nil
}
// Marshal encodes the description in SDP.
func (d Session) Marshal(multicast bool) ([]byte, error) {
// Marshal encodes the description in SDP format.
// The argument is deprecated and has no effect. Set Session.Multicast to enable multicast.
func (d Session) Marshal(_ bool) ([]byte, error) {
var sessionName psdp.SessionName
if d.Title != "" {
sessionName = psdp.SessionName(d.Title)
@@ -127,7 +131,7 @@ func (d Session) Marshal(multicast bool) ([]byte, error) {
}
var address string
if multicast {
if d.Multicast {
address = "224.1.0.0"
} else {
address = "0.0.0.0"
@@ -150,11 +154,6 @@ func (d Session) Marshal(multicast bool) ([]byte, error) {
TimeDescriptions: []psdp.TimeDescription{
{Timing: psdp.Timing{StartTime: 0, StopTime: 0}},
},
MediaDescriptions: make([]*psdp.MediaDescription, len(d.Medias)),
}
for i, media := range d.Medias {
sout.MediaDescriptions[i] = media.Marshal()
}
for _, group := range d.FECGroups {
@@ -164,5 +163,11 @@ func (d Session) Marshal(multicast bool) ([]byte, error) {
})
}
sout.MediaDescriptions = make([]*psdp.MediaDescription, len(d.Medias))
for i, media := range d.Medias {
sout.MediaDescriptions[i] = media.Marshal()
}
return sout.Marshal()
}

View File

@@ -26,22 +26,49 @@ func getSessionID(header base.Header) string {
return ""
}
func serverSideDescription(d *description.Session) *description.Session {
func checkMulticastEnabled(multicastIPRange string, query string) bool {
// VLC uses multicast if the SDP contains a multicast address.
// therefore, we introduce a special query (vlcmulticast) that allows
// to return a SDP that contains a multicast address.
if multicastIPRange != "" {
if q, err2 := gourl.ParseQuery(query); err2 == nil {
if _, ok := q["vlcmulticast"]; ok {
return true
}
}
}
return false
}
func checkBackChannelsEnabled(header base.Header) bool {
if vals, ok := header["Require"]; ok {
for _, val := range vals {
if val == "www.onvif.org/ver20/backchannel" {
return true
}
}
}
return false
}
func prepareForDescribe(d *description.Session, multicast bool, backChannels bool) *description.Session {
out := &description.Session{
Title: d.Title,
Multicast: multicast,
FECGroups: d.FECGroups,
Medias: make([]*description.Media, len(d.Medias)),
}
for i, medi := range d.Medias {
out.Medias[i] = &description.Media{
Type: medi.Type,
ID: medi.ID,
IsBackChannel: medi.IsBackChannel,
// we have to use trackID=number in order to support clients
// like the Grandstream GXV3500.
Control: "trackID=" + strconv.FormatInt(int64(i), 10),
Formats: medi.Formats,
if !medi.IsBackChannel || backChannels {
out.Medias = append(out.Medias, &description.Media{
Type: medi.Type,
ID: medi.ID,
IsBackChannel: medi.IsBackChannel,
// we have to use trackID=number in order to support clients
// like the Grandstream GXV3500.
Control: "trackID=" + strconv.FormatInt(int64(i), 10),
Formats: medi.Formats,
})
}
}
@@ -343,19 +370,13 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err
return res, err
}
// VLC uses multicast if the SDP contains a multicast address.
// therefore, we introduce a special query (vlcmulticast) that allows
// to return a SDP that contains a multicast address.
multicast := false
if sc.s.MulticastIPRange != "" {
if q, err2 := gourl.ParseQuery(query); err2 == nil {
if _, ok := q["vlcmulticast"]; ok {
multicast = true
}
}
}
desc := prepareForDescribe(
stream.Desc,
checkMulticastEnabled(sc.s.MulticastIPRange, query),
checkBackChannelsEnabled(req.Header),
)
byts, _ := serverSideDescription(stream.Desc).Marshal(multicast)
byts, _ := desc.Marshal(false)
res.Body = byts
}

View File

@@ -68,30 +68,6 @@ func mediaURL(t *testing.T, baseURL *base.URL, media *description.Media) *base.U
return u
}
func doDescribe(t *testing.T, conn *conn.Conn) *description.Session {
res, err := writeReqReadRes(conn, base.Request{
Method: base.Describe,
URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"),
Header: base.Header{
"CSeq": base.HeaderValue{"1"},
},
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body)
require.NoError(t, err)
var desc2 description.Session
err = desc2.Unmarshal(&desc)
require.NoError(t, err)
desc2.BaseURL = mustParseURL(res.Header["Content-Base"][0])
return &desc2
}
func doSetup(t *testing.T, conn *conn.Conn, u string,
inTH *headers.Transport, session string,
) (*base.Response, *headers.Transport) {
@@ -311,7 +287,7 @@ func TestServerPlayPath(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
th := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -432,7 +408,7 @@ func TestServerPlaySetupErrors(t *testing.T) {
require.Equal(t, base.StatusBadRequest, res.StatusCode)
default:
desc = doDescribe(t, conn)
desc = doDescribe(t, conn, false)
th = &headers.Transport{
Protocol: headers.TransportProtocolUDP,
@@ -579,7 +555,7 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) {
ClientPorts: &[2]int{35466, 35467},
}
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
res, err := writeReqReadRes(conn, base.Request{
Method: base.Setup,
@@ -760,7 +736,7 @@ func TestServerPlay(t *testing.T) {
<-nconnOpened
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -1061,7 +1037,7 @@ func TestServerPlaySocketError(t *testing.T) {
}()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -1225,7 +1201,7 @@ func TestServerPlayDecodeErrors(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -1348,7 +1324,7 @@ func TestServerPlayRTCPReport(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -1558,7 +1534,7 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -1650,7 +1626,7 @@ func TestServerPlayPause(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -1748,7 +1724,7 @@ func TestServerPlayPlayPausePausePlay(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -1836,7 +1812,7 @@ func TestServerPlayTimeout(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -1927,7 +1903,7 @@ func TestServerPlayWithoutTeardown(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
@@ -2007,7 +1983,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
@@ -2093,7 +2069,7 @@ func TestServerPlayPartialMedias(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
@@ -2121,7 +2097,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
@@ -2346,7 +2322,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Delivery: deliveryPtr(headers.TransportDeliveryUnicast),
@@ -2423,7 +2399,7 @@ func TestServerPlayStreamStats(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
@@ -2453,3 +2429,148 @@ func TestServerPlayStreamStats(t *testing.T) {
st := stream.Stats()
require.Equal(t, uint64(16*2), st.BytesSent)
}
func TestServerPlayBackChannel(t *testing.T) {
for _, transport := range []string{
"udp",
"tcp",
} {
t.Run(transport, func(t *testing.T) {
serverOk := make(chan struct{})
var stream *ServerStream
s := &Server{
Handler: &testServerHandler{
onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) {
return &base.Response{
StatusCode: base.StatusOK,
}, stream, nil
},
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
ctx.Session.OnPacketRTPAny(func(_ *description.Media, _ format.Format, _ *rtp.Packet) {
close(serverOk)
})
return &base.Response{
StatusCode: base.StatusOK,
}, nil
},
},
RTSPAddress: "127.0.0.1:8554",
}
if transport == "udp" {
s.UDPRTPAddress = "127.0.0.1:8000"
s.UDPRTCPAddress = "127.0.0.1:8001"
}
err := s.Start()
require.NoError(t, err)
defer s.Close()
stream = &ServerStream{
Server: s,
Desc: &description.Session{Medias: []*description.Media{
testH264Media,
{
Type: description.MediaTypeAudio,
IsBackChannel: true,
Formats: []format.Format{&format.G711{
PayloadTyp: 8,
MULaw: false,
SampleRate: 8000,
ChannelCount: 1,
}},
},
}},
}
err = stream.Initialize()
require.NoError(t, err)
defer stream.Close()
nconn, err := net.Dial("tcp", "127.0.0.1:8554")
require.NoError(t, err)
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn, true)
var session string
var serverPorts [2]*[2]int
var l1s [2]net.PacketConn
var l2s [2]net.PacketConn
for i := 0; i < 2; i++ {
inTH := &headers.Transport{
Mode: transportModePtr(headers.TransportModePlay),
}
if transport == "udp" {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolUDP
inTH.ClientPorts = &[2]int{35466 + i*2, 35467 + i*2}
} else {
v := headers.TransportDeliveryUnicast
inTH.Delivery = &v
inTH.Protocol = headers.TransportProtocolTCP
inTH.InterleavedIDs = &[2]int{0 + i*2, 1 + i*2}
}
res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[i]).String(), inTH, "")
if transport == "udp" {
serverPorts[i] = th.ServerPorts
l1s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35466+i*2), 10)))
require.NoError(t, err)
defer l1s[i].Close()
l2s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35467+i*2), 10)))
require.NoError(t, err)
defer l2s[i].Close()
}
session = readSession(t, res)
}
doPlay(t, conn, "rtsp://127.0.0.1:8554/teststream", session)
// client -> server RTP packet
pkt := &rtp.Packet{
Header: rtp.Header{
Version: 2,
PayloadType: 8,
},
Payload: []byte{1, 2, 3, 4},
}
buf, err := pkt.Marshal()
require.NoError(t, err)
if transport == "udp" {
_, err = l1s[1].WriteTo(buf, &net.UDPAddr{
IP: net.ParseIP("127.0.0.1"),
Port: serverPorts[1][0],
})
require.NoError(t, err)
} else {
err = conn.WriteInterleavedFrame(&base.InterleavedFrame{
Channel: 2,
Payload: buf,
}, make([]byte, 1024))
require.NoError(t, err)
}
<-serverOk
doTeardown(t, conn, "rtsp://127.0.0.1:8554/teststream", session)
})
}
}

View File

@@ -37,6 +37,15 @@ func stringsReverseIndex(s, substr string) int {
return -1
}
func hasBackChannel(desc description.Session) bool {
for _, medi := range desc.Medias {
if medi.IsBackChannel {
return true
}
}
return false
}
// used for all methods except SETUP
func getPathAndQuery(u *base.URL, isAnnounce bool) (string, string) {
if !isAnnounce {
@@ -245,13 +254,13 @@ type ServerSession struct {
setuppedMediasOrdered []*serverSessionMedia
tcpCallbackByChannel map[int]readFunc
setuppedTransport *Transport
setuppedStream *ServerStream // read
setuppedStream *ServerStream // play
setuppedPath string
setuppedQuery string
lastRequestTime time.Time
tcpConn *ServerConn
announcedDesc *description.Session // publish
udpLastPacketTime *int64 // publish
announcedDesc *description.Session // record
udpLastPacketTime *int64 // record
udpCheckStreamTimer *time.Timer
writer *asyncProcessor
writerMutex sync.RWMutex
@@ -863,6 +872,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) (
}, liberrors.ErrServerSDPInvalid{Err: err}
}
if hasBackChannel(desc) {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, liberrors.ErrServerSDPInvalid{Err: fmt.Errorf("back channels cannot be recorded")}
}
res, err := ss.s.Handler.(ServerHandlerOnAnnounce).OnAnnounce(&ServerHandlerOnAnnounceCtx{
Session: ss,
Conn: sc,

View File

@@ -20,7 +20,7 @@ type serverSessionFormat struct {
format format.Format
onPacketRTP OnPacketRTPFunc
udpReorderer *rtpreorderer.Reorderer
udpReorderer *rtpreorderer.Reorderer // publish or back channel
tcpLossDetector *rtplossdetector.LossDetector
rtcpReceiver *rtcpreceiver.RTCPReceiver
writePacketRTPInQueue func([]byte) error
@@ -44,7 +44,7 @@ func (sf *serverSessionFormat) start() {
sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP
}
if sf.sm.ss.state != ServerSessionStatePlay {
if sf.sm.ss.state == ServerSessionStateRecord || sf.sm.media.IsBackChannel {
if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast {
sf.udpReorderer = &rtpreorderer.Reorderer{}
sf.udpReorderer.Initialize()

View File

@@ -69,7 +69,9 @@ func (sm *serverSessionMedia) start() {
if sm.ss.state == ServerSessionStatePlay {
// firewall opening is performed with RTCP sender reports generated by ServerStream
// readers can send RTCP packets only
if sm.media.IsBackChannel {
sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readPacketRTPUDPPlay)
}
sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPPlay)
} else {
// open the firewall by sending empty packets to the counterpart.
@@ -147,6 +149,34 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error {
return nil
}
func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
if len(payload) == (udpMaxPayloadSize + 1) {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{})
return false
}
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
now := sm.ss.s.timeNow()
forma.readPacketRTPUDP(pkt, now)
return true
}
func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool {
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
@@ -235,8 +265,29 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool {
return true
}
func (sm *serverSessionMedia) readPacketRTPTCPPlay(_ []byte) bool {
return false
func (sm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool {
if !sm.media.IsBackChannel {
return false
}
atomic.AddUint64(sm.bytesReceived, uint64(len(payload)))
pkt := &rtp.Packet{}
err := pkt.Unmarshal(payload)
if err != nil {
sm.onPacketRTPDecodeError(err)
return false
}
forma, ok := sm.formats[pkt.PayloadType]
if !ok {
sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType})
return false
}
forma.readPacketRTPTCP(pkt)
return true
}
func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool {

View File

@@ -15,6 +15,7 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
)
var serverCert = []byte(`-----BEGIN CERTIFICATE-----
@@ -83,6 +84,36 @@ func writeReqReadRes(
return conn.ReadResponse()
}
func doDescribe(t *testing.T, conn *conn.Conn, backChannels bool) *description.Session {
header := base.Header{
"CSeq": base.HeaderValue{"1"},
}
if backChannels {
header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"}
}
res, err := writeReqReadRes(conn, base.Request{
Method: base.Describe,
URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"),
Header: header,
})
require.NoError(t, err)
require.Equal(t, base.StatusOK, res.StatusCode)
var desc sdp.SessionDescription
err = desc.Unmarshal(res.Body)
require.NoError(t, err)
var desc2 description.Session
err = desc2.Unmarshal(&desc)
require.NoError(t, err)
desc2.BaseURL = mustParseURL(res.Header["Content-Base"][0])
return &desc2
}
type testServerHandler struct {
onConnOpen func(*ServerHandlerOnConnOpenCtx)
onConnClose func(*ServerHandlerOnConnCloseCtx)
@@ -438,7 +469,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
var session string
@@ -534,7 +565,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
defer nconn1.Close()
conn1 := conn.NewConn(nconn1)
desc1 := doDescribe(t, conn1)
desc1 := doDescribe(t, conn1, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -554,7 +585,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) {
defer nconn2.Close()
conn2 := conn.NewConn(nconn2)
desc2 := doDescribe(t, conn2)
desc2 := doDescribe(t, conn2, false)
res, err = writeReqReadRes(conn2, base.Request{
Method: base.Setup,
@@ -620,7 +651,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -688,7 +719,7 @@ func TestServerSetupMultipleTransports(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTHS := headers.Transports{
{
@@ -789,7 +820,7 @@ func TestServerGetSetParameter(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
var session string
@@ -1078,7 +1109,7 @@ func TestServerSessionClose(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -1157,7 +1188,7 @@ func TestServerSessionAutoClose(t *testing.T) {
require.NoError(t, err)
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,
@@ -1225,7 +1256,7 @@ func TestServerSessionTeardown(t *testing.T) {
defer nconn.Close()
conn := conn.NewConn(nconn)
desc := doDescribe(t, conn)
desc := doDescribe(t, conn, false)
inTH := &headers.Transport{
Protocol: headers.TransportProtocolTCP,