move base elements into base folder

This commit is contained in:
aler9
2020-10-04 17:32:26 +02:00
parent 973464ed1d
commit 662138a0cf
28 changed files with 352 additions and 297 deletions

View File

@@ -49,5 +49,5 @@ test:
test-nodocker: test-nodocker:
$(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \ $(foreach IMG,$(shell echo testimages/*/ | xargs -n1 basename), \
docker build -q testimages/$(IMG) -t gortsplib-test-$(IMG)$(NL)) docker build -q testimages/$(IMG) -t gortsplib-test-$(IMG)$(NL))
go test -race -v . go test -race -v ./...
$(foreach f,$(shell ls examples/*),go build -o /dev/null $(f)$(NL)) $(foreach f,$(shell ls examples/*),go build -o /dev/null $(f)$(NL))

View File

@@ -5,6 +5,8 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/base"
) )
var casesAuth = []struct { var casesAuth = []struct {
@@ -33,10 +35,10 @@ func TestAuthMethods(t *testing.T) {
ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass") ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass")
require.NoError(t, err) require.NoError(t, err)
authorization := ac.GenerateHeader(ANNOUNCE, authorization := ac.GenerateHeader(base.ANNOUNCE,
&url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"}) &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"})
err = authServer.ValidateHeader(authorization, ANNOUNCE, err = authServer.ValidateHeader(authorization, base.ANNOUNCE,
&url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"}) &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath"})
require.NoError(t, err) require.NoError(t, err)
}) })
@@ -49,10 +51,10 @@ func TestAuthBasePath(t *testing.T) {
ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass") ac, err := newAuthClient(wwwAuthenticate, "testuser", "testpass")
require.NoError(t, err) require.NoError(t, err)
authorization := ac.GenerateHeader(ANNOUNCE, authorization := ac.GenerateHeader(base.ANNOUNCE,
&url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/"}) &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/"})
err = authServer.ValidateHeader(authorization, ANNOUNCE, err = authServer.ValidateHeader(authorization, base.ANNOUNCE,
&url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/trackId=0"}) &url.URL{Scheme: "rtsp", Host: "myhost", Path: "mypath/trackId=0"})
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"strings" "strings"
"github.com/aler9/gortsplib/base"
) )
// authClient is an object that helps a client to send its credentials to a // authClient is an object that helps a client to send its credentials to a
@@ -19,7 +21,7 @@ type authClient struct {
// newAuthClient allocates an authClient. // newAuthClient allocates an authClient.
// header is the WWW-Authenticate header provided by the server. // header is the WWW-Authenticate header provided by the server.
func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error) { func newAuthClient(v base.HeaderValue, user string, pass string) (*authClient, error) {
// prefer digest // prefer digest
if headerAuthDigest := func() string { if headerAuthDigest := func() string {
for _, vi := range v { for _, vi := range v {
@@ -29,7 +31,7 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error)
} }
return "" return ""
}(); headerAuthDigest != "" { }(); headerAuthDigest != "" {
auth, err := ReadHeaderAuth(HeaderValue{headerAuthDigest}) auth, err := ReadHeaderAuth(base.HeaderValue{headerAuthDigest})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -59,7 +61,7 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error)
} }
return "" return ""
}(); headerAuthBasic != "" { }(); headerAuthBasic != "" {
auth, err := ReadHeaderAuth(HeaderValue{headerAuthBasic}) auth, err := ReadHeaderAuth(base.HeaderValue{headerAuthBasic})
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -81,12 +83,12 @@ func newAuthClient(v HeaderValue, user string, pass string) (*authClient, error)
// GenerateHeader generates an Authorization Header that allows to authenticate a request with // GenerateHeader generates an Authorization Header that allows to authenticate a request with
// the given method and url. // the given method and url.
func (ac *authClient) GenerateHeader(method Method, ur *url.URL) HeaderValue { func (ac *authClient) GenerateHeader(method base.Method, ur *url.URL) base.HeaderValue {
switch ac.method { switch ac.method {
case Basic: case Basic:
response := base64.StdEncoding.EncodeToString([]byte(ac.user + ":" + ac.pass)) response := base64.StdEncoding.EncodeToString([]byte(ac.user + ":" + ac.pass))
return HeaderValue{"Basic " + response} return base.HeaderValue{"Basic " + response}
case Digest: case Digest:
response := md5Hex(md5Hex(ac.user+":"+ac.realm+":"+ac.pass) + ":" + response := md5Hex(md5Hex(ac.user+":"+ac.realm+":"+ac.pass) + ":" +

View File

@@ -7,6 +7,8 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"strings" "strings"
"github.com/aler9/gortsplib/base"
) )
// AuthServer is an object that helps a server to validate the credentials of // AuthServer is an object that helps a server to validate the credentials of
@@ -40,8 +42,8 @@ func NewAuthServer(user string, pass string, methods []AuthMethod) *AuthServer {
} }
// GenerateHeader generates the WWW-Authenticate header needed by a client to log in. // GenerateHeader generates the WWW-Authenticate header needed by a client to log in.
func (as *AuthServer) GenerateHeader() HeaderValue { func (as *AuthServer) GenerateHeader() base.HeaderValue {
var ret HeaderValue var ret base.HeaderValue
for _, m := range as.methods { for _, m := range as.methods {
switch m { switch m {
case Basic: case Basic:
@@ -63,7 +65,7 @@ func (as *AuthServer) GenerateHeader() HeaderValue {
// ValidateHeader validates the Authorization header sent by a client after receiving the // ValidateHeader validates the Authorization header sent by a client after receiving the
// WWW-Authenticate header. // WWW-Authenticate header.
func (as *AuthServer) ValidateHeader(v HeaderValue, method Method, ur *url.URL) error { func (as *AuthServer) ValidateHeader(v base.HeaderValue, method base.Method, ur *url.URL) error {
if len(v) == 0 { if len(v) == 0 {
return fmt.Errorf("authorization header not provided") return fmt.Errorf("authorization header not provided")
} }
@@ -83,7 +85,7 @@ func (as *AuthServer) ValidateHeader(v HeaderValue, method Method, ur *url.URL)
} }
} else if strings.HasPrefix(v0, "Digest ") { } else if strings.HasPrefix(v0, "Digest ") {
auth, err := ReadHeaderAuth(HeaderValue{v0}) auth, err := ReadHeaderAuth(base.HeaderValue{v0})
if err != nil { if err != nil {
return err return err
} }

24
base/defs.go Normal file
View File

@@ -0,0 +1,24 @@
package base
// StreamType is the stream type.
type StreamType int
const (
// StreamTypeRtp means that the stream contains RTP packets
StreamTypeRtp StreamType = iota
// StreamTypeRtcp means that the stream contains RTCP packets
StreamTypeRtcp
)
// String implements fmt.Stringer
func (st StreamType) String() string {
switch st {
case StreamTypeRtp:
return "RTP"
case StreamTypeRtcp:
return "RTCP"
}
return "unknown"
}

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"
@@ -11,7 +11,45 @@ const (
interleavedFrameMagicByte = 0x24 interleavedFrameMagicByte = 0x24
) )
// InterleavedFrame is a structure that allows to transfer binary data // ReadInterleavedFrameOrResponse reads an InterleavedFrame or a Response.
func ReadInterleavedFrameOrResponse(frame *InterleavedFrame, br *bufio.Reader) (interface{}, error) {
b, err := br.ReadByte()
if err != nil {
return nil, err
}
br.UnreadByte()
if b == interleavedFrameMagicByte {
err := frame.Read(br)
if err != nil {
return nil, err
}
return frame, err
}
return ReadResponse(br)
}
// ReadInterleavedFrameOrRequest reads an InterleavedFrame or a Response.
func ReadInterleavedFrameOrRequest(frame *InterleavedFrame, br *bufio.Reader) (interface{}, error) {
b, err := br.ReadByte()
if err != nil {
return nil, err
}
br.UnreadByte()
if b == interleavedFrameMagicByte {
err := frame.Read(br)
if err != nil {
return nil, err
}
return frame, err
}
return ReadRequest(br)
}
// InterleavedFrame is an interleaved frame, and allows to transfer binary data
// within RTSP/TCP connections. It is used to send and receive RTP and RTCP packets with TCP. // within RTSP/TCP connections. It is used to send and receive RTP and RTCP packets with TCP.
type InterleavedFrame struct { type InterleavedFrame struct {
// track id // track id
@@ -24,7 +62,7 @@ type InterleavedFrame struct {
Content []byte Content []byte
} }
// Read reads an interleaved frame from a buffered reader. // Read reads an interleaved frame.
func (f *InterleavedFrame) Read(br *bufio.Reader) error { func (f *InterleavedFrame) Read(br *bufio.Reader) error {
var header [4]byte var header [4]byte
_, err := io.ReadFull(br, header[:]) _, err := io.ReadFull(br, header[:])
@@ -61,7 +99,7 @@ func (f *InterleavedFrame) Read(br *bufio.Reader) error {
} }
// Write writes an InterleavedFrame into a buffered writer. // Write writes an InterleavedFrame into a buffered writer.
func (f *InterleavedFrame) Write(bw *bufio.Writer) error { func (f InterleavedFrame) Write(bw *bufio.Writer) error {
// convert TrackId and StreamType into channel // convert TrackId and StreamType into channel
channel := func() uint8 { channel := func() uint8 {
if f.StreamType == StreamTypeRtp { if f.StreamType == StreamTypeRtp {

View File

@@ -1,4 +1,5 @@
package gortsplib // Package base contains the base elements of the RTSP protocol.
package base
import ( import (
"bufio" "bufio"
@@ -52,7 +53,7 @@ type Request struct {
SkipResponse bool SkipResponse bool
} }
// ReadRequest reads a request from a buffered reader. // ReadRequest reads a request.
func ReadRequest(rb *bufio.Reader) (*Request, error) { func ReadRequest(rb *bufio.Reader) (*Request, error) {
req := &Request{} req := &Request{}
@@ -114,8 +115,8 @@ func ReadRequest(rb *bufio.Reader) (*Request, error) {
return req, nil return req, nil
} }
// Write writes a request into a buffered writer. // Write writes a request.
func (req *Request) Write(bw *bufio.Writer) error { func (req Request) Write(bw *bufio.Writer) error {
// remove credentials // remove credentials
u := &url.URL{ u := &url.URL{
Scheme: req.Url.Scheme, Scheme: req.Url.Scheme,

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"
@@ -132,7 +132,7 @@ type Response struct {
Content []byte Content []byte
} }
// ReadResponse reads a response from a buffered reader. // ReadResponse reads a response.
func ReadResponse(rb *bufio.Reader) (*Response, error) { func ReadResponse(rb *bufio.Reader) (*Response, error) {
res := &Response{} res := &Response{}
@@ -186,8 +186,8 @@ func ReadResponse(rb *bufio.Reader) (*Response, error) {
return res, nil return res, nil
} }
// Write writes a Response into a buffered writer. // Write writes a Response.
func (res *Response) Write(bw *bufio.Writer) error { func (res Response) Write(bw *bufio.Writer) error {
if res.StatusMessage == "" { if res.StatusMessage == "" {
if status, ok := statusMessages[res.StatusCode]; ok { if status, ok := statusMessages[res.StatusCode]; ok {
res.StatusMessage = status res.StatusMessage = status

View File

@@ -1,4 +1,4 @@
package gortsplib package base
import ( import (
"bufio" "bufio"

77
base/utils.go Normal file
View File

@@ -0,0 +1,77 @@
package base
import (
"bufio"
"fmt"
"io"
"strconv"
)
const (
rtspMaxContentLength = 4096
)
func readByteEqual(rb *bufio.Reader, cmp byte) error {
byt, err := rb.ReadByte()
if err != nil {
return err
}
if byt != cmp {
return fmt.Errorf("expected '%c', got '%c'", cmp, byt)
}
return nil
}
func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) {
for i := 1; i <= n; i++ {
byts, err := rb.Peek(i)
if err != nil {
return nil, err
}
if byts[len(byts)-1] == delim {
rb.Discard(len(byts))
return byts, nil
}
}
return nil, fmt.Errorf("buffer length exceeds %d", n)
}
func readContent(rb *bufio.Reader, header Header) ([]byte, error) {
cls, ok := header["Content-Length"]
if !ok || len(cls) != 1 {
return nil, nil
}
cl, err := strconv.ParseInt(cls[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid Content-Length")
}
if cl > rtspMaxContentLength {
return nil, fmt.Errorf("Content-Length exceeds %d", rtspMaxContentLength)
}
ret := make([]byte, cl)
n, err := io.ReadFull(rb, ret)
if err != nil && n != len(ret) {
return nil, err
}
return ret, nil
}
func writeContent(bw *bufio.Writer, content []byte) error {
if len(content) == 0 {
return nil
}
_, err := bw.Write(content)
if err != nil {
return err
}
return nil
}

View File

@@ -17,6 +17,8 @@ import (
"strings" "strings"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/aler9/gortsplib/base"
) )
const ( const (
@@ -119,14 +121,15 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
udpLastFrameTimes: make(map[int]*int64), udpLastFrameTimes: make(map[int]*int64),
udpRtpListeners: make(map[int]*connClientUDPListener), udpRtpListeners: make(map[int]*connClientUDPListener),
udpRtcpListeners: make(map[int]*connClientUDPListener), udpRtcpListeners: make(map[int]*connClientUDPListener),
tcpFrames: newMultiFrame(conf.ReadBufferCount, clientTCPFrameReadBufferSize),
}, nil }, nil
} }
// Close closes all the ConnClient resources. // Close closes all the ConnClient resources.
func (c *ConnClient) Close() error { func (c *ConnClient) Close() error {
if c.state == connClientStateReading { if c.state == connClientStateReading {
c.Do(&Request{ c.Do(&base.Request{
Method: TEARDOWN, Method: base.TEARDOWN,
Url: c.streamUrl, Url: c.streamUrl,
SkipResponse: true, SkipResponse: true,
}) })
@@ -167,89 +170,82 @@ func (c *ConnClient) NetConn() net.Conn {
} }
func (c *ConnClient) readFrameTCPOrResponse() (interface{}, error) { func (c *ConnClient) readFrameTCPOrResponse() (interface{}, error) {
frame := c.tcpFrames.next()
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
b, err := c.br.ReadByte() return base.ReadInterleavedFrameOrResponse(frame, c.br)
if err != nil {
return nil, err
}
c.br.UnreadByte()
if b == interleavedFrameMagicByte {
frame := c.tcpFrames.next()
err := frame.Read(c.br)
if err != nil {
return nil, err
}
return frame, err
}
return ReadResponse(c.br)
} }
// ReadFrameTCP reads an InterleavedFrame. // ReadFrameTCP reads an InterleavedFrame.
// This can't be used when recording. // This can't be used when recording.
func (c *ConnClient) ReadFrameTCP() (*InterleavedFrame, error) { func (c *ConnClient) ReadFrameTCP() (int, StreamType, []byte, error) {
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
frame := c.tcpFrames.next() frame := c.tcpFrames.next()
c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
err := frame.Read(c.br) err := frame.Read(c.br)
if err != nil { if err != nil {
return nil, err return 0, 0, nil, err
} }
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
return frame, nil return frame.TrackId, frame.StreamType, frame.Content, nil
} }
// ReadFrameUDP reads an UDP frame. // ReadFrameUDP reads an UDP frame.
func (c *ConnClient) ReadFrameUDP(track *Track, streamType StreamType) ([]byte, error) { func (c *ConnClient) ReadFrameUDP(trackId int, streamType StreamType) ([]byte, error) {
var buf []byte var buf []byte
var err error var err error
if streamType == StreamTypeRtp { if streamType == StreamTypeRtp {
buf, err = c.udpRtpListeners[track.Id].read() buf, err = c.udpRtpListeners[trackId].read()
if err != nil { if err != nil {
return nil, err return nil, err
} }
} else { } else {
buf, err = c.udpRtcpListeners[track.Id].read() buf, err = c.udpRtcpListeners[trackId].read()
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
atomic.StoreInt64(c.udpLastFrameTimes[track.Id], time.Now().Unix()) atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix())
c.rtcpReceivers[track.Id].OnFrame(streamType, buf) c.rtcpReceivers[trackId].OnFrame(streamType, buf)
return buf, nil return buf, nil
} }
// WriteFrameTCP writes an interleaved frame. // WriteFrameTCP writes an interleaved frame.
// this can't be used when playing. // this can't be used when playing.
func (c *ConnClient) WriteFrameTCP(frame *InterleavedFrame) error { func (c *ConnClient) WriteFrameTCP(trackId int, streamType StreamType, content []byte) error {
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: content,
}
c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.Write(c.bw) return frame.Write(c.bw)
} }
// WriteFrameUDP writes an UDP frame. // WriteFrameUDP writes an UDP frame.
func (c *ConnClient) WriteFrameUDP(track *Track, streamType StreamType, content []byte) error { func (c *ConnClient) WriteFrameUDP(trackId int, streamType StreamType, content []byte) error {
if streamType == StreamTypeRtp { if streamType == StreamTypeRtp {
return c.udpRtpListeners[track.Id].write(content) return c.udpRtpListeners[trackId].write(content)
} }
return c.udpRtcpListeners[trackId].write(content)
return c.udpRtcpListeners[track.Id].write(content)
} }
// Do writes a Request and reads a Response. Interleaved frames sent before the // Do writes a Request and reads a Response. Interleaved frames sent before the
// response are ignored. // response are ignored.
func (c *ConnClient) Do(req *Request) (*Response, error) { func (c *ConnClient) Do(req *base.Request) (*base.Response, error) {
if req.Header == nil { if req.Header == nil {
req.Header = make(Header) req.Header = make(base.Header)
} }
// insert session // insert session
if c.session != "" { if c.session != "" {
req.Header["Session"] = HeaderValue{c.session} req.Header["Session"] = base.HeaderValue{c.session}
} }
// insert auth // insert auth
@@ -266,7 +262,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
// insert cseq // insert cseq
c.cseq += 1 c.cseq += 1
req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} req.Header["CSeq"] = base.HeaderValue{strconv.FormatInt(int64(c.cseq), 10)}
c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
err := req.Write(c.bw) err := req.Write(c.bw)
@@ -282,14 +278,14 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
// interleaved frames are sent in two situations: // interleaved frames are sent in two situations:
// * when the server is v4lrtspserver, before the PLAY response // * when the server is v4lrtspserver, before the PLAY response
// * when the stream is already playing // * when the stream is already playing
res, err := func() (*Response, error) { res, err := func() (*base.Response, error) {
for { for {
recv, err := c.readFrameTCPOrResponse() recv, err := c.readFrameTCPOrResponse()
if err != nil { if err != nil {
return nil, err return nil, err
} }
if res, ok := recv.(*Response); ok { if res, ok := recv.(*base.Response); ok {
return res, nil return res, nil
} }
} }
@@ -308,7 +304,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
} }
// setup authentication // setup authentication
if res.StatusCode == StatusUnauthorized && req.Url.User != nil && c.auth == nil { if res.StatusCode == base.StatusUnauthorized && req.Url.User != nil && c.auth == nil {
pass, _ := req.Url.User.Password() pass, _ := req.Url.User.Password()
auth, err := newAuthClient(res.Header["WWW-Authenticate"], req.Url.User.Username(), pass) auth, err := newAuthClient(res.Header["WWW-Authenticate"], req.Url.User.Username(), pass)
if err != nil { if err != nil {
@@ -326,13 +322,13 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
// Options writes an OPTIONS request and reads a response, that contains // Options writes an OPTIONS request and reads a response, that contains
// the methods allowed by the server. Since this method is not implemented by // the methods allowed by the server. Since this method is not implemented by
// every RTSP server, the function does not fail if the returned code is StatusNotFound. // every RTSP server, the function does not fail if the returned code is StatusNotFound.
func (c *ConnClient) Options(u *url.URL) (*Response, error) { func (c *ConnClient) Options(u *url.URL) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
res, err := c.Do(&Request{ res, err := c.Do(&base.Request{
Method: OPTIONS, Method: base.OPTIONS,
// strip path // strip path
Url: &url.URL{ Url: &url.URL{
Scheme: "rtsp", Scheme: "rtsp",
@@ -345,7 +341,7 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) {
return nil, err return nil, err
} }
if res.StatusCode != StatusOK && res.StatusCode != StatusNotFound { if res.StatusCode != base.StatusOK && res.StatusCode != base.StatusNotFound {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
@@ -353,23 +349,23 @@ func (c *ConnClient) Options(u *url.URL) (*Response, error) {
} }
// Describe writes a DESCRIBE request and reads a Response. // Describe writes a DESCRIBE request and reads a Response.
func (c *ConnClient) Describe(u *url.URL) (Tracks, *Response, error) { func (c *ConnClient) Describe(u *url.URL) (Tracks, *base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, nil, fmt.Errorf("can't be called when reading or publishing") return nil, nil, fmt.Errorf("can't be called when reading or publishing")
} }
res, err := c.Do(&Request{ res, err := c.Do(&base.Request{
Method: DESCRIBE, Method: base.DESCRIBE,
Url: u, Url: u,
Header: Header{ Header: base.Header{
"Accept": HeaderValue{"application/sdp"}, "Accept": base.HeaderValue{"application/sdp"},
}, },
}) })
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
if res.StatusCode != StatusOK { if res.StatusCode != base.StatusOK {
return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
@@ -451,11 +447,11 @@ func (c *ConnClient) urlForTrack(baseUrl *url.URL, mode SetupMode, track *Track)
return u return u
} }
func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderTransport) (*Response, error) { func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderTransport) (*base.Response, error) {
res, err := c.Do(&Request{ res, err := c.Do(&base.Request{
Method: SETUP, Method: base.SETUP,
Url: c.urlForTrack(u, mode, track), Url: c.urlForTrack(u, mode, track),
Header: Header{ Header: base.Header{
"Transport": ht.Write(), "Transport": ht.Write(),
}, },
}) })
@@ -463,7 +459,7 @@ func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderT
return nil, err return nil, err
} }
if res.StatusCode != StatusOK { if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
@@ -473,7 +469,7 @@ func (c *ConnClient) setup(u *url.URL, mode SetupMode, track *Track, ht *HeaderT
// SetupUDP writes a SETUP request and reads a Response. // SetupUDP writes a SETUP request and reads a Response.
// If rtpPort and rtcpPort are zero, they are be chosen automatically. // If rtpPort and rtcpPort are zero, they are be chosen automatically.
func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort int, func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort int,
rtcpPort int) (*Response, error) { rtcpPort int) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
@@ -597,7 +593,7 @@ func (c *ConnClient) SetupUDP(u *url.URL, mode SetupMode, track *Track, rtpPort
} }
// SetupTCP writes a SETUP request and reads a Response. // SetupTCP writes a SETUP request and reads a Response.
func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Response, error) { func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
@@ -657,7 +653,7 @@ func (c *ConnClient) SetupTCP(u *url.URL, mode SetupMode, track *Track) (*Respon
// Play writes a PLAY request and reads a Response // Play writes a PLAY request and reads a Response
// This function can be called only after SetupUDP() or SetupTCP(). // This function can be called only after SetupUDP() or SetupTCP().
func (c *ConnClient) Play(u *url.URL) (*Response, error) { func (c *ConnClient) Play(u *url.URL) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
@@ -670,19 +666,15 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
fmt.Errorf("must be called with the same url used for SetupUDP() or SetupTCP()") fmt.Errorf("must be called with the same url used for SetupUDP() or SetupTCP()")
} }
if *c.streamProtocol == StreamProtocolTCP { res, err := c.Do(&base.Request{
c.tcpFrames = newMultiFrame(c.conf.ReadBufferCount, clientTCPFrameReadBufferSize) Method: base.PLAY,
}
res, err := c.Do(&Request{
Method: PLAY,
Url: u, Url: u,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
if res.StatusCode != StatusOK { if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
@@ -720,11 +712,7 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
c.udpRtcpListeners[trackId].write(frame) c.udpRtcpListeners[trackId].write(frame)
} else { } else {
c.WriteFrameTCP(&InterleavedFrame{ c.WriteFrameTCP(trackId, StreamTypeRtcp, frame)
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: frame,
})
} }
} }
} }
@@ -750,7 +738,7 @@ func (c *ConnClient) LoopUDP() error {
go func() { go func() {
for { for {
c.nconn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.conf.ReadTimeout))
_, err := ReadResponse(c.br) _, err := base.ReadResponse(c.br)
if err != nil { if err != nil {
readDone <- err readDone <- err
return return
@@ -771,8 +759,8 @@ func (c *ConnClient) LoopUDP() error {
return err return err
case <-keepaliveTicker.C: case <-keepaliveTicker.C:
_, err := c.Do(&Request{ _, err := c.Do(&base.Request{
Method: OPTIONS, Method: base.OPTIONS,
Url: &url.URL{ Url: &url.URL{
Scheme: "rtsp", Scheme: "rtsp",
Host: c.streamUrl.Host, Host: c.streamUrl.Host,
@@ -804,16 +792,16 @@ func (c *ConnClient) LoopUDP() error {
} }
// Announce writes an ANNOUNCE request and reads a Response. // Announce writes an ANNOUNCE request and reads a Response.
func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) { func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*base.Response, error) {
if c.streamUrl != nil { if c.streamUrl != nil {
fmt.Errorf("announce has already been sent with another url url") fmt.Errorf("announce has already been sent with another url url")
} }
res, err := c.Do(&Request{ res, err := c.Do(&base.Request{
Method: ANNOUNCE, Method: base.ANNOUNCE,
Url: u, Url: u,
Header: Header{ Header: base.Header{
"Content-Type": HeaderValue{"application/sdp"}, "Content-Type": base.HeaderValue{"application/sdp"},
}, },
Content: tracks.Write(), Content: tracks.Write(),
}) })
@@ -821,7 +809,7 @@ func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) {
return nil, err return nil, err
} }
if res.StatusCode != StatusOK { if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }
@@ -831,7 +819,7 @@ func (c *ConnClient) Announce(u *url.URL, tracks Tracks) (*Response, error) {
} }
// Record writes a RECORD request and reads a Response. // Record writes a RECORD request and reads a Response.
func (c *ConnClient) Record(u *url.URL) (*Response, error) { func (c *ConnClient) Record(u *url.URL) (*base.Response, error) {
if c.state != connClientStateInitial { if c.state != connClientStateInitial {
return nil, fmt.Errorf("can't be called when reading or publishing") return nil, fmt.Errorf("can't be called when reading or publishing")
} }
@@ -840,15 +828,15 @@ func (c *ConnClient) Record(u *url.URL) (*Response, error) {
return nil, fmt.Errorf("must be called with the same url used for Announce()") return nil, fmt.Errorf("must be called with the same url used for Announce()")
} }
res, err := c.Do(&Request{ res, err := c.Do(&base.Request{
Method: RECORD, Method: base.RECORD,
Url: u, Url: u,
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
if res.StatusCode != StatusOK { if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
} }

View File

@@ -119,7 +119,7 @@ func TestConnClientDialReadUDP(t *testing.T) {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
conn, tracks, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP) conn, _, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP)
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
@@ -131,7 +131,7 @@ func TestConnClientDialReadUDP(t *testing.T) {
conn.LoopUDP() conn.LoopUDP()
}() }()
_, err = conn.ReadFrameUDP(tracks[0], StreamTypeRtp) _, err = conn.ReadFrameUDP(0, StreamTypeRtp)
require.NoError(t, err) require.NoError(t, err)
conn.CloseUDPListeners() conn.CloseUDPListeners()
@@ -162,8 +162,11 @@ func TestConnClientDialReadTCP(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
_, err = conn.ReadFrameTCP() id, typ, _, err := conn.ReadFrameTCP()
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, id)
require.Equal(t, StreamTypeRtp, typ)
} }
func TestConnClientDialPublishUDP(t *testing.T) { func TestConnClientDialPublishUDP(t *testing.T) {
@@ -212,7 +215,7 @@ func TestConnClientDialPublishUDP(t *testing.T) {
break break
} }
err = conn.WriteFrameUDP(track, StreamTypeRtp, buf[:n]) err = conn.WriteFrameUDP(track.Id, StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
break break
} }
@@ -281,11 +284,7 @@ func TestConnClientDialPublishTCP(t *testing.T) {
break break
} }
err = conn.WriteFrameTCP(&InterleavedFrame{ err = conn.WriteFrameTCP(track.Id, StreamTypeRtp, buf[:n])
TrackId: track.Id,
StreamType: StreamTypeRtp,
Content: buf[:n],
})
if err != nil { if err != nil {
break break
} }

View File

@@ -4,6 +4,8 @@ import (
"bufio" "bufio"
"net" "net"
"time" "time"
"github.com/aler9/gortsplib/base"
) )
const ( const (
@@ -70,9 +72,9 @@ func (s *ConnServer) NetConn() net.Conn {
} }
// ReadRequest reads a Request. // ReadRequest reads a Request.
func (s *ConnServer) ReadRequest() (*Request, error) { func (s *ConnServer) ReadRequest() (*base.Request, error) {
s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline
return ReadRequest(s.br) return base.ReadRequest(s.br)
} }
// ReadFrameTCPOrRequest reads an InterleavedFrame or a Request. // ReadFrameTCPOrRequest reads an InterleavedFrame or a Request.
@@ -81,32 +83,24 @@ func (s *ConnServer) ReadFrameTCPOrRequest(timeout bool) (interface{}, error) {
s.conf.Conn.SetReadDeadline(time.Now().Add(s.conf.ReadTimeout)) s.conf.Conn.SetReadDeadline(time.Now().Add(s.conf.ReadTimeout))
} }
b, err := s.br.ReadByte() frame := s.tcpFrames.next()
if err != nil { return base.ReadInterleavedFrameOrRequest(frame, s.br)
return nil, err
}
s.br.UnreadByte()
if b == interleavedFrameMagicByte {
frame := s.tcpFrames.next()
err := frame.Read(s.br)
if err != nil {
return nil, err
}
return frame, err
}
return ReadRequest(s.br)
} }
// WriteResponse writes a Response. // WriteResponse writes a Response.
func (s *ConnServer) WriteResponse(res *Response) error { func (s *ConnServer) WriteResponse(res *base.Response) error {
s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
return res.Write(s.bw) return res.Write(s.bw)
} }
// WriteFrameTCP writes an InterleavedFrame. // WriteFrameTCP writes an InterleavedFrame.
func (s *ConnServer) WriteFrameTCP(frame *InterleavedFrame) error { func (s *ConnServer) WriteFrameTCP(trackId int, streamType StreamType, content []byte) error {
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: content,
}
s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
return frame.Write(s.bw) return frame.Write(s.bw)
} }

View File

@@ -96,11 +96,7 @@ func main() {
} }
// write frames to the server // write frames to the server
err = conn.WriteFrameTCP(&gortsplib.InterleavedFrame{ err = conn.WriteFrameTCP(track.Id, gortsplib.StreamTypeRtp, buf[:n])
TrackId: track.Id,
StreamType: gortsplib.StreamTypeRtp,
Content: buf[:n],
})
if err != nil { if err != nil {
break break
} }

View File

@@ -96,7 +96,7 @@ func main() {
} }
// write frames to the server // write frames to the server
err = conn.WriteFrameUDP(track, gortsplib.StreamTypeRtp, buf[:n]) err = conn.WriteFrameUDP(track.Id, gortsplib.StreamTypeRtp, buf[:n])
if err != nil { if err != nil {
break break
} }

View File

@@ -21,13 +21,13 @@ func main() {
for { for {
// read frames // read frames
frame, err := conn.ReadFrameTCP() id, typ, buf, err := conn.ReadFrameTCP()
if err != nil { if err != nil {
fmt.Println("connection is closed (%s)", err) fmt.Println("connection is closed (%s)", err)
break break
} }
fmt.Printf("frame from track %d, type %v: %v\n", fmt.Printf("frame from track %d, type %v: %v\n",
frame.TrackId, frame.StreamType, frame.Content) id, typ, buf)
} }
} }

View File

@@ -24,36 +24,36 @@ func main() {
defer wg.Wait() defer wg.Wait()
defer conn.CloseUDPListeners() defer conn.CloseUDPListeners()
for _, track := range tracks { for trackId := range tracks {
// read RTP frames // read RTP frames
wg.Add(1) wg.Add(1)
go func(track *gortsplib.Track) { go func(trackId int) {
defer wg.Done() defer wg.Done()
for { for {
buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtp) buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtp)
if err != nil { if err != nil {
break break
} }
fmt.Printf("frame from track %d, type RTP: %v\n", track.Id, buf) fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf)
} }
}(track) }(trackId)
// read RTCP frames // read RTCP frames
wg.Add(1) wg.Add(1)
go func(track *gortsplib.Track) { go func(trackId int) {
defer wg.Done() defer wg.Done()
for { for {
buf, err := conn.ReadFrameUDP(track, gortsplib.StreamTypeRtcp) buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtcp)
if err != nil { if err != nil {
break break
} }
fmt.Printf("frame from track %d, type RTCP: %v\n", track.Id, buf) fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf)
} }
}(track) }(trackId)
} }
err = conn.LoopUDP() err = conn.LoopUDP()

View File

@@ -3,6 +3,8 @@ package gortsplib
import ( import (
"fmt" "fmt"
"strings" "strings"
"github.com/aler9/gortsplib/base"
) )
// HeaderAuth is an Authenticate or a WWWW-Authenticate header. // HeaderAuth is an Authenticate or a WWWW-Authenticate header.
@@ -66,7 +68,7 @@ func findValue(v0 string) (string, string, error) {
} }
// ReadHeaderAuth parses an Authenticate or a WWW-Authenticate header. // ReadHeaderAuth parses an Authenticate or a WWW-Authenticate header.
func ReadHeaderAuth(v HeaderValue) (*HeaderAuth, error) { func ReadHeaderAuth(v base.HeaderValue) (*HeaderAuth, error) {
if len(v) == 0 { if len(v) == 0 {
return nil, fmt.Errorf("value not provided") return nil, fmt.Errorf("value not provided")
} }
@@ -154,7 +156,7 @@ func ReadHeaderAuth(v HeaderValue) (*HeaderAuth, error) {
} }
// Write encodes an Authenticate or a WWW-Authenticate header. // Write encodes an Authenticate or a WWW-Authenticate header.
func (ha *HeaderAuth) Write() HeaderValue { func (ha *HeaderAuth) Write() base.HeaderValue {
ret := "" ret := ""
switch ha.Method { switch ha.Method {
@@ -203,5 +205,5 @@ func (ha *HeaderAuth) Write() HeaderValue {
ret += strings.Join(vals, ", ") ret += strings.Join(vals, ", ")
return HeaderValue{ret} return base.HeaderValue{ret}
} }

View File

@@ -4,18 +4,20 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/base"
) )
var casesHeaderAuth = []struct { var casesHeaderAuth = []struct {
name string name string
vin HeaderValue vin base.HeaderValue
vout HeaderValue vout base.HeaderValue
h *HeaderAuth h *HeaderAuth
}{ }{
{ {
"basic", "basic",
HeaderValue{`Basic realm="4419b63f5e51"`}, base.HeaderValue{`Basic realm="4419b63f5e51"`},
HeaderValue{`Basic realm="4419b63f5e51"`}, base.HeaderValue{`Basic realm="4419b63f5e51"`},
&HeaderAuth{ &HeaderAuth{
Method: Basic, Method: Basic,
Realm: func() *string { Realm: func() *string {
@@ -26,8 +28,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest request 1", "digest request 1",
HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`},
HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Realm: func() *string { Realm: func() *string {
@@ -46,8 +48,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest request 2", "digest request 2",
HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale=FALSE`}, base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale=FALSE`},
HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`}, base.HeaderValue{`Digest realm="4419b63f5e51", nonce="8b84a3b789283a8bea8da7fa7d41f08b", stale="FALSE"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Realm: func() *string { Realm: func() *string {
@@ -66,8 +68,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest request 3", "digest request 3",
HeaderValue{`Digest realm="4419b63f5e51",nonce="133767111917411116111311118211673010032", stale="FALSE"`}, base.HeaderValue{`Digest realm="4419b63f5e51",nonce="133767111917411116111311118211673010032", stale="FALSE"`},
HeaderValue{`Digest realm="4419b63f5e51", nonce="133767111917411116111311118211673010032", stale="FALSE"`}, base.HeaderValue{`Digest realm="4419b63f5e51", nonce="133767111917411116111311118211673010032", stale="FALSE"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Realm: func() *string { Realm: func() *string {
@@ -86,8 +88,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest response generic", "digest response generic",
HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, base.HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`},
HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`}, base.HeaderValue{`Digest username="aa", realm="bb", nonce="cc", uri="dd", response="ee"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Username: func() *string { Username: func() *string {
@@ -114,8 +116,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest response with empty field", "digest response with empty field",
HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, base.HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`},
HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`}, base.HeaderValue{`Digest username="", realm="IPCAM", nonce="5d17cd12b9fa8a85ac5ceef0926ea5a6", uri="rtsp://localhost:8554/mystream", response="c072ae90eb4a27f4cdcb90d62266b2a1"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Username: func() *string { Username: func() *string {
@@ -142,8 +144,8 @@ var casesHeaderAuth = []struct {
}, },
{ {
"digest response with no spaces and additional fields", "digest response with no spaces and additional fields",
HeaderValue{`Digest realm="Please log in with a valid username",nonce="752a62306daf32b401a41004555c7663",opaque="",stale=FALSE,algorithm=MD5`}, base.HeaderValue{`Digest realm="Please log in with a valid username",nonce="752a62306daf32b401a41004555c7663",opaque="",stale=FALSE,algorithm=MD5`},
HeaderValue{`Digest realm="Please log in with a valid username", nonce="752a62306daf32b401a41004555c7663", opaque="", stale="FALSE", algorithm="MD5"`}, base.HeaderValue{`Digest realm="Please log in with a valid username", nonce="752a62306daf32b401a41004555c7663", opaque="", stale="FALSE", algorithm="MD5"`},
&HeaderAuth{ &HeaderAuth{
Method: Digest, Method: Digest,
Realm: func() *string { Realm: func() *string {

View File

@@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"github.com/aler9/gortsplib/base"
) )
// HeaderSession is a Session header. // HeaderSession is a Session header.
@@ -16,7 +18,7 @@ type HeaderSession struct {
} }
// ReadHeaderSession parses a Session header. // ReadHeaderSession parses a Session header.
func ReadHeaderSession(v HeaderValue) (*HeaderSession, error) { func ReadHeaderSession(v base.HeaderValue) (*HeaderSession, error) {
if len(v) == 0 { if len(v) == 0 {
return nil, fmt.Errorf("value not provided") return nil, fmt.Errorf("value not provided")
} }
@@ -61,12 +63,12 @@ func ReadHeaderSession(v HeaderValue) (*HeaderSession, error) {
} }
// Write encodes a Session header // Write encodes a Session header
func (hs *HeaderSession) Write() HeaderValue { func (hs *HeaderSession) Write() base.HeaderValue {
val := hs.Session val := hs.Session
if hs.Timeout != nil { if hs.Timeout != nil {
val += ";timeout=" + strconv.FormatUint(uint64(*hs.Timeout), 10) val += ";timeout=" + strconv.FormatUint(uint64(*hs.Timeout), 10)
} }
return HeaderValue{val} return base.HeaderValue{val}
} }

View File

@@ -4,26 +4,28 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/base"
) )
var casesHeaderSession = []struct { var casesHeaderSession = []struct {
name string name string
vin HeaderValue vin base.HeaderValue
vout HeaderValue vout base.HeaderValue
h *HeaderSession h *HeaderSession
}{ }{
{ {
"base", "base",
HeaderValue{`A3eqwsafq3rFASqew`}, base.HeaderValue{`A3eqwsafq3rFASqew`},
HeaderValue{`A3eqwsafq3rFASqew`}, base.HeaderValue{`A3eqwsafq3rFASqew`},
&HeaderSession{ &HeaderSession{
Session: "A3eqwsafq3rFASqew", Session: "A3eqwsafq3rFASqew",
}, },
}, },
{ {
"with timeout", "with timeout",
HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`},
HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`},
&HeaderSession{ &HeaderSession{
Session: "A3eqwsafq3rFASqew", Session: "A3eqwsafq3rFASqew",
Timeout: func() *uint { Timeout: func() *uint {
@@ -34,8 +36,8 @@ var casesHeaderSession = []struct {
}, },
{ {
"with timeout and space", "with timeout and space",
HeaderValue{`A3eqwsafq3rFASqew; timeout=47`}, base.HeaderValue{`A3eqwsafq3rFASqew; timeout=47`},
HeaderValue{`A3eqwsafq3rFASqew;timeout=47`}, base.HeaderValue{`A3eqwsafq3rFASqew;timeout=47`},
&HeaderSession{ &HeaderSession{
Session: "A3eqwsafq3rFASqew", Session: "A3eqwsafq3rFASqew",
Timeout: func() *uint { Timeout: func() *uint {

View File

@@ -4,6 +4,8 @@ import (
"fmt" "fmt"
"strconv" "strconv"
"strings" "strings"
"github.com/aler9/gortsplib/base"
) )
// HeaderTransport is a Transport header. // HeaderTransport is a Transport header.
@@ -56,7 +58,7 @@ func parsePorts(val string) (*[2]int, error) {
} }
// ReadHeaderTransport parses a Transport header. // ReadHeaderTransport parses a Transport header.
func ReadHeaderTransport(v HeaderValue) (*HeaderTransport, error) { func ReadHeaderTransport(v base.HeaderValue) (*HeaderTransport, error) {
if len(v) == 0 { if len(v) == 0 {
return nil, fmt.Errorf("value not provided") return nil, fmt.Errorf("value not provided")
} }
@@ -151,7 +153,7 @@ func ReadHeaderTransport(v HeaderValue) (*HeaderTransport, error) {
} }
// Write encodes a Transport header // Write encodes a Transport header
func (ht *HeaderTransport) Write() HeaderValue { func (ht *HeaderTransport) Write() base.HeaderValue {
var vals []string var vals []string
if ht.Protocol == StreamProtocolUDP { if ht.Protocol == StreamProtocolUDP {
@@ -187,5 +189,5 @@ func (ht *HeaderTransport) Write() HeaderValue {
vals = append(vals, "mode="+*ht.Mode) vals = append(vals, "mode="+*ht.Mode)
} }
return HeaderValue{strings.Join(vals, ";")} return base.HeaderValue{strings.Join(vals, ";")}
} }

View File

@@ -4,18 +4,20 @@ import (
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/aler9/gortsplib/base"
) )
var casesHeaderTransport = []struct { var casesHeaderTransport = []struct {
name string name string
vin HeaderValue vin base.HeaderValue
vout HeaderValue vout base.HeaderValue
h *HeaderTransport h *HeaderTransport
}{ }{
{ {
"udp unicast play request", "udp unicast play request",
HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode="PLAY"`}, base.HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode="PLAY"`},
HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode=play`}, base.HeaderValue{`RTP/AVP;unicast;client_port=3456-3457;mode=play`},
&HeaderTransport{ &HeaderTransport{
Protocol: StreamProtocolUDP, Protocol: StreamProtocolUDP,
Cast: func() *StreamCast { Cast: func() *StreamCast {
@@ -31,8 +33,8 @@ var casesHeaderTransport = []struct {
}, },
{ {
"udp unicast play response", "udp unicast play response",
HeaderValue{`RTP/AVP/UDP;unicast;client_port=3056-3057;server_port=5000-5001`}, base.HeaderValue{`RTP/AVP/UDP;unicast;client_port=3056-3057;server_port=5000-5001`},
HeaderValue{`RTP/AVP;unicast;client_port=3056-3057;server_port=5000-5001`}, base.HeaderValue{`RTP/AVP;unicast;client_port=3056-3057;server_port=5000-5001`},
&HeaderTransport{ &HeaderTransport{
Protocol: StreamProtocolUDP, Protocol: StreamProtocolUDP,
Cast: func() *StreamCast { Cast: func() *StreamCast {
@@ -45,8 +47,8 @@ var casesHeaderTransport = []struct {
}, },
{ {
"udp multicast play request / response", "udp multicast play request / response",
HeaderValue{`RTP/AVP;multicast;destination=225.219.201.15;port=7000-7001;ttl=127`}, base.HeaderValue{`RTP/AVP;multicast;destination=225.219.201.15;port=7000-7001;ttl=127`},
HeaderValue{`RTP/AVP;multicast`}, base.HeaderValue{`RTP/AVP;multicast`},
&HeaderTransport{ &HeaderTransport{
Protocol: StreamProtocolUDP, Protocol: StreamProtocolUDP,
Cast: func() *StreamCast { Cast: func() *StreamCast {
@@ -66,8 +68,8 @@ var casesHeaderTransport = []struct {
}, },
{ {
"tcp play request / response", "tcp play request / response",
HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, base.HeaderValue{`RTP/AVP/TCP;interleaved=0-1`},
HeaderValue{`RTP/AVP/TCP;interleaved=0-1`}, base.HeaderValue{`RTP/AVP/TCP;interleaved=0-1`},
&HeaderTransport{ &HeaderTransport{
Protocol: StreamProtocolTCP, Protocol: StreamProtocolTCP,
InterleavedIds: &[2]int{0, 1}, InterleavedIds: &[2]int{0, 1},

View File

@@ -1,5 +1,9 @@
package gortsplib package gortsplib
import (
"github.com/aler9/gortsplib/base"
)
// MultiBuffer implements software multi buffering, that allows to reuse // MultiBuffer implements software multi buffering, that allows to reuse
// existing buffers without creating new ones, increasing performance. // existing buffers without creating new ones, increasing performance.
type MultiBuffer struct { type MultiBuffer struct {
@@ -33,14 +37,14 @@ func (mb *MultiBuffer) Next() []byte {
type multiFrame struct { type multiFrame struct {
count int count int
frames []*InterleavedFrame frames []*base.InterleavedFrame
cur int cur int
} }
func newMultiFrame(count int, bufsize int) *multiFrame { func newMultiFrame(count int, bufsize int) *multiFrame {
frames := make([]*InterleavedFrame, count) frames := make([]*base.InterleavedFrame, count)
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
frames[i] = &InterleavedFrame{ frames[i] = &base.InterleavedFrame{
Content: make([]byte, 0, bufsize), Content: make([]byte, 0, bufsize),
} }
} }
@@ -51,7 +55,7 @@ func newMultiFrame(count int, bufsize int) *multiFrame {
} }
} }
func (mf *multiFrame) next() *InterleavedFrame { func (mf *multiFrame) next() *base.InterleavedFrame {
ret := mf.frames[mf.cur] ret := mf.frames[mf.cur]
mf.cur += 1 mf.cur += 1
if mf.cur >= mf.count { if mf.cur >= mf.count {

100
utils.go
View File

@@ -1,14 +1,7 @@
package gortsplib package gortsplib
import ( import (
"bufio" "github.com/aler9/gortsplib/base"
"fmt"
"io"
"strconv"
)
const (
rtspMaxContentLength = 4096
) )
// StreamProtocol is the protocol of a stream. // StreamProtocol is the protocol of a stream.
@@ -22,7 +15,7 @@ const (
StreamProtocolTCP StreamProtocolTCP
) )
// String implements fmt.Stringer // String implements fmt.Stringer.
func (sp StreamProtocol) String() string { func (sp StreamProtocol) String() string {
switch sp { switch sp {
case StreamProtocolUDP: case StreamProtocolUDP:
@@ -45,7 +38,7 @@ const (
StreamMulticast StreamMulticast
) )
// String implements fmt.Stringer // String implements fmt.Stringer.
func (sc StreamCast) String() string { func (sc StreamCast) String() string {
switch sc { switch sc {
case StreamUnicast: case StreamUnicast:
@@ -58,29 +51,17 @@ func (sc StreamCast) String() string {
} }
// StreamType is the stream type. // StreamType is the stream type.
type StreamType int type StreamType = base.StreamType
const ( const (
// StreamTypeRtp means that the stream contains RTP packets // StreamTypeRtp means that the stream contains RTP packets
StreamTypeRtp StreamType = iota StreamTypeRtp = base.StreamTypeRtp
// StreamTypeRtcp means that the stream contains RTCP packets // StreamTypeRtcp means that the stream contains RTCP packets
StreamTypeRtcp StreamTypeRtcp = base.StreamTypeRtcp
) )
// String implements fmt.Stringer // SetupMode is the setup mode.
func (st StreamType) String() string {
switch st {
case StreamTypeRtp:
return "RTP"
case StreamTypeRtcp:
return "RTCP"
}
return "unknown"
}
// SetupMode is a setup mode.
type SetupMode int type SetupMode int
const ( const (
@@ -91,7 +72,7 @@ const (
SetupModeRecord SetupModeRecord
) )
// String implements fmt.Stringer // String implements fmt.Stringer.
func (sm SetupMode) String() string { func (sm SetupMode) String() string {
switch sm { switch sm {
case SetupModePlay: case SetupModePlay:
@@ -102,68 +83,3 @@ func (sm SetupMode) String() string {
} }
return "unknown" return "unknown"
} }
func readBytesLimited(rb *bufio.Reader, delim byte, n int) ([]byte, error) {
for i := 1; i <= n; i++ {
byts, err := rb.Peek(i)
if err != nil {
return nil, err
}
if byts[len(byts)-1] == delim {
rb.Discard(len(byts))
return byts, nil
}
}
return nil, fmt.Errorf("buffer length exceeds %d", n)
}
func readByteEqual(rb *bufio.Reader, cmp byte) error {
byt, err := rb.ReadByte()
if err != nil {
return err
}
if byt != cmp {
return fmt.Errorf("expected '%c', got '%c'", cmp, byt)
}
return nil
}
func readContent(rb *bufio.Reader, header Header) ([]byte, error) {
cls, ok := header["Content-Length"]
if !ok || len(cls) != 1 {
return nil, nil
}
cl, err := strconv.ParseInt(cls[0], 10, 64)
if err != nil {
return nil, fmt.Errorf("invalid Content-Length")
}
if cl > rtspMaxContentLength {
return nil, fmt.Errorf("Content-Length exceeds %d", rtspMaxContentLength)
}
ret := make([]byte, cl)
n, err := io.ReadFull(rb, ret)
if err != nil && n != len(ret) {
return nil, err
}
return ret, nil
}
func writeContent(bw *bufio.Writer, content []byte) error {
if len(content) == 0 {
return nil
}
_, err := bw.Write(content)
if err != nil {
return err
}
return nil
}