diff --git a/Makefile b/Makefile index 276c4a17..bed83508 100644 --- a/Makefile +++ b/Makefile @@ -49,5 +49,5 @@ test: test-nodocker: $(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \ docker build -q testimages/$(IMG) -t gortsplib-test-$(IMG)$(NL)) - go test -race -v . + go test -race -v ./... $(foreach f,$(shell ls examples/*),go build -o /dev/null $(f)$(NL)) diff --git a/auth_test.go b/auth_test.go index 70780dc7..0fe9a49a 100644 --- a/auth_test.go +++ b/auth_test.go @@ -5,6 +5,8 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/base" ) var casesAuth = []struct { @@ -33,10 +35,10 @@ func TestAuthMethods(t *testing.T) { ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass") require.NoError(t, err) - authorization := ac.GenerateHeader(ANNOUNCE, + authorization := ac.GenerateHeader(base.ANNOUNCE, &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"}) - err = authServer.ValidateHeader(authorization, ANNOUNCE, + err = authServer.ValidateHeader(authorization, base.ANNOUNCE, &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"}) require.NoError(t, err) }) @@ -49,10 +51,10 @@ func TestAuthBasePath(t *testing.T) { ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass") require.NoError(t, err) - authorization := ac.GenerateHeader(ANNOUNCE, + authorization := ac.GenerateHeader(base.ANNOUNCE, &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/"}) - err = authServer.ValidateHeader(authorization, ANNOUNCE, + err = authServer.ValidateHeader(authorization, base.ANNOUNCE, &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/trackId=0"}) require.NoError(t, err) } diff --git a/authclient.go b/authclient.go index 86ee490e..d58847ca 100644 --- a/authclient.go +++ b/authclient.go @@ -5,6 +5,8 @@ import ( "fmt" "net/url" "strings" + + "github.com/aler9/gortsplib/base" ) // authClient is an object that helps a client to send its credentials to a @@ -19,7 +21,7 @@ type authClient struct { // newAuthClient allocates an authClient. // header is the WWW-Authenticate header provided by the server. -func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error) { +func newAuthClient(v base.HeaderValue, user string, pass string) (*authClient, error) { // prefer digest if headerAuthDigest := func() string { for _, vi := range v { @@ -29,7 +31,7 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error) } return "" }(); headerAuthDigest != "" { - auth, err := ReadHeaderAuth(HeaderValue{headerAuthDigest}) + auth, err := ReadHeaderAuth(base.HeaderValue{headerAuthDigest}) if err != nil { return nil, err } @@ -59,7 +61,7 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error) } return "" }(); headerAuthBasic != "" { - auth, err := ReadHeaderAuth(HeaderValue{headerAuthBasic}) + auth, err := ReadHeaderAuth(base.HeaderValue{headerAuthBasic}) if err != nil { return nil, err } @@ -81,12 +83,12 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error) // GenerateHeader generates an Authorization Header that allows to authenticate a request with // the given method and url. -func (ac *authClient) GenerateHeader(method Method, ur *url.URL) HeaderValue { +func (ac *authClient) GenerateHeader(method base.Method, ur *url.URL) base.HeaderValue { switch ac.method { case Basic: response := base64.StdEncoding.EncodeToString([]byte(ac.user + ":" + ac.pass)) - return HeaderValue{"Basic " + response} + return base.HeaderValue{"Basic " + response} case Digest: response := md5Hex(md5Hex(ac.user+":"+ac.realm+":"+ac.pass) + ":" + diff --git a/authserver.go b/authserver.go index 98e27922..2b347693 100644 --- a/authserver.go +++ b/authserver.go @@ -7,6 +7,8 @@ import ( "fmt" "net/url" "strings" + + "github.com/aler9/gortsplib/base" ) // AuthServer is an object that helps a server to validate the credentials of @@ -40,8 +42,8 @@ func NewAuthServer(user string, pass string, methods []AuthMethod) *AuthServer { } // GenerateHeader generates the WWW-Authenticate header needed by a client to log in. -func (as *AuthServer) GenerateHeader() HeaderValue { - var ret HeaderValue +func (as *AuthServer) GenerateHeader() base.HeaderValue { + var ret base.HeaderValue for _, m := range as.methods { switch m { case Basic: @@ -63,7 +65,7 @@ func (as *AuthServer) GenerateHeader() HeaderValue { // ValidateHeader validates the Authorization header sent by a client after receiving the // WWW-Authenticate header. -func (as *AuthServer) ValidateHeader(v HeaderValue, method Method, ur *url.URL) error { +func (as *AuthServer) ValidateHeader(v base.HeaderValue, method base.Method, ur *url.URL) error { if len(v) == 0 { return fmt.Errorf("authorization header not provided") } @@ -83,7 +85,7 @@ func (as *AuthServer) ValidateHeader(v HeaderValue, method Method, ur *url.URL) } } else if strings.HasPrefix(v0, "Digest ") { - auth, err := ReadHeaderAuth(HeaderValue{v0}) + auth, err := ReadHeaderAuth(base.HeaderValue{v0}) if err != nil { return err } diff --git a/base/defs.go b/base/defs.go new file mode 100644 index 00000000..b5ad13af --- /dev/null +++ b/base/defs.go @@ -0,0 +1,24 @@ +package base + +// StreamType is the stream type. +type StreamType int + +const ( + // StreamTypeRtp means that the stream contains RTP packets + StreamTypeRtp StreamType = iota + + // StreamTypeRtcp means that the stream contains RTCP packets + StreamTypeRtcp +) + +// String implements fmt.Stringer +func (st StreamType) String() string { + switch st { + case StreamTypeRtp: + return "RTP" + + case StreamTypeRtcp: + return "RTCP" + } + return "unknown" +} diff --git a/header.go b/base/header.go similarity index 99% rename from header.go rename to base/header.go index 945ec9a5..2fd13ca5 100644 --- a/header.go +++ b/base/header.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" diff --git a/header_test.go b/base/header_test.go similarity index 99% rename from header_test.go rename to base/header_test.go index 5467c9a6..5c02cdb1 100644 --- a/header_test.go +++ b/base/header_test.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" diff --git a/interleavedframe.go b/base/interleavedframe.go similarity index 62% rename from interleavedframe.go rename to base/interleavedframe.go index d91f12ba..9f554cb2 100644 --- a/interleavedframe.go +++ b/base/interleavedframe.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" @@ -11,7 +11,45 @@ const ( interleavedFrameMagicByte = 0x24 ) -// InterleavedFrame is a structure that allows to transfer binary data +// ReadInterleavedFrameOrResponse reads an InterleavedFrame or a Response. +func ReadInterleavedFrameOrResponse(frame *InterleavedFrame, br *bufio.Reader) (interface{}, error) { + b, err := br.ReadByte() + if err != nil { + return nil, err + } + br.UnreadByte() + + if b == interleavedFrameMagicByte { + err := frame.Read(br) + if err != nil { + return nil, err + } + return frame, err + } + + return ReadResponse(br) +} + +// ReadInterleavedFrameOrRequest reads an InterleavedFrame or a Response. +func ReadInterleavedFrameOrRequest(frame *InterleavedFrame, br *bufio.Reader) (interface{}, error) { + b, err := br.ReadByte() + if err != nil { + return nil, err + } + br.UnreadByte() + + if b == interleavedFrameMagicByte { + err := frame.Read(br) + if err != nil { + return nil, err + } + return frame, err + } + + return ReadRequest(br) +} + +// InterleavedFrame is an interleaved frame, and allows to transfer binary data // within RTSP/TCP connections. It is used to send and receive RTP and RTCP packets with TCP. type InterleavedFrame struct { // track id @@ -24,7 +62,7 @@ type InterleavedFrame struct { Content []byte } -// Read reads an interleaved frame from a buffered reader. +// Read reads an interleaved frame. func (f *InterleavedFrame) Read(br *bufio.Reader) error { var header [4]byte _, err := io.ReadFull(br, header[:]) @@ -61,7 +99,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error { } // Write writes an InterleavedFrame into a buffered writer. -func (f *InterleavedFrame) Write(bw *bufio.Writer) error { +func (f InterleavedFrame) Write(bw *bufio.Writer) error { // convert TrackId and StreamType into channel channel := func() uint8 { if f.StreamType == StreamTypeRtp { diff --git a/request.go b/base/request.go similarity index 93% rename from request.go rename to base/request.go index 9be3a9b4..72e4028d 100644 --- a/request.go +++ b/base/request.go @@ -1,4 +1,5 @@ -package gortsplib +// Package base contains the base elements of the RTSP protocol. +package base import ( "bufio" @@ -52,7 +53,7 @@ type Request struct { SkipResponse bool } -// ReadRequest reads a request from a buffered reader. +// ReadRequest reads a request. func ReadRequest(rb *bufio.Reader) (*Request, error) { req := &Request{} @@ -114,8 +115,8 @@ func ReadRequest(rb *bufio.Reader) (*Request, error) { return req, nil } -// Write writes a request into a buffered writer. -func (req *Request) Write(bw *bufio.Writer) error { +// Write writes a request. +func (req Request) Write(bw *bufio.Writer) error { // remove credentials u := &url.URL{ Scheme: req.Url.Scheme, diff --git a/request_test.go b/base/request_test.go similarity index 99% rename from request_test.go rename to base/request_test.go index 67fdd025..b8225e9c 100644 --- a/request_test.go +++ b/base/request_test.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" diff --git a/response.go b/base/response.go similarity index 97% rename from response.go rename to base/response.go index e7061268..8b877631 100644 --- a/response.go +++ b/base/response.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" @@ -132,7 +132,7 @@ type Response struct { Content []byte } -// ReadResponse reads a response from a buffered reader. +// ReadResponse reads a response. func ReadResponse(rb *bufio.Reader) (*Response, error) { res := &Response{} @@ -186,8 +186,8 @@ func ReadResponse(rb *bufio.Reader) (*Response, error) { return res, nil } -// Write writes a Response into a buffered writer. -func (res *Response) Write(bw *bufio.Writer) error { +// Write writes a Response. +func (res Response) Write(bw *bufio.Writer) error { if res.StatusMessage == "" { if status, ok := statusMessages[res.StatusCode]; ok { res.StatusMessage = status diff --git a/response_test.go b/base/response_test.go similarity index 99% rename from response_test.go rename to base/response_test.go index 81a3c9e6..295f7003 100644 --- a/response_test.go +++ b/base/response_test.go @@ -1,4 +1,4 @@ -package gortsplib +package base import ( "bufio" diff --git a/base/utils.go b/base/utils.go new file mode 100644 index 00000000..eae7e4b4 --- /dev/null +++ b/base/utils.go @@ -0,0 +1,77 @@ +package base + +import ( + "bufio" + "fmt" + "io" + "strconv" +) + +const ( + rtspMaxContentLength = 4096 +) + +func readByteEqual(rb *bufio.Reader, cmp byte) error { + byt, err := rb.ReadByte() + if err != nil { + return err + } + + if byt != cmp { + return fmt.Errorf("expected '%c', got '%c'", cmp, byt) + } + + return nil +} + +func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) { + for i := 1; i <= n; i++ { + byts, err := rb.Peek(i) + if err != nil { + return nil, err + } + + if byts[len(byts)-1] == delim { + rb.Discard(len(byts)) + return byts, nil + } + } + return nil, fmt.Errorf("buffer length exceeds %d", n) +} + +func readContent(rb *bufio.Reader, header Header) ([]byte, error) { + cls, ok := header["Content-Length"] + if !ok || len(cls) != 1 { + return nil, nil + } + + cl, err := strconv.ParseInt(cls[0], 10, 64) + if err != nil { + return nil, fmt.Errorf("invalid Content-Length") + } + + if cl > rtspMaxContentLength { + return nil, fmt.Errorf("Content-Length exceeds %d", rtspMaxContentLength) + } + + ret := make([]byte, cl) + n, err := io.ReadFull(rb, ret) + if err != nil && n != len(ret) { + return nil, err + } + + return ret, nil +} + +func writeContent(bw *bufio.Writer, content []byte) error { + if len(content) == 0 { + return nil + } + + _, err := bw.Write(content) + if err != nil { + return err + } + + return nil +} diff --git a/connclient.go b/connclient.go index 639441e0..502d638a 100644 --- a/connclient.go +++ b/connclient.go @@ -17,6 +17,8 @@ import ( "strings" "sync/atomic" "time" + + "github.com/aler9/gortsplib/base" ) const ( @@ -119,14 +121,15 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { udpLastFrameTimes: make(map[int]*int64), udpRtpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener), + tcpFrames: newMultiFrame(conf.ReadBufferCount, clientTCPFrameReadBufferSize), }, nil } // Close closes all the ConnClient resources. func (c *ConnClient) Close() error { if c.state == connClientStateReading { - c.Do(&Request{ - Method: TEARDOWN, + c.Do(&base.Request{ + Method: base.TEARDOWN, Url: c.streamUrl, SkipResponse: true, }) @@ -167,89 +170,82 @@ func (c *ConnClient) NetConn() net.Conn { } func (c *ConnClient) readFrameTCPOrResponse() (interface{}, error) { + frame := c.tcpFrames.next() + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - b, err := c.br.ReadByte() - if err != nil { - return nil, err - } - c.br.UnreadByte() - - if b == interleavedFrameMagicByte { - frame := c.tcpFrames.next() - err := frame.Read(c.br) - if err != nil { - return nil, err - } - return frame, err - } - - return ReadResponse(c.br) + return base.ReadInterleavedFrameOrResponse(frame, c.br) } // ReadFrameTCP reads an InterleavedFrame. // This can't be used when recording. -func (c *ConnClient) ReadFrameTCP() (*InterleavedFrame, error) { - c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) +func (c *ConnClient) ReadFrameTCP() (int, StreamType, []byte, error) { frame := c.tcpFrames.next() + + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) err := frame.Read(c.br) if err != nil { - return nil, err + return 0, 0, nil, err } c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) - return frame, nil + return frame.TrackId, frame.StreamType, frame.Content, nil } // ReadFrameUDP reads an UDP frame. -func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) { +func (c *ConnClient) ReadFrameUDP(trackId int, streamType StreamType) ([]byte, error) { var buf []byte var err error if streamType == StreamTypeRtp { - buf, err = c.udpRtpListeners[track.Id].read() + buf, err = c.udpRtpListeners[trackId].read() if err != nil { return nil, err } } else { - buf, err = c.udpRtcpListeners[track.Id].read() + buf, err = c.udpRtcpListeners[trackId].read() if err != nil { return nil, err } } - atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) + atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix()) - c.rtcpReceivers[track.Id].OnFrame(streamType, buf) + c.rtcpReceivers[trackId].OnFrame(streamType, buf) return buf, nil } // WriteFrameTCP writes an interleaved frame. // this can't be used when playing. -func (c *ConnClient) WriteFrameTCP(frame *InterleavedFrame) error { +func (c *ConnClient) WriteFrameTCP(trackId int, streamType StreamType, content []byte) error { + frame := base.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: content, + } + c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) return frame.Write(c.bw) } // WriteFrameUDP writes an UDP frame. -func (c *ConnClient) WriteFrameUDP(track *Track, streamType StreamType, content []byte) error { +func (c *ConnClient) WriteFrameUDP(trackId int, streamType StreamType, content []byte) error { if streamType == StreamTypeRtp { - return c.udpRtpListeners[track.Id].write(content) + return c.udpRtpListeners[trackId].write(content) } - - return c.udpRtcpListeners[track.Id].write(content) + return c.udpRtcpListeners[trackId].write(content) } // Do writes a Request and reads a Response. Interleaved frames sent before the // response are ignored. -func (c *ConnClient) Do(req *Request) (*Response, error) { +func (c *ConnClient) Do(req *base.Request) (*base.Response, error) { if req.Header == nil { - req.Header = make(Header) + req.Header = make(base.Header) } // insert session if c.session != "" { - req.Header["Session"] = HeaderValue{c.session} + req.Header["Session"] = base.HeaderValue{c.session} } // insert auth @@ -266,7 +262,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { // insert cseq c.cseq += 1 - req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} + req.Header["CSeq"] = base.HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) err := req.Write(c.bw) @@ -282,14 +278,14 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { // interleaved frames are sent in two situations: // * when the server is v4lrtspserver, before the PLAY response // * when the stream is already playing - res, err := func() (*Response, error) { + res, err := func() (*base.Response, error) { for { recv, err := c.readFrameTCPOrResponse() if err != nil { return nil, err } - if res, ok := recv.(*Response); ok { + if res, ok := recv.(*base.Response); ok { return res, nil } } @@ -308,7 +304,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { } // setup authentication - if res.StatusCode == StatusUnauthorized && req.Url.User != nil && c.auth == nil { + if res.StatusCode == base.StatusUnauthorized && req.Url.User != nil && c.auth == nil { pass, _ := req.Url.User.Password() auth, err := newAuthClient(res.Header["WWW-Authenticate"], req.Url.User.Username(), pass) if err != nil { @@ -326,13 +322,13 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { // Options writes an OPTIONS request and reads a response, that contains // the methods allowed by the server. Since this method is not implemented by // every RTSP server, the function does not fail if the returned code is StatusNotFound. -func (c *ConnClient) Options(u *url.URL) (*Response, error) { +func (c *ConnClient) Options(u *url.URL) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } - res, err := c.Do(&Request{ - Method: OPTIONS, + res, err := c.Do(&base.Request{ + Method: base.OPTIONS, // strip path Url: &url.URL{ Scheme: "rtsp", @@ -345,7 +341,7 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) { return nil, err } - if res.StatusCode != StatusOK && res.StatusCode != StatusNotFound { + if res.StatusCode != base.StatusOK && res.StatusCode != base.StatusNotFound { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } @@ -353,23 +349,23 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) { } // Describe writes a DESCRIBE request and reads a Response. -func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) { +func (c *ConnClient) Describe(u *url.URL) (Tracks, *base.Response, error) { if c.state != connClientStateInitial { return nil, nil, fmt.Errorf("can't be called when reading or publishing") } - res, err := c.Do(&Request{ - Method: DESCRIBE, + res, err := c.Do(&base.Request{ + Method: base.DESCRIBE, Url: u, - Header: Header{ - "Accept": HeaderValue{"application/sdp"}, + Header: base.Header{ + "Accept": base.HeaderValue{"application/sdp"}, }, }) if err != nil { return nil, nil, err } - if res.StatusCode != StatusOK { + if res.StatusCode != base.StatusOK { return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } @@ -451,11 +447,11 @@ func (c *ConnClient) urlForTrack(baseUrl *url.URL, mode SetupMode, track *Track) return u } -func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderTransport) (*Response, error) { - res, err := c.Do(&Request{ - Method: SETUP, +func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderTransport) (*base.Response, error) { + res, err := c.Do(&base.Request{ + Method: base.SETUP, Url: c.urlForTrack(u, mode, track), - Header: Header{ + Header: base.Header{ "Transport": ht.Write(), }, }) @@ -463,7 +459,7 @@ func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderT return nil, err } - if res.StatusCode != StatusOK { + if res.StatusCode != base.StatusOK { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } @@ -473,7 +469,7 @@ func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderT // SetupUDP writes a SETUP request and reads a Response. // If rtpPort and rtcpPort are zero, they are be chosen automatically. func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort int, - rtcpPort int) (*Response, error) { + rtcpPort int) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } @@ -597,7 +593,7 @@ func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort } // SetupTCP writes a SETUP request and reads a Response. -func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Response, error) { +func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } @@ -657,7 +653,7 @@ func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Respon // Play writes a PLAY request and reads a Response // This function can be called only after SetupUDP() or SetupTCP(). -func (c *ConnClient) Play(u *url.URL) (*Response, error) { +func (c *ConnClient) Play(u *url.URL) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } @@ -670,19 +666,15 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { fmt.Errorf("must be called with the same url used for SetupUDP() or SetupTCP()") } - if *c.streamProtocol == StreamProtocolTCP { - c.tcpFrames = newMultiFrame(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize) - } - - res, err := c.Do(&Request{ - Method: PLAY, + res, err := c.Do(&base.Request{ + Method: base.PLAY, Url: u, }) if err != nil { return nil, err } - if res.StatusCode != StatusOK { + if res.StatusCode != base.StatusOK { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } @@ -720,11 +712,7 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { c.udpRtcpListeners[trackId].write(frame) } else { - c.WriteFrameTCP(&InterleavedFrame{ - TrackId: trackId, - StreamType: StreamTypeRtcp, - Content: frame, - }) + c.WriteFrameTCP(trackId, StreamTypeRtcp, frame) } } } @@ -750,7 +738,7 @@ func (c *ConnClient) LoopUDP() error { go func() { for { c.nconn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout)) - _, err := ReadResponse(c.br) + _, err := base.ReadResponse(c.br) if err != nil { readDone <- err return @@ -771,8 +759,8 @@ func (c *ConnClient) LoopUDP() error { return err case <-keepaliveTicker.C: - _, err := c.Do(&Request{ - Method: OPTIONS, + _, err := c.Do(&base.Request{ + Method: base.OPTIONS, Url: &url.URL{ Scheme: "rtsp", Host: c.streamUrl.Host, @@ -804,16 +792,16 @@ func (c *ConnClient) LoopUDP() error { } // Announce writes an ANNOUNCE request and reads a Response. -func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) { +func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*base.Response, error) { if c.streamUrl != nil { fmt.Errorf("announce has already been sent with another url url") } - res, err := c.Do(&Request{ - Method: ANNOUNCE, + res, err := c.Do(&base.Request{ + Method: base.ANNOUNCE, Url: u, - Header: Header{ - "Content-Type": HeaderValue{"application/sdp"}, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, }, Content: tracks.Write(), }) @@ -821,7 +809,7 @@ func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) { return nil, err } - if res.StatusCode != StatusOK { + if res.StatusCode != base.StatusOK { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } @@ -831,7 +819,7 @@ func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) { } // Record writes a RECORD request and reads a Response. -func (c *ConnClient) Record(u *url.URL) (*Response, error) { +func (c *ConnClient) Record(u *url.URL) (*base.Response, error) { if c.state != connClientStateInitial { return nil, fmt.Errorf("can't be called when reading or publishing") } @@ -840,15 +828,15 @@ func (c *ConnClient) Record(u *url.URL) (*Response, error) { return nil, fmt.Errorf("must be called with the same url used for Announce()") } - res, err := c.Do(&Request{ - Method: RECORD, + res, err := c.Do(&base.Request{ + Method: base.RECORD, Url: u, }) if err != nil { return nil, err } - if res.StatusCode != StatusOK { + if res.StatusCode != base.StatusOK { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } diff --git a/connclientdial_test.go b/connclientdial_test.go index 4d378273..c7625d4a 100644 --- a/connclientdial_test.go +++ b/connclientdial_test.go @@ -119,7 +119,7 @@ func TestConnClientDialReadUDP(t *testing.T) { time.Sleep(1 * time.Second) - conn, tracks, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP) + conn, _, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP) require.NoError(t, err) defer conn.Close() @@ -131,7 +131,7 @@ func TestConnClientDialReadUDP(t *testing.T) { conn.LoopUDP() }() - _, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp) + _, err = conn.ReadFrameUDP(0, StreamTypeRtp) require.NoError(t, err) conn.CloseUDPListeners() @@ -162,8 +162,11 @@ func TestConnClientDialReadTCP(t *testing.T) { require.NoError(t, err) defer conn.Close() - _, err = conn.ReadFrameTCP() + id, typ, _, err := conn.ReadFrameTCP() require.NoError(t, err) + + require.Equal(t, 0, id) + require.Equal(t, StreamTypeRtp, typ) } func TestConnClientDialPublishUDP(t *testing.T) { @@ -212,7 +215,7 @@ func TestConnClientDialPublishUDP(t *testing.T) { break } - err = conn.WriteFrameUDP(track, StreamTypeRtp, buf[:n]) + err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n]) if err != nil { break } @@ -281,11 +284,7 @@ func TestConnClientDialPublishTCP(t *testing.T) { break } - err = conn.WriteFrameTCP(&InterleavedFrame{ - TrackId: track.Id, - StreamType: StreamTypeRtp, - Content: buf[:n], - }) + err = conn.WriteFrameTCP(track.Id, StreamTypeRtp, buf[:n]) if err != nil { break } diff --git a/connserver.go b/connserver.go index e77e3c93..cb844d86 100644 --- a/connserver.go +++ b/connserver.go @@ -4,6 +4,8 @@ import ( "bufio" "net" "time" + + "github.com/aler9/gortsplib/base" ) const ( @@ -70,9 +72,9 @@ func (s *ConnServer) NetConn() net.Conn { } // ReadRequest reads a Request. -func (s *ConnServer) ReadRequest() (*Request, error) { +func (s *ConnServer) ReadRequest() (*base.Request, error) { s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline - return ReadRequest(s.br) + return base.ReadRequest(s.br) } // ReadFrameTCPOrRequest reads an InterleavedFrame or a Request. @@ -81,32 +83,24 @@ func (s *ConnServer) ReadFrameTCPOrRequest(timeout bool) (interface{}, error) { s.conf.Conn.SetReadDeadline(time.Now().Add(s.conf.ReadTimeout)) } - b, err := s.br.ReadByte() - if err != nil { - return nil, err - } - s.br.UnreadByte() - - if b == interleavedFrameMagicByte { - frame := s.tcpFrames.next() - err := frame.Read(s.br) - if err != nil { - return nil, err - } - return frame, err - } - - return ReadRequest(s.br) + frame := s.tcpFrames.next() + return base.ReadInterleavedFrameOrRequest(frame, s.br) } // WriteResponse writes a Response. -func (s *ConnServer) WriteResponse(res *Response) error { +func (s *ConnServer) WriteResponse(res *base.Response) error { s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) return res.Write(s.bw) } // WriteFrameTCP writes an InterleavedFrame. -func (s *ConnServer) WriteFrameTCP(frame *InterleavedFrame) error { +func (s *ConnServer) WriteFrameTCP(trackId int, streamType StreamType, content []byte) error { + frame := base.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: content, + } + s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) return frame.Write(s.bw) } diff --git a/examples/client-publish-tcp.go b/examples/client-publish-tcp.go index e9afc8bd..f9429939 100644 --- a/examples/client-publish-tcp.go +++ b/examples/client-publish-tcp.go @@ -96,11 +96,7 @@ func main() { } // write frames to the server - err = conn.WriteFrameTCP(&gortsplib.InterleavedFrame{ - TrackId: track.Id, - StreamType: gortsplib.StreamTypeRtp, - Content: buf[:n], - }) + err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) if err != nil { break } diff --git a/examples/client-publish-udp.go b/examples/client-publish-udp.go index 98097eb0..01c6c4e9 100644 --- a/examples/client-publish-udp.go +++ b/examples/client-publish-udp.go @@ -96,7 +96,7 @@ func main() { } // write frames to the server - err = conn.WriteFrameUDP(track, gortsplib.StreamTypeRtp, buf[:n]) + err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n]) if err != nil { break } diff --git a/examples/client-read-tcp.go b/examples/client-read-tcp.go index 3d084f38..14899213 100644 --- a/examples/client-read-tcp.go +++ b/examples/client-read-tcp.go @@ -21,13 +21,13 @@ func main() { for { // read frames - frame, err := conn.ReadFrameTCP() + id, typ, buf, err := conn.ReadFrameTCP() if err != nil { fmt.Println("connection is closed (%s)", err) break } fmt.Printf("frame from track %d, type %v: %v\n", - frame.TrackId, frame.StreamType, frame.Content) + id, typ, buf) } } diff --git a/examples/client-read-udp.go b/examples/client-read-udp.go index 6201f1e4..eb3c797b 100644 --- a/examples/client-read-udp.go +++ b/examples/client-read-udp.go @@ -24,36 +24,36 @@ func main() { defer wg.Wait() defer conn.CloseUDPListeners() - for _, track := range tracks { + for trackId := range tracks { // read RTP frames wg.Add(1) - go func(track *gortsplib.Track) { + go func(trackId int) { defer wg.Done() for { - buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtp) + buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtp) if err != nil { break } - fmt.Printf("frame from track %d, type RTP: %v\n", track.Id, buf) + fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf) } - }(track) + }(trackId) // read RTCP frames wg.Add(1) - go func(track *gortsplib.Track) { + go func(trackId int) { defer wg.Done() for { - buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtcp) + buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtcp) if err != nil { break } - fmt.Printf("frame from track %d, type RTCP: %v\n", track.Id, buf) + fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf) } - }(track) + }(trackId) } err = conn.LoopUDP() diff --git a/headerauth.go b/headerauth.go index 4e9895e6..70635df0 100644 --- a/headerauth.go +++ b/headerauth.go @@ -3,6 +3,8 @@ package gortsplib import ( "fmt" "strings" + + "github.com/aler9/gortsplib/base" ) // HeaderAuth is an Authenticate or a WWWW-Authenticate header. @@ -66,7 +68,7 @@ func findValue(v0 string) (string, string, error) { } // ReadHeaderAuth parses an Authenticate or a WWW-Authenticate header. -func ReadHeaderAuth(v HeaderValue) (*HeaderAuth, error) { +func ReadHeaderAuth(v base.HeaderValue) (*HeaderAuth, error) { if len(v) == 0 { return nil, fmt.Errorf("value not provided") } @@ -154,7 +156,7 @@ func ReadHeaderAuth(v HeaderValue) (*HeaderAuth, error) { } // Write encodes an Authenticate or a WWW-Authenticate header. -func (ha *HeaderAuth) Write() HeaderValue { +func (ha *HeaderAuth) Write() base.HeaderValue { ret := "" switch ha.Method { @@ -203,5 +205,5 @@ func (ha *HeaderAuth) Write() HeaderValue { ret += strings.Join(vals, ", ") - return HeaderValue{ret} + return base.HeaderValue{ret} } diff --git a/headerauth_test.go b/headerauth_test.go index c3e26b3c..67e2b340 100644 --- a/headerauth_test.go +++ b/headerauth_test.go @@ -4,18 +4,20 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/base" ) var casesHeaderAuth = []struct { name string - vin HeaderValue - vout HeaderValue + vin base.HeaderValue + vout base.HeaderValue h *HeaderAuth }{ { "basic", - HeaderValue{`Basic realm="4419b63f5e51"`}, - HeaderValue{`Basic realm="4419b63f5e51"`}, + base.HeaderValue{`Basic realm="4419b63f5e51"`}, + base.HeaderValue{`Basic realm="4419b63f5e51"`}, &HeaderAuth{ Method: Basic, Realm: func() *string { @@ -26,8 +28,8 @@ var casesHeaderAuth = []struct { }, { "digest request 1", - HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, - HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, + base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, + base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, &HeaderAuth{ Method: Digest, Realm: func() *string { @@ -46,8 +48,8 @@ var casesHeaderAuth = []struct { }, { "digest request 2", - HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale=FALSE`}, - HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, + base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale=FALSE`}, + base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, &HeaderAuth{ Method: Digest, Realm: func() *string { @@ -66,8 +68,8 @@ var casesHeaderAuth = []struct { }, { "digest request 3", - HeaderValue{`Digest realm="4419b63f5e51",nonce="133767111917411116111311118211673010032", stale="FALSE"`}, - HeaderValue{`Digest realm="4419b63f5e51", nonce="133767111917411116111311118211673010032", stale="FALSE"`}, + base.HeaderValue{`Digest realm="4419b63f5e51",nonce="133767111917411116111311118211673010032", stale="FALSE"`}, + base.HeaderValue{`Digest realm="4419b63f5e51", nonce="133767111917411116111311118211673010032", stale="FALSE"`}, &HeaderAuth{ Method: Digest, Realm: func() *string { @@ -86,8 +88,8 @@ var casesHeaderAuth = []struct { }, { "digest response generic", - HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, - HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, + base.HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, + base.HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, &HeaderAuth{ Method: Digest, Username: func() *string { @@ -114,8 +116,8 @@ var casesHeaderAuth = []struct { }, { "digest response with empty field", - HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, - HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, + base.HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, + base.HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, &HeaderAuth{ Method: Digest, Username: func() *string { @@ -142,8 +144,8 @@ var casesHeaderAuth = []struct { }, { "digest response with no spaces and additional fields", - HeaderValue{`Digest realm="Please log in with a valid username",nonce="752a62306daf32b401a41004555c7663",opaque="",stale=FALSE,algorithm=MD5`}, - HeaderValue{`Digest realm="Please log in with a valid username", nonce="752a62306daf32b401a41004555c7663", opaque="", stale="FALSE", algorithm="MD5"`}, + base.HeaderValue{`Digest realm="Please log in with a valid username",nonce="752a62306daf32b401a41004555c7663",opaque="",stale=FALSE,algorithm=MD5`}, + base.HeaderValue{`Digest realm="Please log in with a valid username", nonce="752a62306daf32b401a41004555c7663", opaque="", stale="FALSE", algorithm="MD5"`}, &HeaderAuth{ Method: Digest, Realm: func() *string { diff --git a/headersession.go b/headersession.go index f240f1fe..57ebf31d 100644 --- a/headersession.go +++ b/headersession.go @@ -4,6 +4,8 @@ import ( "fmt" "strconv" "strings" + + "github.com/aler9/gortsplib/base" ) // HeaderSession is a Session header. @@ -16,7 +18,7 @@ type HeaderSession struct { } // ReadHeaderSession parses a Session header. -func ReadHeaderSession(v HeaderValue) (*HeaderSession, error) { +func ReadHeaderSession(v base.HeaderValue) (*HeaderSession, error) { if len(v) == 0 { return nil, fmt.Errorf("value not provided") } @@ -61,12 +63,12 @@ func ReadHeaderSession(v HeaderValue) (*HeaderSession, error) { } // Write encodes a Session header -func (hs *HeaderSession) Write() HeaderValue { +func (hs *HeaderSession) Write() base.HeaderValue { val := hs.Session if hs.Timeout != nil { val += ";timeout=" + strconv.FormatUint(uint64(*hs.Timeout), 10) } - return HeaderValue{val} + return base.HeaderValue{val} } diff --git a/headersession_test.go b/headersession_test.go index e19988f8..b1417abc 100644 --- a/headersession_test.go +++ b/headersession_test.go @@ -4,26 +4,28 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/base" ) var casesHeaderSession = []struct { name string - vin HeaderValue - vout HeaderValue + vin base.HeaderValue + vout base.HeaderValue h *HeaderSession }{ { "base", - HeaderValue{`A3eqwsafq3rFASqew`}, - HeaderValue{`A3eqwsafq3rFASqew`}, + base.HeaderValue{`A3eqwsafq3rFASqew`}, + base.HeaderValue{`A3eqwsafq3rFASqew`}, &HeaderSession{ Session: "A3eqwsafq3rFASqew", }, }, { "with timeout", - HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, - HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, + base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, + base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, &HeaderSession{ Session: "A3eqwsafq3rFASqew", Timeout: func() *uint { @@ -34,8 +36,8 @@ var casesHeaderSession = []struct { }, { "with timeout and space", - HeaderValue{`A3eqwsafq3rFASqew; timeout=47`}, - HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, + base.HeaderValue{`A3eqwsafq3rFASqew; timeout=47`}, + base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, &HeaderSession{ Session: "A3eqwsafq3rFASqew", Timeout: func() *uint { diff --git a/headertransport.go b/headertransport.go index b885b9d2..8a6d2891 100644 --- a/headertransport.go +++ b/headertransport.go @@ -4,6 +4,8 @@ import ( "fmt" "strconv" "strings" + + "github.com/aler9/gortsplib/base" ) // HeaderTransport is a Transport header. @@ -56,7 +58,7 @@ func parsePorts(val string) (*[2]int, error) { } // ReadHeaderTransport parses a Transport header. -func ReadHeaderTransport(v HeaderValue) (*HeaderTransport, error) { +func ReadHeaderTransport(v base.HeaderValue) (*HeaderTransport, error) { if len(v) == 0 { return nil, fmt.Errorf("value not provided") } @@ -151,7 +153,7 @@ func ReadHeaderTransport(v HeaderValue) (*HeaderTransport, error) { } // Write encodes a Transport header -func (ht *HeaderTransport) Write() HeaderValue { +func (ht *HeaderTransport) Write() base.HeaderValue { var vals []string if ht.Protocol == StreamProtocolUDP { @@ -187,5 +189,5 @@ func (ht *HeaderTransport) Write() HeaderValue { vals = append(vals, "mode="+*ht.Mode) } - return HeaderValue{strings.Join(vals, ";")} + return base.HeaderValue{strings.Join(vals, ";")} } diff --git a/headertransport_test.go b/headertransport_test.go index ff6c4939..72798676 100644 --- a/headertransport_test.go +++ b/headertransport_test.go @@ -4,18 +4,20 @@ import ( "testing" "github.com/stretchr/testify/require" + + "github.com/aler9/gortsplib/base" ) var casesHeaderTransport = []struct { name string - vin HeaderValue - vout HeaderValue + vin base.HeaderValue + vout base.HeaderValue h *HeaderTransport }{ { "udp unicast play request", - HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode="PLAY"`}, - HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode=play`}, + base.HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode="PLAY"`}, + base.HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode=play`}, &HeaderTransport{ Protocol: StreamProtocolUDP, Cast: func() *StreamCast { @@ -31,8 +33,8 @@ var casesHeaderTransport = []struct { }, { "udp unicast play response", - HeaderValue{`RTP/AVP/UDP;unicast;client_port=3056-3057;server_port=5000-5001`}, - HeaderValue{`RTP/AVP;unicast;client_port=3056-3057;server_port=5000-5001`}, + base.HeaderValue{`RTP/AVP/UDP;unicast;client_port=3056-3057;server_port=5000-5001`}, + base.HeaderValue{`RTP/AVP;unicast;client_port=3056-3057;server_port=5000-5001`}, &HeaderTransport{ Protocol: StreamProtocolUDP, Cast: func() *StreamCast { @@ -45,8 +47,8 @@ var casesHeaderTransport = []struct { }, { "udp multicast play request / response", - HeaderValue{`RTP/AVP;multicast;destination=225.219.201.15;port=7000-7001;ttl=127`}, - HeaderValue{`RTP/AVP;multicast`}, + base.HeaderValue{`RTP/AVP;multicast;destination=225.219.201.15;port=7000-7001;ttl=127`}, + base.HeaderValue{`RTP/AVP;multicast`}, &HeaderTransport{ Protocol: StreamProtocolUDP, Cast: func() *StreamCast { @@ -66,8 +68,8 @@ var casesHeaderTransport = []struct { }, { "tcp play request / response", - HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, - HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, + base.HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, + base.HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, &HeaderTransport{ Protocol: StreamProtocolTCP, InterleavedIds: &[2]int{0, 1}, diff --git a/multibuffer.go b/multibuffer.go index 7a332caf..94561441 100644 --- a/multibuffer.go +++ b/multibuffer.go @@ -1,5 +1,9 @@ package gortsplib +import ( + "github.com/aler9/gortsplib/base" +) + // MultiBuffer implements software multi buffering, that allows to reuse // existing buffers without creating new ones, increasing performance. type MultiBuffer struct { @@ -33,14 +37,14 @@ func (mb *MultiBuffer) Next() []byte { type multiFrame struct { count int - frames []*InterleavedFrame + frames []*base.InterleavedFrame cur int } func newMultiFrame(count int, bufsize int) *multiFrame { - frames := make([]*InterleavedFrame, count) + frames := make([]*base.InterleavedFrame, count) for i := 0; i < count; i++ { - frames[i] = &InterleavedFrame{ + frames[i] = &base.InterleavedFrame{ Content: make([]byte, 0, bufsize), } } @@ -51,7 +55,7 @@ func newMultiFrame(count int, bufsize int) *multiFrame { } } -func (mf *multiFrame) next() *InterleavedFrame { +func (mf *multiFrame) next() *base.InterleavedFrame { ret := mf.frames[mf.cur] mf.cur += 1 if mf.cur >= mf.count { diff --git a/utils.go b/utils.go index 9239e799..068f7860 100644 --- a/utils.go +++ b/utils.go @@ -1,14 +1,7 @@ package gortsplib import ( - "bufio" - "fmt" - "io" - "strconv" -) - -const ( - rtspMaxContentLength = 4096 + "github.com/aler9/gortsplib/base" ) // StreamProtocol is the protocol of a stream. @@ -22,7 +15,7 @@ const ( StreamProtocolTCP ) -// String implements fmt.Stringer +// String implements fmt.Stringer. func (sp StreamProtocol) String() string { switch sp { case StreamProtocolUDP: @@ -45,7 +38,7 @@ const ( StreamMulticast ) -// String implements fmt.Stringer +// String implements fmt.Stringer. func (sc StreamCast) String() string { switch sc { case StreamUnicast: @@ -58,29 +51,17 @@ func (sc StreamCast) String() string { } // StreamType is the stream type. -type StreamType int +type StreamType = base.StreamType const ( // StreamTypeRtp means that the stream contains RTP packets - StreamTypeRtp StreamType = iota + StreamTypeRtp = base.StreamTypeRtp // StreamTypeRtcp means that the stream contains RTCP packets - StreamTypeRtcp + StreamTypeRtcp = base.StreamTypeRtcp ) -// String implements fmt.Stringer -func (st StreamType) String() string { - switch st { - case StreamTypeRtp: - return "RTP" - - case StreamTypeRtcp: - return "RTCP" - } - return "unknown" -} - -// SetupMode is a setup mode. +// SetupMode is the setup mode. type SetupMode int const ( @@ -91,7 +72,7 @@ const ( SetupModeRecord ) -// String implements fmt.Stringer +// String implements fmt.Stringer. func (sm SetupMode) String() string { switch sm { case SetupModePlay: @@ -102,68 +83,3 @@ func (sm SetupMode) String() string { } return "unknown" } - -func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) { - for i := 1; i <= n; i++ { - byts, err := rb.Peek(i) - if err != nil { - return nil, err - } - - if byts[len(byts)-1] == delim { - rb.Discard(len(byts)) - return byts, nil - } - } - return nil, fmt.Errorf("buffer length exceeds %d", n) -} - -func readByteEqual(rb *bufio.Reader, cmp byte) error { - byt, err := rb.ReadByte() - if err != nil { - return err - } - - if byt != cmp { - return fmt.Errorf("expected '%c', got '%c'", cmp, byt) - } - - return nil -} - -func readContent(rb *bufio.Reader, header Header) ([]byte, error) { - cls, ok := header["Content-Length"] - if !ok || len(cls) != 1 { - return nil, nil - } - - cl, err := strconv.ParseInt(cls[0], 10, 64) - if err != nil { - return nil, fmt.Errorf("invalid Content-Length") - } - - if cl > rtspMaxContentLength { - return nil, fmt.Errorf("Content-Length exceeds %d", rtspMaxContentLength) - } - - ret := make([]byte, cl) - n, err := io.ReadFull(rb, ret) - if err != nil && n != len(ret) { - return nil, err - } - - return ret, nil -} - -func writeContent(bw *bufio.Writer, content []byte) error { - if len(content) == 0 { - return nil - } - - _, err := bw.Write(content) - if err != nil { - return err - } - - return nil -}