From 6c9dc82a55dd11b77b3947a4ee977d545d4b1b2a Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sat, 7 Nov 2020 18:32:26 +0100 Subject: [PATCH] add ConnClient.Pause --- connclient.go | 245 ++++++++++++++++++++++++++++++++------------------ dialer.go | 4 +- 2 files changed, 162 insertions(+), 87 deletions(-) diff --git a/connclient.go b/connclient.go index e652546f..0ce10fa4 100644 --- a/connclient.go +++ b/connclient.go @@ -38,8 +38,10 @@ type connClientState int const ( connClientStateInitial connClientState = iota - connClientStateReading - connClientStatePublishing + connClientStatePrePlay + connClientStatePlay + connClientStatePreRecord + connClientStateRecord ) // ConnClientConf allows to configure a ConnClient. @@ -149,7 +151,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { // Close closes all the ConnClient resources. func (c *ConnClient) Close() error { - if c.state == connClientStateReading { + if c.state == connClientStatePlay { c.Do(&base.Request{ Method: base.TEARDOWN, URL: c.streamUrl, @@ -175,6 +177,15 @@ func (c *ConnClient) Close() error { return err } +func (c *ConnClient) checkState(allowed map[connClientState]struct{}) error { + if _, ok := allowed[c.state]; ok { + return nil + } + + return fmt.Errorf("client must be in state %v, while is in state %v", + allowed, c.state) +} + // CloseUDPListeners closes any open UDP listener. func (c *ConnClient) CloseUDPListeners() { for _, l := range c.udpRtpListeners { @@ -342,8 +353,13 @@ func (c *ConnClient) Do(req *base.Request) (*base.Response, error) { // Since this method is not implemented by every RTSP server, the function // does not fail if the returned code is StatusNotFound. func (c *ConnClient) Options(u *base.URL) (*base.Response, error) { - if c.state != connClientStateInitial { - return nil, fmt.Errorf("can't be called when reading or publishing") + err := c.checkState(map[connClientState]struct{}{ + connClientStateInitial: {}, + connClientStatePrePlay: {}, + connClientStatePreRecord: {}, + }) + if err != nil { + return nil, err } res, err := c.Do(&base.Request{ @@ -363,8 +379,13 @@ func (c *ConnClient) Options(u *base.URL) (*base.Response, error) { // Describe writes a DESCRIBE request and reads a Response. func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) { - if c.state != connClientStateInitial { - return nil, nil, fmt.Errorf("can't be called when reading or publishing") + err := c.checkState(map[connClientState]struct{}{ + connClientStateInitial: {}, + connClientStatePrePlay: {}, + connClientStatePreRecord: {}, + }) + if err != nil { + return nil, nil, err } res, err := c.Do(&base.Request{ @@ -451,32 +472,27 @@ func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode, return newUrl } -func (c *ConnClient) setup(u *base.URL, mode headers.TransportMode, track *Track, ht *headers.Transport) (*base.Response, error) { - res, err := c.Do(&base.Request{ - Method: base.SETUP, - URL: c.urlForTrack(u, mode, track), - Header: base.Header{ - "Transport": ht.Write(), - }, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) - } - - return res, nil -} - // Setup writes a SETUP request and reads a Response. // rtpPort and rtcpPort are used only if protocol is UDP. // if rtpPort and rtcpPort are zero, they are chosen automatically. func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.StreamProtocol, track *Track, rtpPort int, rtcpPort int) (*base.Response, error) { - if c.state != connClientStateInitial { - return nil, fmt.Errorf("can't be called when reading or publishing") + err := c.checkState(map[connClientState]struct{}{ + connClientStateInitial: {}, + connClientStatePrePlay: {}, + connClientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + if mode == headers.TransportModeRecord && c.state != connClientStatePreRecord { + return nil, fmt.Errorf("cannot read and publish at the same time") + } + + if mode == headers.TransportModePlay && c.state != connClientStatePrePlay && + c.state != connClientStateInitial { + return nil, fmt.Errorf("cannot read and publish at the same time") } if c.streamUrl != nil && *u != *c.streamUrl { @@ -557,7 +573,13 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S transport.InterleavedIds = &[2]int{(track.Id * 2), (track.Id * 2) + 1} } - res, err := c.setup(u, mode, track, transport) + res, err := c.Do(&base.Request{ + Method: base.SETUP, + URL: c.urlForTrack(u, mode, track), + Header: base.Header{ + "Transport": transport.Write(), + }, + }) if err != nil { if proto == StreamProtocolUDP { rtpListener.close() @@ -566,6 +588,14 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S return nil, err } + if res.StatusCode != base.StatusOK { + if proto == StreamProtocolUDP { + rtpListener.close() + rtcpListener.close() + } + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + th, err := headers.ReadTransport(res.Header["Transport"]) if err != nil { if proto == StreamProtocolUDP { @@ -618,27 +648,28 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S c.udpRtcpListeners[track.Id] = rtcpListener } + if mode == headers.TransportModePlay { + c.state = connClientStatePrePlay + } else { + c.state = connClientStatePreRecord + } + return res, nil } // Play writes a PLAY request and reads a Response. // This can be called only after Setup(). -func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { - if c.state != connClientStateInitial { - return nil, fmt.Errorf("can't be called when reading or publishing") - } - - if c.streamUrl == nil { - return nil, fmt.Errorf("can be called only after a successful Setup()") - } - - if *u != *c.streamUrl { - return nil, fmt.Errorf("must be called with the same url used for Setup())") +func (c *ConnClient) Play() (*base.Response, error) { + err := c.checkState(map[connClientState]struct{}{ + connClientStatePrePlay: {}, + }) + if err != nil { + return nil, err } res, err := c.Do(&base.Request{ Method: base.PLAY, - URL: u, + URL: c.streamUrl, }) if err != nil { return nil, err @@ -648,7 +679,7 @@ func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } - c.state = connClientStateReading + c.state = connClientStatePlay // open the firewall by sending packets to the counterpart if *c.streamProtocol == StreamProtocolUDP { @@ -692,19 +723,81 @@ func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { return res, nil } +// Announce writes an ANNOUNCE request and reads a Response. +func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { + err := c.checkState(map[connClientState]struct{}{ + connClientStateInitial: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.Do(&base.Request{ + Method: base.ANNOUNCE, + URL: u, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Content: tracks.Write(), + }) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + c.streamUrl = u + c.state = connClientStatePreRecord + + return res, nil +} + +// Record writes a RECORD request and reads a Response. +// This can be called only after Announce() and Setup(). +func (c *ConnClient) Record() (*base.Response, error) { + err := c.checkState(map[connClientState]struct{}{ + connClientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.Do(&base.Request{ + Method: base.RECORD, + URL: c.streamUrl, + }) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + c.state = connClientStateRecord + + return nil, nil +} + // LoopUDP must be called after Play() or Record(); it keeps // the TCP connection open with keepalives, and returns when the TCP // connection closes. func (c *ConnClient) LoopUDP() error { - if c.state != connClientStateReading && c.state != connClientStatePublishing { - return fmt.Errorf("can be called only after a successful Play() or Record()") + err := c.checkState(map[connClientState]struct{}{ + connClientStatePlay: {}, + connClientStateRecord: {}, + }) + if err != nil { + return err } if *c.streamProtocol != StreamProtocolUDP { return fmt.Errorf("stream protocol is not UDP") } - if c.state == connClientStateReading { + if c.state == connClientStatePlay { readDone := make(chan error) go func() { for { @@ -759,25 +852,31 @@ func (c *ConnClient) LoopUDP() error { } } - // connClientStatePublishing + // connClientStateRecord c.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline var res base.Response return res.Read(c.br) } -// Announce writes an ANNOUNCE request and reads a Response. -func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { - if c.streamUrl != nil { - return nil, fmt.Errorf("announce has already been sent with another url") +// Pause writes a PAUSE request and reads a Response. +// This can be called only after Play() or Record(). +func (c *ConnClient) Pause() (*base.Response, error) { + err := c.checkState(map[connClientState]struct{}{ + connClientStatePlay: {}, + connClientStateRecord: {}, + }) + if err != nil { + return nil, err + } + + if c.state == connClientStatePlay { + close(c.receiverReportTerminate) + <-c.receiverReportDone } res, err := c.Do(&base.Request{ - Method: base.ANNOUNCE, - URL: u, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Content: tracks.Write(), + Method: base.PAUSE, + URL: c.streamUrl, }) if err != nil { return nil, err @@ -787,35 +886,11 @@ func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } - c.streamUrl = u + if c.state == connClientStatePlay { + c.state = connClientStatePrePlay + } else { + c.state = connClientStatePreRecord + } return res, nil } - -// Record writes a RECORD request and reads a Response. -// This can be called only after Announce() and Setup(). -func (c *ConnClient) Record(u *base.URL) (*base.Response, error) { - if c.state != connClientStateInitial { - return nil, fmt.Errorf("can't be called when reading or publishing") - } - - if *u != *c.streamUrl { - return nil, fmt.Errorf("must be called with the same url used for Announce()") - } - - res, err := c.Do(&base.Request{ - Method: base.RECORD, - URL: u, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) - } - - c.state = connClientStatePublishing - - return nil, nil -} diff --git a/dialer.go b/dialer.go index 2044762a..d3bd4b06 100644 --- a/dialer.go +++ b/dialer.go @@ -91,7 +91,7 @@ func (d Dialer) DialRead(address string, proto StreamProtocol) (*ConnClient, err } } - _, err = conn.Play(u) + _, err = conn.Play() if err != nil { conn.Close() return nil, err @@ -139,7 +139,7 @@ func (d Dialer) DialPublish(address string, proto StreamProtocol, tracks Tracks) } } - _, err = conn.Record(u) + _, err = conn.Record() if err != nil { conn.Close() return nil, err