From a53ba70dbc8f6dc2edb4cbd7fb36b7aee0b9736f Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Mon, 4 Jan 2021 21:50:36 +0100 Subject: [PATCH] replace content with payload --- clientconf.go | 65 +--------------------------------- clientconf_test.go | 12 +++---- clientconn.go | 68 ++++++++++++++++++++++++++++++++++-- clientconnpublish.go | 10 +++--- pkg/base/content.go | 6 ++-- pkg/base/interleavedframe.go | 2 +- pkg/base/request.go | 6 ++-- pkg/base/response.go | 6 ++-- pkg/base/response_test.go | 2 +- pkg/headers/transport.go | 2 +- 10 files changed, 89 insertions(+), 90 deletions(-) diff --git a/clientconf.go b/clientconf.go index d3b84a6b..fd0f281d 100644 --- a/clientconf.go +++ b/clientconf.go @@ -1,18 +1,12 @@ package gortsplib import ( - "bufio" "crypto/tls" - "fmt" "net" - "strings" "time" "github.com/aler9/gortsplib/pkg/base" "github.com/aler9/gortsplib/pkg/headers" - "github.com/aler9/gortsplib/pkg/multibuffer" - "github.com/aler9/gortsplib/pkg/rtcpreceiver" - "github.com/aler9/gortsplib/pkg/rtcpsender" ) // DefaultClientConf is the default ClientConf. @@ -80,64 +74,7 @@ type ClientConf struct { // Dial connects to a server. func (c ClientConf) Dial(scheme string, host string) (*ClientConn, error) { - if c.TLSConfig == nil { - c.TLSConfig = &tls.Config{InsecureSkipVerify: true} - } - if c.ReadTimeout == 0 { - c.ReadTimeout = 10 * time.Second - } - if c.WriteTimeout == 0 { - c.WriteTimeout = 10 * time.Second - } - if c.ReadBufferCount == 0 { - c.ReadBufferCount = 1 - } - if c.DialTimeout == nil { - c.DialTimeout = net.DialTimeout - } - if c.ListenPacket == nil { - c.ListenPacket = net.ListenPacket - } - - if scheme != "rtsp" && scheme != "rtsps" { - return nil, fmt.Errorf("unsupported scheme '%s'", scheme) - } - - v := StreamProtocolUDP - if scheme == "rtsps" && c.StreamProtocol == &v { - return nil, fmt.Errorf("RTSPS can't be used with UDP") - } - - if !strings.Contains(host, ":") { - host += ":554" - } - - nconn, err := c.DialTimeout("tcp", host, c.ReadTimeout) - if err != nil { - return nil, err - } - - conn := func() net.Conn { - if scheme == "rtsps" { - return tls.Client(nconn, c.TLSConfig) - } - return nconn - }() - - return &ClientConn{ - conf: c, - nconn: nconn, - isTLS: (scheme == "rtsps"), - br: bufio.NewReaderSize(conn, clientReadBufferSize), - bw: bufio.NewWriterSize(conn, clientWriteBufferSize), - udpRtpListeners: make(map[int]*clientConnUDPListener), - udpRtcpListeners: make(map[int]*clientConnUDPListener), - rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), - udpLastFrameTimes: make(map[int]*int64), - tcpFrameBuffer: multibuffer.New(c.ReadBufferCount, clientTCPFrameReadBufferSize), - rtcpSenders: make(map[int]*rtcpsender.RtcpSender), - publishError: fmt.Errorf("not running"), - }, nil + return newClientConn(c, scheme, host) } // DialRead connects to the address and starts reading all tracks. diff --git a/clientconf_test.go b/clientconf_test.go index c990f1e7..f34c33d9 100644 --- a/clientconf_test.go +++ b/clientconf_test.go @@ -127,7 +127,7 @@ func TestClientDialRead(t *testing.T) { var firstFrame int32 frameRecv := make(chan struct{}) - done := conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -137,7 +137,7 @@ func TestClientDialRead(t *testing.T) { conn.Close() <-done - done = conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done = conn.ReadFrames(func(id int, typ StreamType, payload []byte) { t.Error("should not happen") }) <-done @@ -175,7 +175,7 @@ func TestClientDialReadAutomaticProtocol(t *testing.T) { var firstFrame int32 frameRecv := make(chan struct{}) - done := conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -218,7 +218,7 @@ func TestClientDialReadRedirect(t *testing.T) { var firstFrame int32 frameRecv := make(chan struct{}) - done := conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -271,7 +271,7 @@ func TestClientDialReadPause(t *testing.T) { firstFrame := int32(0) frameRecv := make(chan struct{}) - done := conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done := conn.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } @@ -287,7 +287,7 @@ func TestClientDialReadPause(t *testing.T) { firstFrame = int32(0) frameRecv = make(chan struct{}) - done = conn.ReadFrames(func(id int, typ StreamType, content []byte) { + done = conn.ReadFrames(func(id int, typ StreamType, payload []byte) { if atomic.SwapInt32(&firstFrame, 1) == 0 { close(frameRecv) } diff --git a/clientconn.go b/clientconn.go index 861993ab..0f27af95 100644 --- a/clientconn.go +++ b/clientconn.go @@ -9,6 +9,7 @@ package gortsplib import ( "bufio" + "crypto/tls" "fmt" "math/rand" "net" @@ -98,6 +99,67 @@ type ClientConn struct { backgroundDone chan struct{} } +func newClientConn(conf ClientConf, scheme string, host string) (*ClientConn, error) { + if conf.TLSConfig == nil { + conf.TLSConfig = &tls.Config{InsecureSkipVerify: true} + } + if conf.ReadTimeout == 0 { + conf.ReadTimeout = 10 * time.Second + } + if conf.WriteTimeout == 0 { + conf.WriteTimeout = 10 * time.Second + } + if conf.ReadBufferCount == 0 { + conf.ReadBufferCount = 1 + } + if conf.DialTimeout == nil { + conf.DialTimeout = net.DialTimeout + } + if conf.ListenPacket == nil { + conf.ListenPacket = net.ListenPacket + } + + if scheme != "rtsp" && scheme != "rtsps" { + return nil, fmt.Errorf("unsupported scheme '%s'", scheme) + } + + v := StreamProtocolUDP + if scheme == "rtsps" && conf.StreamProtocol == &v { + return nil, fmt.Errorf("RTSPS can't be used with UDP") + } + + if !strings.Contains(host, ":") { + host += ":554" + } + + nconn, err := conf.DialTimeout("tcp", host, conf.ReadTimeout) + if err != nil { + return nil, err + } + + conn := func() net.Conn { + if scheme == "rtsps" { + return tls.Client(nconn, conf.TLSConfig) + } + return nconn + }() + + return &ClientConn{ + conf: conf, + nconn: nconn, + isTLS: (scheme == "rtsps"), + br: bufio.NewReaderSize(conn, clientReadBufferSize), + bw: bufio.NewWriterSize(conn, clientWriteBufferSize), + udpRtpListeners: make(map[int]*clientConnUDPListener), + udpRtcpListeners: make(map[int]*clientConnUDPListener), + rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), + udpLastFrameTimes: make(map[int]*int64), + tcpFrameBuffer: multibuffer.New(conf.ReadBufferCount, clientTCPFrameReadBufferSize), + rtcpSenders: make(map[int]*rtcpsender.RtcpSender), + publishError: fmt.Errorf("not running"), + }, nil +} + // Close closes all the ClientConn resources. func (c *ClientConn) Close() error { if c.state == clientConnStatePlay || c.state == clientConnStateRecord { @@ -342,12 +404,12 @@ func (c *ClientConn) Describe(u *base.URL) (Tracks, *base.Response, error) { return nil, res, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } - contentType, ok := res.Header["Content-Type"] - if !ok || len(contentType) != 1 { + payloadType, ok := res.Header["Content-Type"] + if !ok || len(payloadType) != 1 { return nil, nil, fmt.Errorf("Content-Type not provided") } - if contentType[0] != "application/sdp" { + if payloadType[0] != "application/sdp" { return nil, nil, fmt.Errorf("wrong Content-Type, expected application/sdp") } diff --git a/clientconnpublish.go b/clientconnpublish.go index a6e0f3e6..f9150f92 100644 --- a/clientconnpublish.go +++ b/clientconnpublish.go @@ -180,7 +180,7 @@ func (c *ClientConn) backgroundRecordTCP() { // WriteFrame writes a frame. // This can be called only after Record(). -func (c *ClientConn) WriteFrame(trackID int, streamType StreamType, content []byte) error { +func (c *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []byte) error { c.publishWriteMutex.RLock() defer c.publishWriteMutex.RUnlock() @@ -190,20 +190,20 @@ func (c *ClientConn) WriteFrame(trackID int, streamType StreamType, content []by now := time.Now() - c.rtcpSenders[trackID].ProcessFrame(now, streamType, content) + c.rtcpSenders[trackID].ProcessFrame(now, streamType, payload) if *c.streamProtocol == StreamProtocolUDP { if streamType == StreamTypeRtp { - return c.udpRtpListeners[trackID].write(content) + return c.udpRtpListeners[trackID].write(payload) } - return c.udpRtcpListeners[trackID].write(content) + return c.udpRtcpListeners[trackID].write(payload) } c.nconn.SetWriteDeadline(now.Add(c.conf.WriteTimeout)) frame := base.InterleavedFrame{ TrackID: trackID, StreamType: streamType, - Content: content, + Content: payload, } return frame.Write(c.bw) } diff --git a/pkg/base/content.go b/pkg/base/content.go index da6d8878..dd17fa4d 100644 --- a/pkg/base/content.go +++ b/pkg/base/content.go @@ -7,9 +7,9 @@ import ( "strconv" ) -type content []byte +type payload []byte -func (c *content) read(rb *bufio.Reader, header Header) error { +func (c *payload) read(rb *bufio.Reader, header Header) error { cls, ok := header["Content-Length"] if !ok || len(cls) != 1 { *c = nil @@ -34,7 +34,7 @@ func (c *content) read(rb *bufio.Reader, header Header) error { return nil } -func (c content) write(bw *bufio.Writer) error { +func (c payload) write(bw *bufio.Writer) error { if len(c) == 0 { return nil } diff --git a/pkg/base/interleavedframe.go b/pkg/base/interleavedframe.go index 21b00baf..289272be 100644 --- a/pkg/base/interleavedframe.go +++ b/pkg/base/interleavedframe.go @@ -66,7 +66,7 @@ type InterleavedFrame struct { // stream type StreamType StreamType - // frame content + // frame payload Content []byte } diff --git a/pkg/base/request.go b/pkg/base/request.go index ece18457..7b7cb335 100644 --- a/pkg/base/request.go +++ b/pkg/base/request.go @@ -43,7 +43,7 @@ type Request struct { // map of header values Header Header - // optional content + // optional payload Content []byte // whether to wait for a response or not @@ -99,7 +99,7 @@ func (req *Request) Read(rb *bufio.Reader) error { return err } - err = (*content)(&req.Content).read(rb, req.Header) + err = (*payload)(&req.Content).read(rb, req.Header) if err != nil { return err } @@ -124,7 +124,7 @@ func (req Request) Write(bw *bufio.Writer) error { return err } - err = content(req.Content).write(bw) + err = payload(req.Content).write(bw) if err != nil { return err } diff --git a/pkg/base/response.go b/pkg/base/response.go index 3dbcd0a3..78a3914a 100644 --- a/pkg/base/response.go +++ b/pkg/base/response.go @@ -129,7 +129,7 @@ type Response struct { // map of header values Header Header - // optional content + // optional payload Content []byte } @@ -177,7 +177,7 @@ func (res *Response) Read(rb *bufio.Reader) error { return err } - err = (*content)(&res.Content).read(rb, res.Header) + err = (*payload)(&res.Content).read(rb, res.Header) if err != nil { return err } @@ -207,7 +207,7 @@ func (res Response) Write(bw *bufio.Writer) error { return err } - err = content(res.Content).write(bw) + err = payload(res.Content).write(bw) if err != nil { return err } diff --git a/pkg/base/response_test.go b/pkg/base/response_test.go index c165af2b..5e66d462 100644 --- a/pkg/base/response_test.go +++ b/pkg/base/response_test.go @@ -54,7 +54,7 @@ var casesResponse = []struct { }, }, { - "ok with content", + "ok with payload", []byte("RTSP/1.0 200 OK\r\n" + "CSeq: 2\r\n" + "Content-Base: rtsp://example.com/media.mp4\r\n" + diff --git a/pkg/headers/transport.go b/pkg/headers/transport.go index 52186d1b..e6846197 100644 --- a/pkg/headers/transport.go +++ b/pkg/headers/transport.go @@ -190,7 +190,7 @@ func ReadTransport(v base.HeaderValue) (*Transport, error) { ht.Mode = &v default: - return nil, fmt.Errorf("unrecognized transport mode: '%s'", str) + return nil, fmt.Errorf("invalid transport mode: '%s'", str) } }