client: allow to call client.Close() always

This commit is contained in:
aler9
2021-05-10 17:18:41 +02:00
parent 034eee1c8a
commit f8ef945dae
8 changed files with 1019 additions and 745 deletions

View File

@@ -1,3 +1,10 @@
/*
Package gortsplib is a RTSP 1.0 library for the Go programming language,
written for rtsp-simple-server.
Examples are available at https://github.com/aler9/gortsplib/tree/master/examples
*/
package gortsplib
import (

View File

@@ -381,6 +381,9 @@ func TestClientRead(t *testing.T) {
<-frameRecv
conn.Close()
<-done
<-conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
})
})
}
}
@@ -1232,6 +1235,9 @@ func TestClientReadPause(t *testing.T) {
require.NoError(t, err)
<-done
<-conn.ReadFrames(func(id int, typ StreamType, payload []byte) {
})
_, err = conn.Play()
require.NoError(t, err)

File diff suppressed because it is too large Load Diff

View File

@@ -1,187 +0,0 @@
package gortsplib
import (
"fmt"
"strconv"
"time"
psdp "github.com/pion/sdp/v3"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/liberrors"
)
// Announce writes an ANNOUNCE request and reads a Response.
func (cc *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, error) {
err := cc.checkState(map[clientConnState]struct{}{
clientConnStateInitial: {},
})
if err != nil {
return nil, err
}
// in case of ANNOUNCE, the base URL doesn't have a trailing slash.
// (tested with ffmpeg and gstreamer)
baseURL := u.Clone()
// set id, base url and control attribute on tracks
for i, t := range tracks {
t.ID = i
t.BaseURL = baseURL
t.Media.Attributes = append(t.Media.Attributes, psdp.Attribute{
Key: "control",
Value: "trackID=" + strconv.FormatInt(int64(i), 10),
})
}
res, err := cc.Do(&base.Request{
Method: base.Announce,
URL: u,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Body: tracks.Write(),
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientWrongStatusCode{
Code: res.StatusCode, Message: res.StatusMessage}
}
cc.streamBaseURL = baseURL
cc.state = clientConnStatePreRecord
return res, nil
}
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (cc *ClientConn) Record() (*base.Response, error) {
err := cc.checkState(map[clientConnState]struct{}{
clientConnStatePreRecord: {},
})
if err != nil {
return nil, err
}
res, err := cc.Do(&base.Request{
Method: base.Record,
URL: cc.streamBaseURL,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientWrongStatusCode{
Code: res.StatusCode, Message: res.StatusMessage}
}
cc.state = clientConnStateRecord
cc.ReadFrames(func(trackID int, streamType StreamType, payload []byte) {
})
return nil, nil
}
func (cc *ClientConn) backgroundRecordUDP() error {
for _, cct := range cc.tracks {
cct.udpRTPListener.start()
cct.udpRTCPListener.start()
}
defer func() {
for _, cct := range cc.tracks {
cct.udpRTPListener.stop()
cct.udpRTCPListener.stop()
}
}()
// disable deadline
cc.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
for {
var res base.Response
err := res.Read(cc.br)
if err != nil {
readerDone <- err
return
}
}
}()
reportTicker := time.NewTicker(cc.c.senderReportPeriod)
defer reportTicker.Stop()
for {
select {
case <-cc.backgroundTerminate:
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range cc.tracks {
sr := cct.rtcpSender.Report(now)
if sr != nil {
cc.WriteFrame(trackID, StreamTypeRTCP, sr)
}
}
case err := <-readerDone:
return err
}
}
}
func (cc *ClientConn) backgroundRecordTCP() error {
// disable deadline
cc.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
for {
frame := base.InterleavedFrame{
Payload: cc.tcpFrameBuffer.Next(),
}
err := frame.Read(cc.br)
if err != nil {
readerDone <- err
return
}
cc.readCB(frame.TrackID, frame.StreamType, frame.Payload)
}
}()
reportTicker := time.NewTicker(cc.c.senderReportPeriod)
defer reportTicker.Stop()
for {
select {
case <-cc.backgroundTerminate:
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range cc.tracks {
sr := cct.rtcpSender.Report(now)
if sr != nil {
cc.WriteFrame(trackID, StreamTypeRTCP, sr)
}
}
case err := <-readerDone:
return err
}
}
}

View File

@@ -1,267 +0,0 @@
package gortsplib
import (
"fmt"
"sync/atomic"
"time"
"github.com/aler9/gortsplib/pkg/base"
"github.com/aler9/gortsplib/pkg/headers"
"github.com/aler9/gortsplib/pkg/liberrors"
)
// Play writes a PLAY request and reads a Response.
// This can be called only after Setup().
func (cc *ClientConn) Play() (*base.Response, error) {
err := cc.checkState(map[clientConnState]struct{}{
clientConnStatePrePlay: {},
})
if err != nil {
return nil, err
}
res, err := cc.Do(&base.Request{
Method: base.Play,
URL: cc.streamBaseURL,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientWrongStatusCode{
Code: res.StatusCode, Message: res.StatusMessage}
}
if v, ok := res.Header["RTP-Info"]; ok {
var ri headers.RTPInfo
err := ri.Read(v)
if err != nil {
return nil, liberrors.ErrClientRTPInfoInvalid{Err: err}
}
cc.rtpInfo = &ri
}
cc.state = clientConnStatePlay
return res, nil
}
// RTPInfo returns the RTP-Info header sent by the server in the PLAY response.
func (cc *ClientConn) RTPInfo() *headers.RTPInfo {
return cc.rtpInfo
}
func (cc *ClientConn) backgroundPlayUDP() error {
// open the firewall by sending packets to the counterpart
for _, cct := range cc.tracks {
cct.udpRTPListener.write(
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
cct.udpRTCPListener.write(
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
for _, cct := range cc.tracks {
cct.udpRTPListener.start()
cct.udpRTCPListener.start()
}
defer func() {
for _, cct := range cc.tracks {
cct.udpRTPListener.stop()
cct.udpRTCPListener.stop()
}
}()
// disable deadline
cc.nconn.SetReadDeadline(time.Time{})
readerDone := make(chan error)
go func() {
for {
var res base.Response
err := res.Read(cc.br)
if err != nil {
readerDone <- err
return
}
}
}()
reportTicker := time.NewTicker(cc.c.receiverReportPeriod)
defer reportTicker.Stop()
keepaliveTicker := time.NewTicker(clientConnUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamInitial := true
checkStreamTicker := time.NewTicker(cc.c.InitialUDPReadTimeout)
defer func() {
checkStreamTicker.Stop()
}()
for {
select {
case <-cc.backgroundTerminate:
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for _, cct := range cc.tracks {
rr := cct.rtcpReceiver.Report(now)
cct.udpRTCPListener.write(rr)
}
case <-keepaliveTicker.C:
_, err := cc.Do(&base.Request{
Method: func() base.Method {
// the vlc integrated rtsp server requires GET_PARAMETER
if cc.useGetParameter {
return base.GetParameter
}
return base.Options
}(),
// use the stream base URL, otherwise some cameras do not reply
URL: cc.streamBaseURL,
SkipResponse: true,
})
if err != nil {
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return err
}
case <-checkStreamTicker.C:
if checkStreamInitial {
// check that at least one packet has been received
inTimeout := func() bool {
for _, cct := range cc.tracks {
lft := atomic.LoadInt64(cct.udpRTPListener.lastFrameTime)
if lft != 0 {
return false
}
lft = atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime)
if lft != 0 {
return false
}
}
return true
}()
if inTimeout {
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientNoUDPPacketsRecently{}
}
checkStreamInitial = false
checkStreamTicker.Stop()
checkStreamTicker = time.NewTicker(clientConnCheckStreamPeriod)
} else {
inTimeout := func() bool {
now := time.Now()
for _, cct := range cc.tracks {
lft := time.Unix(atomic.LoadInt64(cct.udpRTPListener.lastFrameTime), 0)
if now.Sub(lft) < cc.c.ReadTimeout {
return false
}
lft = time.Unix(atomic.LoadInt64(cct.udpRTCPListener.lastFrameTime), 0)
if now.Sub(lft) < cc.c.ReadTimeout {
return false
}
}
return true
}()
if inTimeout {
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientUDPTimeout{}
}
}
case err := <-readerDone:
return err
}
}
}
func (cc *ClientConn) backgroundPlayTCP() error {
// for some reason, SetReadDeadline() must always be called in the same
// goroutine, otherwise Read() freezes.
// therefore, we disable the deadline and perform check with a ticker.
cc.nconn.SetReadDeadline(time.Time{})
var lastFrameTime int64
readerDone := make(chan error)
go func() {
for {
frame := base.InterleavedFrame{
Payload: cc.tcpFrameBuffer.Next(),
}
err := frame.Read(cc.br)
if err != nil {
readerDone <- err
return
}
track, ok := cc.tracks[frame.TrackID]
if !ok {
continue
}
now := time.Now()
atomic.StoreInt64(&lastFrameTime, now.Unix())
track.rtcpReceiver.ProcessFrame(now, frame.StreamType, frame.Payload)
cc.readCB(frame.TrackID, frame.StreamType, frame.Payload)
}
}()
reportTicker := time.NewTicker(cc.c.receiverReportPeriod)
defer reportTicker.Stop()
checkStreamTicker := time.NewTicker(clientConnCheckStreamPeriod)
defer checkStreamTicker.Stop()
for {
select {
case <-cc.backgroundTerminate:
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
for trackID, cct := range cc.tracks {
r := cct.rtcpReceiver.Report(now)
cc.nconn.SetWriteDeadline(time.Now().Add(cc.c.WriteTimeout))
frame := base.InterleavedFrame{
TrackID: trackID,
StreamType: StreamTypeRTCP,
Payload: r,
}
frame.Write(cc.bw)
}
case <-checkStreamTicker.C:
inTimeout := func() bool {
now := time.Now()
lft := time.Unix(atomic.LoadInt64(&lastFrameTime), 0)
return now.Sub(lft) >= cc.c.ReadTimeout
}()
if inTimeout {
cc.nconn.SetReadDeadline(time.Now())
<-readerDone
return liberrors.ErrClientTCPTimeout{}
}
case err := <-readerDone:
return err
}
}
}

View File

@@ -3,6 +3,7 @@ package gortsplib
import (
"net"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -25,6 +26,7 @@ type clientConnUDPListener struct {
running bool
frameBuffer *multibuffer.MultiBuffer
lastFrameTime *int64
writeMutex sync.Mutex
done chan struct{}
}
@@ -90,7 +92,7 @@ func (l *clientConnUDPListener) run() {
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
l.cc.tracks[l.trackID].rtcpReceiver.ProcessFrame(now, l.streamType, buf[:n])
l.cc.readCB(l.trackID, l.streamType, buf[:n])
l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n])
}
} else {
for {
@@ -108,12 +110,15 @@ func (l *clientConnUDPListener) run() {
now := time.Now()
atomic.StoreInt64(l.lastFrameTime, now.Unix())
l.cc.readCB(l.trackID, l.streamType, buf[:n])
l.cc.pullReadCB()(l.trackID, l.streamType, buf[:n])
}
}
}
func (l *clientConnUDPListener) write(buf []byte) error {
l.writeMutex.Lock()
defer l.writeMutex.Unlock()
l.pc.SetWriteDeadline(time.Now().Add(l.cc.c.WriteTimeout))
_, err := l.pc.WriteTo(buf, &net.UDPAddr{
IP: l.remoteIP,

View File

@@ -45,10 +45,6 @@ type Request struct {
// optional body
Body []byte
// whether to wait for a response or not
// used only by ClientConn.Do()
SkipResponse bool
}
// Read reads a request.

View File

@@ -6,6 +6,14 @@ import (
"github.com/aler9/gortsplib/pkg/base"
)
// ErrClientTerminated is an error that can be returned by a client.
type ErrClientTerminated struct{}
// Error implements the error interface.
func (e ErrClientTerminated) Error() string {
return "terminated"
}
// ErrClientWrongState is an error that can be returned by a client.
type ErrClientWrongState struct {
AllowedList []fmt.Stringer