From 4ca5f8157fa7f4b16cdb332e44bb49af496b35c3 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 12 Jul 2020 19:48:46 +0200 Subject: [PATCH] rewrite ConnClient api; add examples --- Makefile | 1 + README.md | 9 +- conn-client.go | 270 +++++++++++++++++++++++++++++++---------- conn-server.go | 24 ++-- examples/client-tcp.go | 69 +++++++++++ go.mod | 6 +- go.sum | 4 + request.go | 10 +- 8 files changed, 316 insertions(+), 77 deletions(-) create mode 100644 examples/client-tcp.go diff --git a/Makefile b/Makefile index 0b3747a9..565fcc72 100644 --- a/Makefile +++ b/Makefile @@ -43,3 +43,4 @@ IMAGES = $(shell echo test-images/*/ | xargs -n1 basename) test-nodocker: $(eval export CGO_ENABLED = 0) go test -v . + $(foreach f,$(shell ls examples/*),go build -o /dev/null $(f)$(NL)) diff --git a/README.md b/README.md index 9148794c..ff0cef98 100644 --- a/README.md +++ b/README.md @@ -5,9 +5,11 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/aler9/gortsplib)](https://goreportcard.com/report/github.com/aler9/gortsplib) [![GoDoc](https://img.shields.io/badge/godoc-reference-blue)](https://pkg.go.dev/github.com/aler9/gortsplib?tab=doc) -RTSP 1.0 primitives for the Go programming language. +RTSP 1.0 library for the Go programming language, written for [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server). -See [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server) for examples on how to use this library. +## Examples + +[client-tcp.go](examples/client-tcp.go) ## Documentation @@ -15,6 +17,9 @@ https://pkg.go.dev/github.com/aler9/gortsplib ## Links +Related projects +* https://github.com/aler9/rtsp-simple-server + IETF Standards * RTSP 1.0 https://tools.ietf.org/html/rfc2326 * RTSP 2.0 https://tools.ietf.org/html/rfc7826 diff --git a/conn-client.go b/conn-client.go index 2378ed75..34e02953 100644 --- a/conn-client.go +++ b/conn-client.go @@ -4,8 +4,12 @@ import ( "bufio" "fmt" "net" + "net/url" "strconv" + "strings" "time" + + "github.com/pion/sdp" ) const ( @@ -16,13 +20,7 @@ const ( // ConnClientConf allows to configure a ConnClient. type ConnClientConf struct { // pre-existing TCP connection that will be wrapped - NConn net.Conn - - // (optional) a username that will be sent to the server when requested - Username string - - // (optional) a password that will be sent to the server when requested - Password string + Conn net.Conn // (optional) timeout for read requests. // It defaults to 5 seconds @@ -52,26 +50,25 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) { conf.WriteTimeout = 5 * time.Second } - if conf.Username != "" && conf.Password == "" || - conf.Username == "" && conf.Password != "" { - return nil, fmt.Errorf("username and password must be both provided") - } - return &ConnClient{ conf: conf, - br: bufio.NewReaderSize(conf.NConn, _CLIENT_READ_BUFFER_SIZE), - bw: bufio.NewWriterSize(conf.NConn, _CLIENT_WRITE_BUFFER_SIZE), + br: bufio.NewReaderSize(conf.Conn, _CLIENT_READ_BUFFER_SIZE), + bw: bufio.NewWriterSize(conf.Conn, _CLIENT_WRITE_BUFFER_SIZE), }, nil } // NetConn returns the underlying net.Conn. func (c *ConnClient) NetConn() net.Conn { - return c.conf.NConn + return c.conf.Conn } -// ReadInterleavedFrameOrResponse reads an InterleavedFrame or a Response. -func (c *ConnClient) ReadInterleavedFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { - c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) +// ReadFrame reads an InterleavedFrame. +func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { + return frame.read(c.br) +} + +func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { + c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) b, err := c.br.ReadByte() if err != nil { return nil, err @@ -89,13 +86,46 @@ func (c *ConnClient) ReadInterleavedFrameOrResponse(frame *InterleavedFrame) (in return readResponse(c.br) } -// ReadInterleavedFrame reads an InterleavedFrame. -func (c *ConnClient) ReadInterleavedFrame(frame *InterleavedFrame) error { - c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - return frame.read(c.br) +// Do writes a Request and reads a Response. +func (c *ConnClient) Do(req *Request) (*Response, error) { + err := c.WriteRequest(req) + if err != nil { + return nil, err + } + + c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + res, err := readResponse(c.br) + if err != nil { + return nil, err + } + + // get session from response + if sxRaw, ok := res.Header["Session"]; ok && len(sxRaw) == 1 { + sx, err := ReadHeaderSession(sxRaw[0]) + if err != nil { + return nil, fmt.Errorf("unable to parse session header: %s", err) + } + c.session = sx.Session + } + + // setup authentication + if res.StatusCode == StatusUnauthorized && req.Url.User != nil && c.auth == nil { + pass, _ := req.Url.User.Password() + auth, err := NewAuthClient(res.Header["WWW-Authenticate"], req.Url.User.Username(), pass) + if err != nil { + return nil, fmt.Errorf("unable to setup authentication: %s", err) + } + c.auth = auth + + // send request again + return c.Do(req) + } + + return res, nil } -func (c *ConnClient) writeRequest(req *Request) error { +// WriteRequest writes a request and does not wait for a response. +func (c *ConnClient) WriteRequest(req *Request) error { if req.Header == nil { req.Header = make(Header) } @@ -114,54 +144,172 @@ func (c *ConnClient) writeRequest(req *Request) error { c.curCSeq += 1 req.Header["CSeq"] = []string{strconv.FormatInt(int64(c.curCSeq), 10)} - c.conf.NConn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) + c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) return req.write(c.bw) } -// WriteRequest writes a request and reads a response. -func (c *ConnClient) WriteRequest(req *Request) (*Response, error) { - err := c.writeRequest(req) +// WriteFrame writes an InterleavedFrame. +func (c *ConnClient) WriteFrame(frame *InterleavedFrame) error { + c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) + return frame.write(c.bw) +} + +// Options writes an OPTIONS request and reads a response, that contains +// the methods allowed by the server. Since this method is not implemented by every +// RTSP server, the function does not fail if the returned code is not StatusOK. +func (c *ConnClient) Options(u *url.URL) (*Response, error) { + // strip path + u = &url.URL{ + Scheme: "rtsp", + Host: u.Host, + User: u.User, + Path: "/", + } + + res, err := c.Do(&Request{ + Method: OPTIONS, + Url: u, + }) if err != nil { return nil, err } - c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - res, err := readResponse(c.br) - if err != nil { - return nil, err - } - - // get session from response - if sxRaw, ok := res.Header["Session"]; ok && len(sxRaw) == 1 { - sx, err := ReadHeaderSession(sxRaw[0]) - if err != nil { - return nil, fmt.Errorf("unable to parse session header: %s", err) - } - c.session = sx.Session - } - - // setup authentication - if res.StatusCode == StatusUnauthorized && c.conf.Username != "" && c.auth == nil { - auth, err := NewAuthClient(res.Header["WWW-Authenticate"], c.conf.Username, c.conf.Password) - if err != nil { - return nil, fmt.Errorf("unable to setup authentication: %s", err) - } - c.auth = auth - - // send request again - return c.WriteRequest(req) - } - return res, nil } -// WriteRequestNoResponse writes a request and does not wait for a response. -func (c *ConnClient) WriteRequestNoResponse(req *Request) error { - return c.writeRequest(req) +// Describe writes a DESCRIBE request and reads a response, that contains +// a SDP document that describes the stream available in the given url. +func (c *ConnClient) Describe(u *url.URL) (*sdp.SessionDescription, *Response, error) { + res, err := c.Do(&Request{ + Method: DESCRIBE, + Url: u, + }) + if err != nil { + return nil, nil, err + } + + if res.StatusCode != StatusOK { + return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + contentType, ok := res.Header["Content-Type"] + if !ok || len(contentType) != 1 { + return nil, nil, fmt.Errorf("Content-Type not provided") + } + + if contentType[0] != "application/sdp" { + return nil, nil, fmt.Errorf("wrong Content-Type, expected application/sdp") + } + + sdpd := &sdp.SessionDescription{} + err = sdpd.Unmarshal(string(res.Content)) + if err != nil { + return nil, nil, err + } + + return sdpd, res, nil } -// WriteInterleavedFrame writes an InterleavedFrame. -func (c *ConnClient) WriteInterleavedFrame(frame *InterleavedFrame) error { - c.conf.NConn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) - return frame.write(c.bw) +// Setup writes a SETUP request, that indicates that we want to read +// a stream described by the given media, with the given transport, +// and reads a response. +func (c *ConnClient) Setup(u *url.URL, media *sdp.MediaDescription, transport []string) (*Response, error) { + // build an URL with the control attribute from media + u = func() *url.URL { + control := func() string { + for _, attr := range media.Attributes { + if attr.Key == "control" { + return attr.Value + } + } + return "" + }() + + // no control attribute, use original url + if control == "" { + return u + } + + // control attribute with absolute path + if strings.HasPrefix(control, "rtsp://") { + newu, err := url.Parse(control) + if err != nil { + return u + } + + return &url.URL{ + Scheme: "rtsp", + Host: u.Host, + User: u.User, + Path: newu.Path, + RawQuery: newu.RawQuery, + } + } + + // control attribute with relative path + return &url.URL{ + Scheme: "rtsp", + Host: u.Host, + User: u.User, + Path: func() string { + ret := u.Path + if len(ret) == 0 || ret[len(ret)-1] != '/' { + ret += "/" + } + ret += control + return ret + }(), + RawQuery: u.RawQuery, + } + }() + + res, err := c.Do(&Request{ + Method: SETUP, + Url: u, + Header: Header{ + "Transport": []string{strings.Join(transport, ";")}, + }, + }) + if err != nil { + return nil, err + } + + if res.StatusCode != StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + return res, nil +} + +// Play writes a PLAY request, that indicates that we want to start the +// stream, and reads a response. +func (c *ConnClient) Play(u *url.URL) (*Response, error) { + err := c.WriteRequest(&Request{ + Method: PLAY, + Url: u, + }) + if err != nil { + return nil, err + } + + frame := &InterleavedFrame{ + Content: make([]byte, 512*1024), + } + + // v4lrtspserver sends frames before the response. + // ignore them and wait for the response. + for { + recv, err := c.readFrameOrResponse(frame) + if err != nil { + return nil, err + } + + if res, ok := recv.(*Response); ok { + if res.StatusCode != StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + return res, nil + } + } } diff --git a/conn-server.go b/conn-server.go index aeac4e1a..0d94d5cc 100644 --- a/conn-server.go +++ b/conn-server.go @@ -14,7 +14,7 @@ const ( // ConnServerConf allows to configure a ConnServer. type ConnServerConf struct { // pre-existing TCP connection that will be wrapped - NConn net.Conn + Conn net.Conn // (optional) timeout for read requests. // It defaults to 5 seconds @@ -43,25 +43,25 @@ func NewConnServer(conf ConnServerConf) *ConnServer { return &ConnServer{ conf: conf, - br: bufio.NewReaderSize(conf.NConn, _SERVER_READ_BUFFER_SIZE), - bw: bufio.NewWriterSize(conf.NConn, _SERVER_WRITE_BUFFER_SIZE), + br: bufio.NewReaderSize(conf.Conn, _SERVER_READ_BUFFER_SIZE), + bw: bufio.NewWriterSize(conf.Conn, _SERVER_WRITE_BUFFER_SIZE), } } // NetConn returns the underlying net.Conn. func (s *ConnServer) NetConn() net.Conn { - return s.conf.NConn + return s.conf.Conn } // ReadRequest reads a Request. func (s *ConnServer) ReadRequest() (*Request, error) { - s.conf.NConn.SetReadDeadline(time.Time{}) // disable deadline + s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline return readRequest(s.br) } -// ReadInterleavedFrameOrRequest reads an InterleavedFrame or a Request. -func (s *ConnServer) ReadInterleavedFrameOrRequest(frame *InterleavedFrame) (interface{}, error) { - s.conf.NConn.SetReadDeadline(time.Time{}) // disable deadline +// ReadFrameOrRequest reads an InterleavedFrame or a Request. +func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame) (interface{}, error) { + s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline b, err := s.br.ReadByte() if err != nil { return nil, err @@ -81,12 +81,12 @@ func (s *ConnServer) ReadInterleavedFrameOrRequest(frame *InterleavedFrame) (int // WriteResponse writes a response. func (s *ConnServer) WriteResponse(res *Response) error { - s.conf.NConn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) + s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) return res.write(s.bw) } -// WriteInterleavedFrame writes an InterleavedFrame. -func (s *ConnServer) WriteInterleavedFrame(frame *InterleavedFrame) error { - s.conf.NConn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) +// WriteFrame writes an InterleavedFrame. +func (s *ConnServer) WriteFrame(frame *InterleavedFrame) error { + s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout)) return frame.write(s.bw) } diff --git a/examples/client-tcp.go b/examples/client-tcp.go new file mode 100644 index 00000000..738004d6 --- /dev/null +++ b/examples/client-tcp.go @@ -0,0 +1,69 @@ +// +build ignore + +package main + +import ( + "fmt" + "net" + "net/url" + "time" + + "github.com/aler9/gortsplib" +) + +func main() { + u, err := url.Parse("rtsp://user:pass@example.com/mystream") + if err != nil { + panic(err) + } + + conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second) + if err != nil { + panic(err) + } + defer conn.Close() + + rconn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn}) + if err != nil { + panic(err) + } + + _, err = rconn.Options(u) + if err != nil { + panic(err) + } + + sdpd, _, err := rconn.Describe(u) + if err != nil { + panic(err) + } + + for i, media := range sdpd.MediaDescriptions { + _, err := rconn.Setup(u, media, []string{ + "RTP/AVP/TCP", + "unicast", + fmt.Sprintf("interleaved=%d-%d", (i * 2), (i*2)+1), + }) + if err != nil { + panic(err) + } + } + + _, err = rconn.Play(u) + if err != nil { + panic(err) + } + + frame := &gortsplib.InterleavedFrame{ + Content: make([]byte, 512*1024), + } + + for { + err := rconn.ReadFrame(frame) + if err != nil { + panic(err) + } + + fmt.Println("incoming", frame.Channel, frame.Content) + } +} diff --git a/go.mod b/go.mod index f69a9976..c96e2489 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,8 @@ module github.com/aler9/gortsplib go 1.13 -require github.com/stretchr/testify v1.4.0 +require ( + github.com/pion/sdp v1.3.0 + github.com/pkg/errors v0.9.1 // indirect + github.com/stretchr/testify v1.4.0 +) diff --git a/go.sum b/go.sum index 8fdee585..7a305f4e 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,9 @@ github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pion/sdp v1.3.0 h1:21lpgEILHyolpsIrbCBagZaAPj4o057cFjzaFebkVOs= +github.com/pion/sdp v1.3.0/go.mod h1:ceA2lTyftydQTuCIbUNoH77aAt6CiQJaRpssA4Gee8I= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/request.go b/request.go index 93b6892e..e7823ec0 100644 --- a/request.go +++ b/request.go @@ -107,7 +107,15 @@ func readRequest(rb *bufio.Reader) (*Request, error) { } func (req *Request) write(bw *bufio.Writer) error { - _, err := bw.Write([]byte(string(req.Method) + " " + req.Url.String() + " " + _RTSP_PROTO + "\r\n")) + // remove credentials + u := &url.URL{ + Scheme: req.Url.Scheme, + Host: req.Url.Host, + Path: req.Url.Path, + RawQuery: req.Url.RawQuery, + } + + _, err := bw.Write([]byte(string(req.Method) + " " + u.String() + " " + _RTSP_PROTO + "\r\n")) if err != nil { return err }