From b407cb1dd0cd62962ccefb4d7115cf4e0784acac Mon Sep 17 00:00:00 2001 From: Alessandro Ros Date: Sun, 4 May 2025 17:04:15 +0200 Subject: [PATCH] server: support reading back channels (#597) (#777) --- README.md | 3 + examples/proxy-backchannel/client.go | 112 ++++++++++ examples/proxy-backchannel/main.go | 24 +++ examples/proxy-backchannel/server.go | 134 ++++++++++++ .../server-play-backchannel/audio_streamer.go | 103 +++++++++ .../server-play-backchannel/dummy_audio.go | 24 +++ examples/server-play-backchannel/main.go | 162 ++++++++++++++ pkg/description/session.go | 21 +- server_conn.go | 65 ++++-- server_play_test.go | 203 ++++++++++++++---- server_session.go | 21 +- server_session_format.go | 4 +- server_session_media.go | 57 ++++- server_test.go | 49 ++++- 14 files changed, 894 insertions(+), 88 deletions(-) create mode 100644 examples/proxy-backchannel/client.go create mode 100644 examples/proxy-backchannel/main.go create mode 100644 examples/proxy-backchannel/server.go create mode 100644 examples/server-play-backchannel/audio_streamer.go create mode 100644 examples/server-play-backchannel/dummy_audio.go create mode 100644 examples/server-play-backchannel/main.go diff --git a/README.md b/README.md index 348e714b..41bf5bb6 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ Features: * Write streams with the UDP, UDP-multicast or TCP transport protocol * Write TLS-encrypted streams (TCP only) * Compute and provide SSRC, RTP-Info to clients + * Read ONVIF back channels * Utilities * Parse RTSP elements * Encode/decode RTP packets into/from codec-specific frames @@ -97,7 +98,9 @@ Features: * [server-auth](examples/server-auth/main.go) * [server-record-format-h264-to-disk](examples/server-record-format-h264-to-disk/main.go) * [server-play-format-h264-from-disk](examples/server-play-format-h264-from-disk/main.go) +* [server-play-backchannel](examples/server-play-backchannel/main.go) * [proxy](examples/proxy/main.go) +* [proxy-backchannel](examples/proxy-backchannel/main.go) ## API Documentation diff --git a/examples/proxy-backchannel/client.go b/examples/proxy-backchannel/client.go new file mode 100644 index 00000000..8d626592 --- /dev/null +++ b/examples/proxy-backchannel/client.go @@ -0,0 +1,112 @@ +package main + +import ( + "log" + "time" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/pion/rtp" +) + +const ( + existingStream = "rtsp://127.0.0.1:8554/mystream" + reconnectPause = 2 * time.Second +) + +func findG711BackChannel(desc *description.Session) (*description.Media, *format.G711) { + for _, media := range desc.Medias { + if media.IsBackChannel { + for _, forma := range media.Formats { + if g711, ok := forma.(*format.G711); ok { + return media, g711 + } + } + } + } + return nil, nil +} + +type client struct { + server *server +} + +func (c *client) initialize() { + // start a separated routine + go c.run() +} + +func (c *client) run() { + for { + err := c.read() + log.Printf("ERR: %s\n", err) + + time.Sleep(reconnectPause) + } +} + +func (c *client) read() error { + rc := gortsplib.Client{ + RequestBackChannels: true, + } + + // parse URL + u, err := base.ParseURL(existingStream) + if err != nil { + return err + } + + // connect to the server + err = rc.Start(u.Scheme, u.Host) + if err != nil { + return err + } + defer rc.Close() + + // find available medias + desc, _, err := rc.Describe(u) + if err != nil { + return err + } + + // find the back channel + backChannelMedia, _ := findG711BackChannel(desc) + if backChannelMedia == nil { + panic("back channel not found") + } + + writeToClient := func(pkt *rtp.Packet) { + rc.WritePacketRTP(backChannelMedia, pkt) + } + + // setup all medias + err = rc.SetupAll(desc.BaseURL, desc.Medias) + if err != nil { + return err + } + + // notify the server that we are ready + stream := c.server.setStreamReady(desc, writeToClient) + defer c.server.setStreamUnready() + + log.Printf("stream is ready and can be read from the server at rtsp://localhost:8554/stream\n") + + // called when a RTP packet arrives + rc.OnPacketRTPAny(func(medi *description.Media, forma format.Format, pkt *rtp.Packet) { + log.Printf("received RTP packet from the client, routing to readers") + + // route incoming packets to the server stream + stream.WritePacketRTP(medi, pkt) + }) + + // start playing + _, err = rc.Play(nil) + if err != nil { + return err + } + + // wait until a fatal error + return rc.Wait() +} diff --git a/examples/proxy-backchannel/main.go b/examples/proxy-backchannel/main.go new file mode 100644 index 00000000..ca2cf5e7 --- /dev/null +++ b/examples/proxy-backchannel/main.go @@ -0,0 +1,24 @@ +package main + +import "log" + +// This example shows how to +// 1. create a server that serves a single stream. +// 2. create a client, that reads an existing stream from another server or camera, containing a back channel. +// 3. route the stream from the client to the server, and from the server to all connected readers. +// 4. route the back channel from connected readers to the server, and from the server to the client. + +func main() { + // allocate the server. + s := &server{} + s.initialize() + + // allocate the client. + // allow client to use the server. + c := &client{server: s} + c.initialize() + + // start server and wait until a fatal error + log.Printf("server is ready on %s", s.server.RTSPAddress) + panic(s.server.StartAndWait()) +} diff --git a/examples/proxy-backchannel/server.go b/examples/proxy-backchannel/server.go new file mode 100644 index 00000000..bc0dbec3 --- /dev/null +++ b/examples/proxy-backchannel/server.go @@ -0,0 +1,134 @@ +package main + +import ( + "log" + "sync" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/pion/rtp" +) + +type server struct { + server *gortsplib.Server + mutex sync.RWMutex + stream *gortsplib.ServerStream + writeToClient func(*rtp.Packet) +} + +func (s *server) initialize() { + // configure the server + s.server = &gortsplib.Server{ + Handler: s, + RTSPAddress: ":8556", + UDPRTPAddress: ":8002", + UDPRTCPAddress: ":8003", + MulticastIPRange: "224.1.0.0/16", + MulticastRTPPort: 8002, + MulticastRTCPPort: 8003, + } +} + +// called when a connection is opened. +func (s *server) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + log.Printf("conn opened") +} + +// called when a connection is closed. +func (s *server) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + log.Printf("conn closed (%v)", ctx.Error) +} + +// called when a session is opened. +func (s *server) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + log.Printf("session opened") +} + +// called when a session is closed. +func (s *server) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + log.Printf("session closed") +} + +// called when receiving a DESCRIBE request. +func (s *server) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("DESCRIBE request") + + s.mutex.RLock() + defer s.mutex.RUnlock() + + // stream is not available yet + if s.stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, s.stream, nil +} + +// called when receiving a SETUP request. +func (s *server) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("SETUP request") + + s.mutex.RLock() + defer s.mutex.RUnlock() + + // stream is not available yet + if s.stream == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + }, s.stream, nil +} + +// called when receiving a PLAY request. +func (s *server) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + log.Printf("PLAY request") + + ctx.Session.OnPacketRTPAny(func(m *description.Media, f format.Format, pkt *rtp.Packet) { + log.Printf("received RTP packet from readers, routing to the client") + + s.writeToClient(pkt) + }) + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +func (s *server) setStreamReady( + desc *description.Session, + writeToClient func(*rtp.Packet), +) *gortsplib.ServerStream { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.stream = &gortsplib.ServerStream{ + Server: s.server, + Desc: desc, + } + err := s.stream.Initialize() + if err != nil { + panic(err) + } + + s.writeToClient = writeToClient + + return s.stream +} + +func (s *server) setStreamUnready() { + s.mutex.Lock() + defer s.mutex.Unlock() + + s.stream.Close() + s.stream = nil +} diff --git a/examples/server-play-backchannel/audio_streamer.go b/examples/server-play-backchannel/audio_streamer.go new file mode 100644 index 00000000..cbfb861e --- /dev/null +++ b/examples/server-play-backchannel/audio_streamer.go @@ -0,0 +1,103 @@ +package main + +import ( + "crypto/rand" + "fmt" + "log" + "time" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/bluenviron/mediacommon/v2/pkg/codecs/g711" + "github.com/bluenviron/mediacommon/v2/pkg/formats/mpegts" +) + +func multiplyAndDivide(v, m, d int64) int64 { + secs := v / d + dec := v % d + return (secs*m + dec*m/d) +} + +func randUint32() (uint32, error) { + var b [4]byte + _, err := rand.Read(b[:]) + if err != nil { + return 0, err + } + return uint32(b[0])<<24 | uint32(b[1])<<16 | uint32(b[2])<<8 | uint32(b[3]), nil +} + +func findTrack(r *mpegts.Reader) (*mpegts.Track, error) { + for _, track := range r.Tracks() { + if _, ok := track.Codec.(*mpegts.CodecH264); ok { + return track, nil + } + } + return nil, fmt.Errorf("H264 track not found") +} + +type audioStreamer struct { + stream *gortsplib.ServerStream +} + +func (r *audioStreamer) initialize() { + go r.run() +} + +func (r *audioStreamer) close() { +} + +func (r *audioStreamer) run() { + // setup G711 -> RTP encoder + rtpEnc, err := r.stream.Desc.Medias[0].Formats[0].(*format.G711).CreateEncoder() + if err != nil { + panic(err) + } + + start := time.Now() + prevPTS := int64(0) + + randomStart, err := randUint32() + if err != nil { + panic(err) + } + + // setup a ticker to sleep between writings + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + // get current timestamp + pts := multiplyAndDivide(int64(time.Since(start)), int64(r.stream.Desc.Medias[0].Formats[0].ClockRate()), int64(time.Second)) + + // generate dummy LPCM audio samples + samples := createDummyAudio(pts, prevPTS) + + // encode samples with G711 + samples, err = g711.Mulaw(samples).Marshal() + if err != nil { + panic(err) + } + + // generate RTP packets from G711 samples + pkts, err := rtpEnc.Encode(samples) + if err != nil { + panic(err) + } + + log.Printf("writing RTP packets with PTS=%d, sample size=%d, pkt count=%d", prevPTS, len(samples), len(pkts)) + + // write RTP packets to the server + for _, pkt := range pkts { + pkt.Timestamp += uint32(int64(randomStart) + prevPTS) + + err = r.stream.WritePacketRTP(r.stream.Desc.Medias[0], pkt) + if err != nil { + panic(err) + } + } + + prevPTS = pts + + } +} diff --git a/examples/server-play-backchannel/dummy_audio.go b/examples/server-play-backchannel/dummy_audio.go new file mode 100644 index 00000000..c56bad21 --- /dev/null +++ b/examples/server-play-backchannel/dummy_audio.go @@ -0,0 +1,24 @@ +package main + +import "math" + +const ( + sampleRate = 8000 + frequency = 400 + amplitude = (1 << 14) - 1 +) + +func createDummyAudio(pts int64, prevPTS int64) []byte { + sampleCount := (pts - prevPTS) + n := 0 + ret := make([]byte, sampleCount*2) + + for i := int64(0); i < sampleCount; i++ { + v := int16(amplitude * math.Sin((float64(prevPTS+i)*frequency*math.Pi*2)/sampleRate)) + ret[n] = byte(v >> 8) + ret[n+1] = byte(v) + n += 2 + } + + return ret +} diff --git a/examples/server-play-backchannel/main.go b/examples/server-play-backchannel/main.go new file mode 100644 index 00000000..4369c67d --- /dev/null +++ b/examples/server-play-backchannel/main.go @@ -0,0 +1,162 @@ +package main + +import ( + "log" + "sync" + + "github.com/bluenviron/gortsplib/v4" + "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/description" + "github.com/bluenviron/gortsplib/v4/pkg/format" + "github.com/pion/rtp" +) + +// This example shows how to +// 1. create a RTSP server which accepts plain connections. +// 2. create a stream with an audio direct channel and an audio back channel. +// 3. write the audio direct channel to readers, read the back channel from readers. + +type serverHandler struct { + server *gortsplib.Server + stream *gortsplib.ServerStream + mutex sync.RWMutex +} + +// called when a connection is opened. +func (sh *serverHandler) OnConnOpen(ctx *gortsplib.ServerHandlerOnConnOpenCtx) { + log.Printf("conn opened") +} + +// called when a connection is closed. +func (sh *serverHandler) OnConnClose(ctx *gortsplib.ServerHandlerOnConnCloseCtx) { + log.Printf("conn closed (%v)", ctx.Error) +} + +// called when a session is opened. +func (sh *serverHandler) OnSessionOpen(ctx *gortsplib.ServerHandlerOnSessionOpenCtx) { + log.Printf("session opened") +} + +// called when a session is closed. +func (sh *serverHandler) OnSessionClose(ctx *gortsplib.ServerHandlerOnSessionCloseCtx) { + log.Printf("session closed") +} + +// called when receiving a DESCRIBE request. +func (sh *serverHandler) OnDescribe(ctx *gortsplib.ServerHandlerOnDescribeCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("DESCRIBE request") + + sh.mutex.RLock() + defer sh.mutex.RUnlock() + + return &base.Response{ + StatusCode: base.StatusOK, + }, sh.stream, nil +} + +// called when receiving a SETUP request. +func (sh *serverHandler) OnSetup(ctx *gortsplib.ServerHandlerOnSetupCtx) (*base.Response, *gortsplib.ServerStream, error) { + log.Printf("SETUP request") + + sh.mutex.RLock() + defer sh.mutex.RUnlock() + + return &base.Response{ + StatusCode: base.StatusOK, + }, sh.stream, nil +} + +// called when receiving a PLAY request. +func (sh *serverHandler) OnPlay(ctx *gortsplib.ServerHandlerOnPlayCtx) (*base.Response, error) { + log.Printf("PLAY request") + + // called when receiving a RTP packet + ctx.Session.OnPacketRTPAny(func(m *description.Media, f format.Format, pkt *rtp.Packet) { + // decode timestamp + pts, ok := ctx.Session.PacketPTS2(m, pkt) + if !ok { + return + } + + log.Printf("incoming RTP packet with PTS=%v size=%v", pts, len(pkt.Payload)) + }) + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil +} + +func main() { + h := &serverHandler{} + + // prevent clients from connecting to the server until the stream is properly set up + h.mutex.Lock() + + // create the server + h.server = &gortsplib.Server{ + Handler: h, + RTSPAddress: ":8554", + UDPRTPAddress: ":8000", + UDPRTCPAddress: ":8001", + MulticastIPRange: "224.1.0.0/16", + MulticastRTPPort: 8002, + MulticastRTCPPort: 8003, + } + + // start the server + err := h.server.Start() + if err != nil { + panic(err) + } + defer h.server.Close() + + // create a RTSP description + desc := &description.Session{ + Medias: []*description.Media{ + // direct channel + { + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: 1, + }}, + }, + // back channel + { + Type: description.MediaTypeAudio, + IsBackChannel: true, + Formats: []format.Format{&format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: 1, + }}, + }, + }, + } + + // create a server stream + h.stream = &gortsplib.ServerStream{ + Server: h.server, + Desc: desc, + } + err = h.stream.Initialize() + if err != nil { + panic(err) + } + defer h.stream.Close() + + // create audio streamer + r := &audioStreamer{stream: h.stream} + r.initialize() + defer r.close() + + // allow clients to connect + h.mutex.Unlock() + + // wait until a fatal error + log.Printf("server is ready on %s", h.server.RTSPAddress) + panic(h.server.Wait()) +} diff --git a/pkg/description/session.go b/pkg/description/session.go index c65691cb..6fed6f22 100644 --- a/pkg/description/session.go +++ b/pkg/description/session.go @@ -48,6 +48,9 @@ type Session struct { // Title of the stream (optional). Title string + // Whether to use multicast. + Multicast bool + // FEC groups (RFC5109). FECGroups []SessionFECGroup @@ -115,8 +118,9 @@ func (d *Session) Unmarshal(ssd *sdp.SessionDescription) error { return nil } -// Marshal encodes the description in SDP. -func (d Session) Marshal(multicast bool) ([]byte, error) { +// Marshal encodes the description in SDP format. +// The argument is deprecated and has no effect. Set Session.Multicast to enable multicast. +func (d Session) Marshal(_ bool) ([]byte, error) { var sessionName psdp.SessionName if d.Title != "" { sessionName = psdp.SessionName(d.Title) @@ -127,7 +131,7 @@ func (d Session) Marshal(multicast bool) ([]byte, error) { } var address string - if multicast { + if d.Multicast { address = "224.1.0.0" } else { address = "0.0.0.0" @@ -150,11 +154,6 @@ func (d Session) Marshal(multicast bool) ([]byte, error) { TimeDescriptions: []psdp.TimeDescription{ {Timing: psdp.Timing{StartTime: 0, StopTime: 0}}, }, - MediaDescriptions: make([]*psdp.MediaDescription, len(d.Medias)), - } - - for i, media := range d.Medias { - sout.MediaDescriptions[i] = media.Marshal() } for _, group := range d.FECGroups { @@ -164,5 +163,11 @@ func (d Session) Marshal(multicast bool) ([]byte, error) { }) } + sout.MediaDescriptions = make([]*psdp.MediaDescription, len(d.Medias)) + + for i, media := range d.Medias { + sout.MediaDescriptions[i] = media.Marshal() + } + return sout.Marshal() } diff --git a/server_conn.go b/server_conn.go index 50f24d85..90ea1adb 100644 --- a/server_conn.go +++ b/server_conn.go @@ -26,22 +26,49 @@ func getSessionID(header base.Header) string { return "" } -func serverSideDescription(d *description.Session) *description.Session { +func checkMulticastEnabled(multicastIPRange string, query string) bool { + // VLC uses multicast if the SDP contains a multicast address. + // therefore, we introduce a special query (vlcmulticast) that allows + // to return a SDP that contains a multicast address. + if multicastIPRange != "" { + if q, err2 := gourl.ParseQuery(query); err2 == nil { + if _, ok := q["vlcmulticast"]; ok { + return true + } + } + } + return false +} + +func checkBackChannelsEnabled(header base.Header) bool { + if vals, ok := header["Require"]; ok { + for _, val := range vals { + if val == "www.onvif.org/ver20/backchannel" { + return true + } + } + } + return false +} + +func prepareForDescribe(d *description.Session, multicast bool, backChannels bool) *description.Session { out := &description.Session{ Title: d.Title, + Multicast: multicast, FECGroups: d.FECGroups, - Medias: make([]*description.Media, len(d.Medias)), } for i, medi := range d.Medias { - out.Medias[i] = &description.Media{ - Type: medi.Type, - ID: medi.ID, - IsBackChannel: medi.IsBackChannel, - // we have to use trackID=number in order to support clients - // like the Grandstream GXV3500. - Control: "trackID=" + strconv.FormatInt(int64(i), 10), - Formats: medi.Formats, + if !medi.IsBackChannel || backChannels { + out.Medias = append(out.Medias, &description.Media{ + Type: medi.Type, + ID: medi.ID, + IsBackChannel: medi.IsBackChannel, + // we have to use trackID=number in order to support clients + // like the Grandstream GXV3500. + Control: "trackID=" + strconv.FormatInt(int64(i), 10), + Formats: medi.Formats, + }) } } @@ -343,19 +370,13 @@ func (sc *ServerConn) handleRequestInner(req *base.Request) (*base.Response, err return res, err } - // VLC uses multicast if the SDP contains a multicast address. - // therefore, we introduce a special query (vlcmulticast) that allows - // to return a SDP that contains a multicast address. - multicast := false - if sc.s.MulticastIPRange != "" { - if q, err2 := gourl.ParseQuery(query); err2 == nil { - if _, ok := q["vlcmulticast"]; ok { - multicast = true - } - } - } + desc := prepareForDescribe( + stream.Desc, + checkMulticastEnabled(sc.s.MulticastIPRange, query), + checkBackChannelsEnabled(req.Header), + ) - byts, _ := serverSideDescription(stream.Desc).Marshal(multicast) + byts, _ := desc.Marshal(false) res.Body = byts } diff --git a/server_play_test.go b/server_play_test.go index 7a3cf7bb..a744ce97 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -68,30 +68,6 @@ func mediaURL(t *testing.T, baseURL *base.URL, media *description.Media) *base.U return u } -func doDescribe(t *testing.T, conn *conn.Conn) *description.Session { - res, err := writeReqReadRes(conn, base.Request{ - Method: base.Describe, - URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - }, - }) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - var desc sdp.SessionDescription - err = desc.Unmarshal(res.Body) - require.NoError(t, err) - - var desc2 description.Session - err = desc2.Unmarshal(&desc) - require.NoError(t, err) - - desc2.BaseURL = mustParseURL(res.Header["Content-Base"][0]) - - return &desc2 -} - func doSetup(t *testing.T, conn *conn.Conn, u string, inTH *headers.Transport, session string, ) (*base.Response, *headers.Transport) { @@ -311,7 +287,7 @@ func TestServerPlayPath(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) th := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -432,7 +408,7 @@ func TestServerPlaySetupErrors(t *testing.T) { require.Equal(t, base.StatusBadRequest, res.StatusCode) default: - desc = doDescribe(t, conn) + desc = doDescribe(t, conn, false) th = &headers.Transport{ Protocol: headers.TransportProtocolUDP, @@ -579,7 +555,7 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { ClientPorts: &[2]int{35466, 35467}, } - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) res, err := writeReqReadRes(conn, base.Request{ Method: base.Setup, @@ -760,7 +736,7 @@ func TestServerPlay(t *testing.T) { <-nconnOpened - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -1061,7 +1037,7 @@ func TestServerPlaySocketError(t *testing.T) { }() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -1225,7 +1201,7 @@ func TestServerPlayDecodeErrors(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -1348,7 +1324,7 @@ func TestServerPlayRTCPReport(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -1558,7 +1534,7 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -1650,7 +1626,7 @@ func TestServerPlayPause(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -1748,7 +1724,7 @@ func TestServerPlayPlayPausePausePlay(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -1836,7 +1812,7 @@ func TestServerPlayTimeout(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -1927,7 +1903,7 @@ func TestServerPlayWithoutTeardown(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Delivery: deliveryPtr(headers.TransportDeliveryUnicast), @@ -2007,7 +1983,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Delivery: deliveryPtr(headers.TransportDeliveryUnicast), @@ -2093,7 +2069,7 @@ func TestServerPlayPartialMedias(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Delivery: deliveryPtr(headers.TransportDeliveryUnicast), @@ -2121,7 +2097,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Delivery: deliveryPtr(headers.TransportDeliveryUnicast), @@ -2346,7 +2322,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Delivery: deliveryPtr(headers.TransportDeliveryUnicast), @@ -2423,7 +2399,7 @@ func TestServerPlayStreamStats(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Mode: transportModePtr(headers.TransportModePlay), @@ -2453,3 +2429,148 @@ func TestServerPlayStreamStats(t *testing.T) { st := stream.Stats() require.Equal(t, uint64(16*2), st.BytesSent) } + +func TestServerPlayBackChannel(t *testing.T) { + for _, transport := range []string{ + "udp", + "tcp", + } { + t.Run(transport, func(t *testing.T) { + serverOk := make(chan struct{}) + var stream *ServerStream + + s := &Server{ + Handler: &testServerHandler{ + onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onSetup: func(_ *ServerHandlerOnSetupCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusOK, + }, stream, nil + }, + onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { + ctx.Session.OnPacketRTPAny(func(_ *description.Media, _ format.Format, _ *rtp.Packet) { + close(serverOk) + }) + + return &base.Response{ + StatusCode: base.StatusOK, + }, nil + }, + }, + RTSPAddress: "127.0.0.1:8554", + } + + if transport == "udp" { + s.UDPRTPAddress = "127.0.0.1:8000" + s.UDPRTCPAddress = "127.0.0.1:8001" + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + stream = &ServerStream{ + Server: s, + Desc: &description.Session{Medias: []*description.Media{ + testH264Media, + { + Type: description.MediaTypeAudio, + IsBackChannel: true, + Formats: []format.Format{&format.G711{ + PayloadTyp: 8, + MULaw: false, + SampleRate: 8000, + ChannelCount: 1, + }}, + }, + }}, + } + err = stream.Initialize() + require.NoError(t, err) + defer stream.Close() + + nconn, err := net.Dial("tcp", "127.0.0.1:8554") + require.NoError(t, err) + defer nconn.Close() + + conn := conn.NewConn(nconn) + + desc := doDescribe(t, conn, true) + + var session string + var serverPorts [2]*[2]int + var l1s [2]net.PacketConn + var l2s [2]net.PacketConn + + for i := 0; i < 2; i++ { + inTH := &headers.Transport{ + Mode: transportModePtr(headers.TransportModePlay), + } + + if transport == "udp" { + v := headers.TransportDeliveryUnicast + inTH.Delivery = &v + inTH.Protocol = headers.TransportProtocolUDP + inTH.ClientPorts = &[2]int{35466 + i*2, 35467 + i*2} + } else { + v := headers.TransportDeliveryUnicast + inTH.Delivery = &v + inTH.Protocol = headers.TransportProtocolTCP + inTH.InterleavedIDs = &[2]int{0 + i*2, 1 + i*2} + } + + res, th := doSetup(t, conn, mediaURL(t, desc.BaseURL, desc.Medias[i]).String(), inTH, "") + + if transport == "udp" { + serverPorts[i] = th.ServerPorts + + l1s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35466+i*2), 10))) + require.NoError(t, err) + defer l1s[i].Close() + + l2s[i], err = net.ListenPacket("udp", net.JoinHostPort("127.0.0.1", strconv.FormatInt(int64(35467+i*2), 10))) + require.NoError(t, err) + defer l2s[i].Close() + } + + session = readSession(t, res) + } + + doPlay(t, conn, "rtsp://127.0.0.1:8554/teststream", session) + + // client -> server RTP packet + + pkt := &rtp.Packet{ + Header: rtp.Header{ + Version: 2, + PayloadType: 8, + }, + Payload: []byte{1, 2, 3, 4}, + } + buf, err := pkt.Marshal() + require.NoError(t, err) + + if transport == "udp" { + _, err = l1s[1].WriteTo(buf, &net.UDPAddr{ + IP: net.ParseIP("127.0.0.1"), + Port: serverPorts[1][0], + }) + require.NoError(t, err) + } else { + err = conn.WriteInterleavedFrame(&base.InterleavedFrame{ + Channel: 2, + Payload: buf, + }, make([]byte, 1024)) + require.NoError(t, err) + } + + <-serverOk + + doTeardown(t, conn, "rtsp://127.0.0.1:8554/teststream", session) + }) + } +} diff --git a/server_session.go b/server_session.go index beca57a8..df0ab599 100644 --- a/server_session.go +++ b/server_session.go @@ -37,6 +37,15 @@ func stringsReverseIndex(s, substr string) int { return -1 } +func hasBackChannel(desc description.Session) bool { + for _, medi := range desc.Medias { + if medi.IsBackChannel { + return true + } + } + return false +} + // used for all methods except SETUP func getPathAndQuery(u *base.URL, isAnnounce bool) (string, string) { if !isAnnounce { @@ -245,13 +254,13 @@ type ServerSession struct { setuppedMediasOrdered []*serverSessionMedia tcpCallbackByChannel map[int]readFunc setuppedTransport *Transport - setuppedStream *ServerStream // read + setuppedStream *ServerStream // play setuppedPath string setuppedQuery string lastRequestTime time.Time tcpConn *ServerConn - announcedDesc *description.Session // publish - udpLastPacketTime *int64 // publish + announcedDesc *description.Session // record + udpLastPacketTime *int64 // record udpCheckStreamTimer *time.Timer writer *asyncProcessor writerMutex sync.RWMutex @@ -863,6 +872,12 @@ func (ss *ServerSession) handleRequestInner(sc *ServerConn, req *base.Request) ( }, liberrors.ErrServerSDPInvalid{Err: err} } + if hasBackChannel(desc) { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, liberrors.ErrServerSDPInvalid{Err: fmt.Errorf("back channels cannot be recorded")} + } + res, err := ss.s.Handler.(ServerHandlerOnAnnounce).OnAnnounce(&ServerHandlerOnAnnounceCtx{ Session: ss, Conn: sc, diff --git a/server_session_format.go b/server_session_format.go index 865a81cd..a686f84c 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -20,7 +20,7 @@ type serverSessionFormat struct { format format.Format onPacketRTP OnPacketRTPFunc - udpReorderer *rtpreorderer.Reorderer + udpReorderer *rtpreorderer.Reorderer // publish or back channel tcpLossDetector *rtplossdetector.LossDetector rtcpReceiver *rtcpreceiver.RTCPReceiver writePacketRTPInQueue func([]byte) error @@ -44,7 +44,7 @@ func (sf *serverSessionFormat) start() { sf.writePacketRTPInQueue = sf.writePacketRTPInQueueTCP } - if sf.sm.ss.state != ServerSessionStatePlay { + if sf.sm.ss.state == ServerSessionStateRecord || sf.sm.media.IsBackChannel { if *sf.sm.ss.setuppedTransport == TransportUDP || *sf.sm.ss.setuppedTransport == TransportUDPMulticast { sf.udpReorderer = &rtpreorderer.Reorderer{} sf.udpReorderer.Initialize() diff --git a/server_session_media.go b/server_session_media.go index f94ce5ad..23155600 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -69,7 +69,9 @@ func (sm *serverSessionMedia) start() { if sm.ss.state == ServerSessionStatePlay { // firewall opening is performed with RTCP sender reports generated by ServerStream - // readers can send RTCP packets only + if sm.media.IsBackChannel { + sm.ss.s.udpRTPListener.addClient(sm.ss.author.ip(), sm.udpRTPReadPort, sm.readPacketRTPUDPPlay) + } sm.ss.s.udpRTCPListener.addClient(sm.ss.author.ip(), sm.udpRTCPReadPort, sm.readPacketRTCPUDPPlay) } else { // open the firewall by sending empty packets to the counterpart. @@ -147,6 +149,34 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { return nil } +func (sm *serverSessionMedia) readPacketRTPUDPPlay(payload []byte) bool { + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) + + if len(payload) == (udpMaxPayloadSize + 1) { + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketTooBigUDP{}) + return false + } + + pkt := &rtp.Packet{} + err := pkt.Unmarshal(payload) + if err != nil { + sm.onPacketRTPDecodeError(err) + return false + } + + forma, ok := sm.formats[pkt.PayloadType] + if !ok { + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + return false + } + + now := sm.ss.s.timeNow() + + forma.readPacketRTPUDP(pkt, now) + + return true +} + func (sm *serverSessionMedia) readPacketRTCPUDPPlay(payload []byte) bool { atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) @@ -235,8 +265,29 @@ func (sm *serverSessionMedia) readPacketRTCPUDPRecord(payload []byte) bool { return true } -func (sm *serverSessionMedia) readPacketRTPTCPPlay(_ []byte) bool { - return false +func (sm *serverSessionMedia) readPacketRTPTCPPlay(payload []byte) bool { + if !sm.media.IsBackChannel { + return false + } + + atomic.AddUint64(sm.bytesReceived, uint64(len(payload))) + + pkt := &rtp.Packet{} + err := pkt.Unmarshal(payload) + if err != nil { + sm.onPacketRTPDecodeError(err) + return false + } + + forma, ok := sm.formats[pkt.PayloadType] + if !ok { + sm.onPacketRTPDecodeError(liberrors.ErrServerRTPPacketUnknownPayloadType{PayloadType: pkt.PayloadType}) + return false + } + + forma.readPacketRTPTCP(pkt) + + return true } func (sm *serverSessionMedia) readPacketRTCPTCPPlay(payload []byte) bool { diff --git a/server_test.go b/server_test.go index f02ac861..a2f98072 100644 --- a/server_test.go +++ b/server_test.go @@ -15,6 +15,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" + "github.com/bluenviron/gortsplib/v4/pkg/sdp" ) var serverCert = []byte(`-----BEGIN CERTIFICATE----- @@ -83,6 +84,36 @@ func writeReqReadRes( return conn.ReadResponse() } +func doDescribe(t *testing.T, conn *conn.Conn, backChannels bool) *description.Session { + header := base.Header{ + "CSeq": base.HeaderValue{"1"}, + } + + if backChannels { + header["Require"] = base.HeaderValue{"www.onvif.org/ver20/backchannel"} + } + + res, err := writeReqReadRes(conn, base.Request{ + Method: base.Describe, + URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"), + Header: header, + }) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + var desc sdp.SessionDescription + err = desc.Unmarshal(res.Body) + require.NoError(t, err) + + var desc2 description.Session + err = desc2.Unmarshal(&desc) + require.NoError(t, err) + + desc2.BaseURL = mustParseURL(res.Header["Content-Base"][0]) + + return &desc2 +} + type testServerHandler struct { onConnOpen func(*ServerHandlerOnConnOpenCtx) onConnClose func(*ServerHandlerOnConnCloseCtx) @@ -438,7 +469,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) var session string @@ -534,7 +565,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { defer nconn1.Close() conn1 := conn.NewConn(nconn1) - desc1 := doDescribe(t, conn1) + desc1 := doDescribe(t, conn1, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -554,7 +585,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { defer nconn2.Close() conn2 := conn.NewConn(nconn2) - desc2 := doDescribe(t, conn2) + desc2 := doDescribe(t, conn2, false) res, err = writeReqReadRes(conn2, base.Request{ Method: base.Setup, @@ -620,7 +651,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -688,7 +719,7 @@ func TestServerSetupMultipleTransports(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTHS := headers.Transports{ { @@ -789,7 +820,7 @@ func TestServerGetSetParameter(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) var session string @@ -1078,7 +1109,7 @@ func TestServerSessionClose(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -1157,7 +1188,7 @@ func TestServerSessionAutoClose(t *testing.T) { require.NoError(t, err) conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP, @@ -1225,7 +1256,7 @@ func TestServerSessionTeardown(t *testing.T) { defer nconn.Close() conn := conn.NewConn(nconn) - desc := doDescribe(t, conn) + desc := doDescribe(t, conn, false) inTH := &headers.Transport{ Protocol: headers.TransportProtocolTCP,