add ConnClient.Pause

This commit is contained in:
aler9
2020-11-07 18:32:26 +01:00
parent e711b2925f
commit 6c9dc82a55
2 changed files with 162 additions and 87 deletions

View File

@@ -38,8 +38,10 @@ type connClientState int
const ( const (
connClientStateInitial connClientState = iota connClientStateInitial connClientState = iota
connClientStateReading connClientStatePrePlay
connClientStatePublishing connClientStatePlay
connClientStatePreRecord
connClientStateRecord
) )
// ConnClientConf allows to configure a ConnClient. // ConnClientConf allows to configure a ConnClient.
@@ -149,7 +151,7 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
// Close closes all the ConnClient resources. // Close closes all the ConnClient resources.
func (c *ConnClient) Close() error { func (c *ConnClient) Close() error {
if c.state == connClientStateReading { if c.state == connClientStatePlay {
c.Do(&base.Request{ c.Do(&base.Request{
Method: base.TEARDOWN, Method: base.TEARDOWN,
URL: c.streamUrl, URL: c.streamUrl,
@@ -175,6 +177,15 @@ func (c *ConnClient) Close() error {
return err return err
} }
func (c *ConnClient) checkState(allowed map[connClientState]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
}
return fmt.Errorf("client must be in state %v, while is in state %v",
allowed, c.state)
}
// CloseUDPListeners closes any open UDP listener. // CloseUDPListeners closes any open UDP listener.
func (c *ConnClient) CloseUDPListeners() { func (c *ConnClient) CloseUDPListeners() {
for _, l := range c.udpRtpListeners { for _, l := range c.udpRtpListeners {
@@ -342,8 +353,13 @@ func (c *ConnClient) Do(req *base.Request) (*base.Response, error) {
// Since this method is not implemented by every RTSP server, the function // Since this method is not implemented by every RTSP server, the function
// does not fail if the returned code is StatusNotFound. // does not fail if the returned code is StatusNotFound.
func (c *ConnClient) Options(u *base.URL) (*base.Response, error) { func (c *ConnClient) Options(u *base.URL) (*base.Response, error) {
if c.state != connClientStateInitial { err := c.checkState(map[connClientState]struct{}{
return nil, fmt.Errorf("can't be called when reading or publishing") connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
})
if err != nil {
return nil, err
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
@@ -363,8 +379,13 @@ func (c *ConnClient) Options(u *base.URL) (*base.Response, error) {
// Describe writes a DESCRIBE request and reads a Response. // Describe writes a DESCRIBE request and reads a Response.
func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) { func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) {
if c.state != connClientStateInitial { err := c.checkState(map[connClientState]struct{}{
return nil, nil, fmt.Errorf("can't be called when reading or publishing") connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
})
if err != nil {
return nil, nil, err
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
@@ -451,32 +472,27 @@ func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode,
return newUrl return newUrl
} }
func (c *ConnClient) setup(u *base.URL, mode headers.TransportMode, track *Track, ht *headers.Transport) (*base.Response, error) {
res, err := c.Do(&base.Request{
Method: base.SETUP,
URL: c.urlForTrack(u, mode, track),
Header: base.Header{
"Transport": ht.Write(),
},
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
// Setup writes a SETUP request and reads a Response. // Setup writes a SETUP request and reads a Response.
// rtpPort and rtcpPort are used only if protocol is UDP. // rtpPort and rtcpPort are used only if protocol is UDP.
// if rtpPort and rtcpPort are zero, they are chosen automatically. // if rtpPort and rtcpPort are zero, they are chosen automatically.
func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.StreamProtocol, func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.StreamProtocol,
track *Track, rtpPort int, rtcpPort int) (*base.Response, error) { track *Track, rtpPort int, rtcpPort int) (*base.Response, error) {
if c.state != connClientStateInitial { err := c.checkState(map[connClientState]struct{}{
return nil, fmt.Errorf("can't be called when reading or publishing") connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
})
if err != nil {
return nil, err
}
if mode == headers.TransportModeRecord && c.state != connClientStatePreRecord {
return nil, fmt.Errorf("cannot read and publish at the same time")
}
if mode == headers.TransportModePlay && c.state != connClientStatePrePlay &&
c.state != connClientStateInitial {
return nil, fmt.Errorf("cannot read and publish at the same time")
} }
if c.streamUrl != nil && *u != *c.streamUrl { if c.streamUrl != nil && *u != *c.streamUrl {
@@ -557,7 +573,13 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
transport.InterleavedIds = &[2]int{(track.Id * 2), (track.Id * 2) + 1} transport.InterleavedIds = &[2]int{(track.Id * 2), (track.Id * 2) + 1}
} }
res, err := c.setup(u, mode, track, transport) res, err := c.Do(&base.Request{
Method: base.SETUP,
URL: c.urlForTrack(u, mode, track),
Header: base.Header{
"Transport": transport.Write(),
},
})
if err != nil { if err != nil {
if proto == StreamProtocolUDP { if proto == StreamProtocolUDP {
rtpListener.close() rtpListener.close()
@@ -566,6 +588,14 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
return nil, err return nil, err
} }
if res.StatusCode != base.StatusOK {
if proto == StreamProtocolUDP {
rtpListener.close()
rtcpListener.close()
}
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
th, err := headers.ReadTransport(res.Header["Transport"]) th, err := headers.ReadTransport(res.Header["Transport"])
if err != nil { if err != nil {
if proto == StreamProtocolUDP { if proto == StreamProtocolUDP {
@@ -618,27 +648,28 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
c.udpRtcpListeners[track.Id] = rtcpListener c.udpRtcpListeners[track.Id] = rtcpListener
} }
if mode == headers.TransportModePlay {
c.state = connClientStatePrePlay
} else {
c.state = connClientStatePreRecord
}
return res, nil return res, nil
} }
// Play writes a PLAY request and reads a Response. // Play writes a PLAY request and reads a Response.
// This can be called only after Setup(). // This can be called only after Setup().
func (c *ConnClient) Play(u *base.URL) (*base.Response, error) { func (c *ConnClient) Play() (*base.Response, error) {
if c.state != connClientStateInitial { err := c.checkState(map[connClientState]struct{}{
return nil, fmt.Errorf("can't be called when reading or publishing") connClientStatePrePlay: {},
} })
if err != nil {
if c.streamUrl == nil { return nil, err
return nil, fmt.Errorf("can be called only after a successful Setup()")
}
if *u != *c.streamUrl {
return nil, fmt.Errorf("must be called with the same url used for Setup())")
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
Method: base.PLAY, Method: base.PLAY,
URL: u, URL: c.streamUrl,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -648,7 +679,7 @@ func (c *ConnClient) Play(u *base.URL) (*base.Response, error) {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
c.state = connClientStateReading c.state = connClientStatePlay
// open the firewall by sending packets to the counterpart // open the firewall by sending packets to the counterpart
if *c.streamProtocol == StreamProtocolUDP { if *c.streamProtocol == StreamProtocolUDP {
@@ -692,19 +723,81 @@ func (c *ConnClient) Play(u *base.URL) (*base.Response, error) {
return res, nil return res, nil
} }
// Announce writes an ANNOUNCE request and reads a Response.
func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
connClientStateInitial: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.ANNOUNCE,
URL: u,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Content: tracks.Write(),
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.streamUrl = u
c.state = connClientStatePreRecord
return res, nil
}
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ConnClient) Record() (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
connClientStatePreRecord: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.RECORD,
URL: c.streamUrl,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.state = connClientStateRecord
return nil, nil
}
// LoopUDP must be called after Play() or Record(); it keeps // LoopUDP must be called after Play() or Record(); it keeps
// the TCP connection open with keepalives, and returns when the TCP // the TCP connection open with keepalives, and returns when the TCP
// connection closes. // connection closes.
func (c *ConnClient) LoopUDP() error { func (c *ConnClient) LoopUDP() error {
if c.state != connClientStateReading && c.state != connClientStatePublishing { err := c.checkState(map[connClientState]struct{}{
return fmt.Errorf("can be called only after a successful Play() or Record()") connClientStatePlay: {},
connClientStateRecord: {},
})
if err != nil {
return err
} }
if *c.streamProtocol != StreamProtocolUDP { if *c.streamProtocol != StreamProtocolUDP {
return fmt.Errorf("stream protocol is not UDP") return fmt.Errorf("stream protocol is not UDP")
} }
if c.state == connClientStateReading { if c.state == connClientStatePlay {
readDone := make(chan error) readDone := make(chan error)
go func() { go func() {
for { for {
@@ -759,25 +852,31 @@ func (c *ConnClient) LoopUDP() error {
} }
} }
// connClientStatePublishing // connClientStateRecord
c.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline c.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline
var res base.Response var res base.Response
return res.Read(c.br) return res.Read(c.br)
} }
// Announce writes an ANNOUNCE request and reads a Response. // Pause writes a PAUSE request and reads a Response.
func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { // This can be called only after Play() or Record().
if c.streamUrl != nil { func (c *ConnClient) Pause() (*base.Response, error) {
return nil, fmt.Errorf("announce has already been sent with another url") err := c.checkState(map[connClientState]struct{}{
connClientStatePlay: {},
connClientStateRecord: {},
})
if err != nil {
return nil, err
}
if c.state == connClientStatePlay {
close(c.receiverReportTerminate)
<-c.receiverReportDone
} }
res, err := c.Do(&base.Request{ res, err := c.Do(&base.Request{
Method: base.ANNOUNCE, Method: base.PAUSE,
URL: u, URL: c.streamUrl,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Content: tracks.Write(),
}) })
if err != nil { if err != nil {
return nil, err return nil, err
@@ -787,35 +886,11 @@ func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
c.streamUrl = u if c.state == connClientStatePlay {
c.state = connClientStatePrePlay
} else {
c.state = connClientStatePreRecord
}
return res, nil return res, nil
} }
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ConnClient) Record(u *base.URL) (*base.Response, error) {
if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing")
}
if *u != *c.streamUrl {
return nil, fmt.Errorf("must be called with the same url used for Announce()")
}
res, err := c.Do(&base.Request{
Method: base.RECORD,
URL: u,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.state = connClientStatePublishing
return nil, nil
}

View File

@@ -91,7 +91,7 @@ func (d Dialer) DialRead(address string, proto StreamProtocol) (*ConnClient, err
} }
} }
_, err = conn.Play(u) _, err = conn.Play()
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, err return nil, err
@@ -139,7 +139,7 @@ func (d Dialer) DialPublish(address string, proto StreamProtocol, tracks Tracks)
} }
} }
_, err = conn.Record(u) _, err = conn.Record()
if err != nil { if err != nil {
conn.Close() conn.Close()
return nil, err return nil, err