allow setting additional properties of streams through description.Stream

This commit is contained in:
aler9
2023-08-16 19:02:49 +02:00
committed by Alessandro Ros
parent 4e000eb2dd
commit cdbecb1f5d
54 changed files with 943 additions and 893 deletions

128
client.go
View File

@@ -23,10 +23,10 @@ import (
"github.com/bluenviron/gortsplib/v4/pkg/base"
"github.com/bluenviron/gortsplib/v4/pkg/bytecounter"
"github.com/bluenviron/gortsplib/v4/pkg/conn"
"github.com/bluenviron/gortsplib/v4/pkg/description"
"github.com/bluenviron/gortsplib/v4/pkg/format"
"github.com/bluenviron/gortsplib/v4/pkg/headers"
"github.com/bluenviron/gortsplib/v4/pkg/liberrors"
"github.com/bluenviron/gortsplib/v4/pkg/media"
"github.com/bluenviron/gortsplib/v4/pkg/rtptime"
"github.com/bluenviron/gortsplib/v4/pkg/sdp"
"github.com/bluenviron/gortsplib/v4/pkg/url"
@@ -91,7 +91,7 @@ func findBaseURL(sd *sdp.SessionDescription, res *base.Response, u *url.URL) (*u
return u, nil
}
func resetMediaControls(ms media.Medias) {
func resetMediaControls(ms []*description.Media) {
for i, media := range ms {
media.Control = "trackID=" + strconv.FormatInt(int64(i), 10)
}
@@ -148,14 +148,14 @@ type describeReq struct {
}
type announceReq struct {
url *url.URL
medias media.Medias
res chan clientRes
url *url.URL
desc *description.Session
res chan clientRes
}
type setupReq struct {
media *media.Media
baseURL *url.URL
media *description.Media
rtpPort int
rtcpPort int
res chan clientRes
@@ -175,10 +175,9 @@ type pauseReq struct {
}
type clientRes struct {
medias media.Medias
baseURL *url.URL
res *base.Response
err error
sd *description.Session // describe only
res *base.Response
err error
}
// ClientOnRequestFunc is the prototype of Client.OnRequest.
@@ -200,13 +199,13 @@ type ClientOnDecodeErrorFunc func(err error)
type OnPacketRTPFunc func(*rtp.Packet)
// OnPacketRTPAnyFunc is the prototype of the callback passed to OnPacketRTP(Any).
type OnPacketRTPAnyFunc func(*media.Media, format.Format, *rtp.Packet)
type OnPacketRTPAnyFunc func(*description.Media, format.Format, *rtp.Packet)
// OnPacketRTCPFunc is the prototype of the callback passed to OnPacketRTCP().
type OnPacketRTCPFunc func(rtcp.Packet)
// OnPacketRTCPAnyFunc is the prototype of the callback passed to OnPacketRTCPAny().
type OnPacketRTCPAnyFunc func(*media.Media, rtcp.Packet)
type OnPacketRTCPAnyFunc func(*description.Media, rtcp.Packet)
// Client is a RTSP client.
type Client struct {
@@ -306,7 +305,7 @@ type Client struct {
lastDescribeURL *url.URL
baseURL *url.URL
effectiveTransport *Transport
medias map[*media.Media]*clientMedia
medias map[*description.Media]*clientMedia
tcpCallbackByChannel map[int]readFunc
lastRange *headers.Range
checkTimeoutTimer *time.Timer
@@ -442,7 +441,7 @@ func (c *Client) Start(scheme string, host string) error {
}
// StartRecording connects to the address and starts publishing given media.
func (c *Client) StartRecording(address string, medias media.Medias) error {
func (c *Client) StartRecording(address string, desc *description.Session) error {
u, err := url.Parse(address)
if err != nil {
return err
@@ -453,13 +452,13 @@ func (c *Client) StartRecording(address string, medias media.Medias) error {
return err
}
_, err = c.Announce(u, medias)
_, err = c.Announce(u, desc)
if err != nil {
c.Close()
return err
}
err = c.SetupAll(u, medias)
err = c.SetupAll(u, desc.Medias)
if err != nil {
c.Close()
return err
@@ -505,11 +504,11 @@ func (c *Client) runInner() error {
req.res <- clientRes{res: res, err: err}
case req := <-c.describe:
medias, baseURL, res, err := c.doDescribe(req.url)
req.res <- clientRes{medias: medias, baseURL: baseURL, res: res, err: err}
sd, res, err := c.doDescribe(req.url)
req.res <- clientRes{sd: sd, res: res, err: err}
case req := <-c.announce:
res, err := c.doAnnounce(req.url, req.medias)
res, err := c.doAnnounce(req.url, req.desc)
req.res <- clientRes{res: res, err: err}
case req := <-c.setup:
@@ -624,7 +623,7 @@ func (c *Client) trySwitchingProtocol() error {
c.connURL = prevConnURL
// some Hikvision cameras require a describe before a setup
_, _, _, err := c.doDescribe(c.lastDescribeURL)
_, _, err := c.doDescribe(c.lastDescribeURL)
if err != nil {
return err
}
@@ -649,7 +648,7 @@ func (c *Client) trySwitchingProtocol() error {
return nil
}
func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*base.Response, error) {
func (c *Client) trySwitchingProtocol2(medi *description.Media, baseURL *url.URL) (*base.Response, error) {
c.OnTransportSwitch(fmt.Errorf("switching to TCP because server requested it"))
prevConnURL := c.connURL
@@ -661,7 +660,7 @@ func (c *Client) trySwitchingProtocol2(medi *media.Media, baseURL *url.URL) (*ba
c.connURL = prevConnURL
// some Hikvision cameras require a describe before a setup
_, _, _, err := c.doDescribe(c.lastDescribeURL)
_, _, err := c.doDescribe(c.lastDescribeURL)
if err != nil {
return nil, err
}
@@ -970,7 +969,7 @@ func (c *Client) doOptions(u *url.URL) (*base.Response, error) {
return res, nil
}
// Options writes an OPTIONS request and reads a response.
// Options sends an OPTIONS request.
func (c *Client) Options(u *url.URL) (*base.Response, error) {
cres := make(chan clientRes)
select {
@@ -983,14 +982,14 @@ func (c *Client) Options(u *url.URL) (*base.Response, error) {
}
}
func (c *Client) doDescribe(u *url.URL) (media.Medias, *url.URL, *base.Response, error) {
func (c *Client) doDescribe(u *url.URL) (*description.Session, *base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
clientStatePrePlay: {},
clientStatePreRecord: {},
})
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
res, err := c.do(&base.Request{
@@ -1001,7 +1000,7 @@ func (c *Client) doDescribe(u *url.URL) (media.Medias, *url.URL, *base.Response,
},
}, false, false)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if res.StatusCode != base.StatusOK {
@@ -1013,7 +1012,7 @@ func (c *Client) doDescribe(u *url.URL) (media.Medias, *url.URL, *base.Response,
ru, err := url.Parse(res.Header["Location"][0])
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
if u.User != nil {
@@ -1028,57 +1027,58 @@ func (c *Client) doDescribe(u *url.URL) (media.Medias, *url.URL, *base.Response,
return c.doDescribe(ru)
}
return nil, nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
return nil, res, liberrors.ErrClientBadStatusCode{Code: res.StatusCode, Message: res.StatusMessage}
}
ct, ok := res.Header["Content-Type"]
if !ok || len(ct) != 1 {
return nil, nil, nil, liberrors.ErrClientContentTypeMissing{}
return nil, nil, liberrors.ErrClientContentTypeMissing{}
}
// strip encoding information from Content-Type header
ct = base.HeaderValue{strings.Split(ct[0], ";")[0]}
if ct[0] != "application/sdp" {
return nil, nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct}
return nil, nil, liberrors.ErrClientContentTypeUnsupported{CT: ct}
}
var sd sdp.SessionDescription
err = sd.Unmarshal(res.Body)
var ssd sdp.SessionDescription
err = ssd.Unmarshal(res.Body)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
var medias media.Medias
err = medias.Unmarshal(sd.MediaDescriptions)
var desc description.Session
err = desc.Unmarshal(&ssd)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
baseURL, err := findBaseURL(&sd, res, u)
baseURL, err := findBaseURL(&ssd, res, u)
if err != nil {
return nil, nil, nil, err
return nil, nil, err
}
desc.BaseURL = baseURL
c.lastDescribeURL = u
return medias, baseURL, res, nil
return &desc, res, nil
}
// Describe writes a DESCRIBE request and reads a Response.
func (c *Client) Describe(u *url.URL) (media.Medias, *url.URL, *base.Response, error) {
// Describe sends a DESCRIBE request.
func (c *Client) Describe(u *url.URL) (*description.Session, *base.Response, error) {
cres := make(chan clientRes)
select {
case c.describe <- describeReq{url: u, res: cres}:
res := <-cres
return res.medias, res.baseURL, res.res, res.err
return res.sd, res.res, res.err
case <-c.ctx.Done():
return nil, nil, nil, liberrors.ErrClientTerminated{}
return nil, nil, liberrors.ErrClientTerminated{}
}
}
func (c *Client) doAnnounce(u *url.URL, medias media.Medias) (*base.Response, error) {
func (c *Client) doAnnounce(u *url.URL, desc *description.Session) (*base.Response, error) {
err := c.checkState(map[clientState]struct{}{
clientStateInitial: {},
})
@@ -1086,9 +1086,9 @@ func (c *Client) doAnnounce(u *url.URL, medias media.Medias) (*base.Response, er
return nil, err
}
resetMediaControls(medias)
resetMediaControls(desc.Medias)
byts, err := medias.Marshal(false).Marshal()
byts, err := desc.Marshal(false)
if err != nil {
return nil, err
}
@@ -1117,11 +1117,11 @@ func (c *Client) doAnnounce(u *url.URL, medias media.Medias) (*base.Response, er
return res, nil
}
// Announce writes an ANNOUNCE request and reads a Response.
func (c *Client) Announce(u *url.URL, medias media.Medias) (*base.Response, error) {
// Announce sends an ANNOUNCE request.
func (c *Client) Announce(u *url.URL, desc *description.Session) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.announce <- announceReq{url: u, medias: medias, res: cres}:
case c.announce <- announceReq{url: u, desc: desc, res: cres}:
res := <-cres
return res.res, res.err
@@ -1132,7 +1132,7 @@ func (c *Client) Announce(u *url.URL, medias media.Medias) (*base.Response, erro
func (c *Client) doSetup(
baseURL *url.URL,
medi *media.Media,
medi *description.Media,
rtpPort int,
rtcpPort int,
) (*base.Response, error) {
@@ -1384,7 +1384,7 @@ func (c *Client) doSetup(
}
if c.medias == nil {
c.medias = make(map[*media.Media]*clientMedia)
c.medias = make(map[*description.Media]*clientMedia)
}
c.medias[medi] = cm
@@ -1417,20 +1417,20 @@ func (c *Client) findFreeChannelPair() int {
}
}
// Setup writes a SETUP request and reads a Response.
// Setup sends a SETUP request.
// rtpPort and rtcpPort are used only if transport is UDP.
// if rtpPort and rtcpPort are zero, they are chosen automatically.
func (c *Client) Setup(
baseURL *url.URL,
media *media.Media,
media *description.Media,
rtpPort int,
rtcpPort int,
) (*base.Response, error) {
cres := make(chan clientRes)
select {
case c.setup <- setupReq{
media: media,
baseURL: baseURL,
media: media,
rtpPort: rtpPort,
rtcpPort: rtcpPort,
res: cres,
@@ -1444,7 +1444,7 @@ func (c *Client) Setup(
}
// SetupAll setups all the given medias.
func (c *Client) SetupAll(baseURL *url.URL, medias media.Medias) error {
func (c *Client) SetupAll(baseURL *url.URL, medias []*description.Media) error {
for _, m := range medias {
_, err := c.Setup(baseURL, m, 0, 0)
if err != nil {
@@ -1509,7 +1509,7 @@ func (c *Client) doPlay(ra *headers.Range) (*base.Response, error) {
return res, nil
}
// Play writes a PLAY request and reads a Response.
// Play sends a PLAY request.
// This can be called only after Setup().
func (c *Client) Play(ra *headers.Range) (*base.Response, error) {
cres := make(chan clientRes)
@@ -1551,7 +1551,7 @@ func (c *Client) doRecord() (*base.Response, error) {
return nil, nil
}
// Record writes a RECORD request and reads a Response.
// Record sends a RECORD request.
// This can be called only after Announce() and Setup().
func (c *Client) Record() (*base.Response, error) {
cres := make(chan clientRes)
@@ -1601,7 +1601,7 @@ func (c *Client) doPause() (*base.Response, error) {
return res, nil
}
// Pause writes a PAUSE request and reads a Response.
// Pause sends a PAUSE request.
// This can be called only after Play() or Record().
func (c *Client) Pause() (*base.Response, error) {
cres := make(chan clientRes)
@@ -1648,26 +1648,26 @@ func (c *Client) OnPacketRTCPAny(cb OnPacketRTCPAnyFunc) {
}
// OnPacketRTP sets the callback that is called when a RTP packet is read.
func (c *Client) OnPacketRTP(medi *media.Media, forma format.Format, cb OnPacketRTPFunc) {
func (c *Client) OnPacketRTP(medi *description.Media, forma format.Format, cb OnPacketRTPFunc) {
cm := c.medias[medi]
ct := cm.formats[forma.PayloadType()]
ct.onPacketRTP = cb
}
// OnPacketRTCP sets the callback that is called when a RTCP packet is read.
func (c *Client) OnPacketRTCP(medi *media.Media, cb OnPacketRTCPFunc) {
func (c *Client) OnPacketRTCP(medi *description.Media, cb OnPacketRTCPFunc) {
cm := c.medias[medi]
cm.onPacketRTCP = cb
}
// WritePacketRTP writes a RTP packet to the server.
func (c *Client) WritePacketRTP(medi *media.Media, pkt *rtp.Packet) error {
func (c *Client) WritePacketRTP(medi *description.Media, pkt *rtp.Packet) error {
return c.WritePacketRTPWithNTP(medi, pkt, c.timeNow())
}
// WritePacketRTPWithNTP writes a RTP packet to the server.
// ntp is the absolute time of the packet, and is sent with periodic RTCP sender reports.
func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp time.Time) error {
func (c *Client) WritePacketRTPWithNTP(medi *description.Media, pkt *rtp.Packet, ntp time.Time) error {
byts := make([]byte, c.MaxPacketSize)
n, err := pkt.MarshalTo(byts)
if err != nil {
@@ -1688,7 +1688,7 @@ func (c *Client) WritePacketRTPWithNTP(medi *media.Media, pkt *rtp.Packet, ntp t
}
// WritePacketRTCP writes a RTCP packet to the server.
func (c *Client) WritePacketRTCP(medi *media.Media, pkt rtcp.Packet) error {
func (c *Client) WritePacketRTCP(medi *description.Media, pkt rtcp.Packet) error {
byts, err := pkt.Marshal()
if err != nil {
return err
@@ -1713,7 +1713,7 @@ func (c *Client) PacketPTS(forma format.Format, pkt *rtp.Packet) (time.Duration,
// PacketNTP returns the NTP timestamp of an incoming RTP packet.
// The NTP timestamp is computed from sender reports.
func (c *Client) PacketNTP(medi *media.Media, pkt *rtp.Packet) (time.Time, bool) {
func (c *Client) PacketNTP(medi *description.Media, pkt *rtp.Packet) (time.Time, bool) {
cm := c.medias[medi]
ct := cm.formats[pkt.PayloadType]
return ct.rtcpReceiver.PacketNTP(pkt.Timestamp)