rewrite ConnClient api; add examples

This commit is contained in:
aler9
2020-07-12 19:48:46 +02:00
parent 6b3c41d319
commit 4ca5f8157f
8 changed files with 316 additions and 77 deletions

View File

@@ -43,3 +43,4 @@ IMAGES = $(shell echo test-images/*/ | xargs -n1 basename)
test-nodocker:
$(eval export CGO_ENABLED = 0)
go test -v .
$(foreach f,$(shell ls examples/*),go build -o /dev/null $(f)$(NL))

View File

@@ -5,9 +5,11 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/aler9/gortsplib)](https://goreportcard.com/report/github.com/aler9/gortsplib)
[![GoDoc](https://img.shields.io/badge/godoc-reference-blue)](https://pkg.go.dev/github.com/aler9/gortsplib?tab=doc)
RTSP 1.0 primitives for the Go programming language.
RTSP 1.0 library for the Go programming language, written for [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server).
See [rtsp-simple-server](https://github.com/aler9/rtsp-simple-server) for examples on how to use this library.
## Examples
[client-tcp.go](examples/client-tcp.go)
## Documentation
@@ -15,6 +17,9 @@ https://pkg.go.dev/github.com/aler9/gortsplib
## Links
Related projects
* https://github.com/aler9/rtsp-simple-server
IETF Standards
* RTSP 1.0 https://tools.ietf.org/html/rfc2326
* RTSP 2.0 https://tools.ietf.org/html/rfc7826

View File

@@ -4,8 +4,12 @@ import (
"bufio"
"fmt"
"net"
"net/url"
"strconv"
"strings"
"time"
"github.com/pion/sdp"
)
const (
@@ -16,13 +20,7 @@ const (
// ConnClientConf allows to configure a ConnClient.
type ConnClientConf struct {
// pre-existing TCP connection that will be wrapped
NConn net.Conn
// (optional) a username that will be sent to the server when requested
Username string
// (optional) a password that will be sent to the server when requested
Password string
Conn net.Conn
// (optional) timeout for read requests.
// It defaults to 5 seconds
@@ -52,26 +50,25 @@ func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
conf.WriteTimeout = 5 * time.Second
}
if conf.Username != "" && conf.Password == "" ||
conf.Username == "" && conf.Password != "" {
return nil, fmt.Errorf("username and password must be both provided")
}
return &ConnClient{
conf: conf,
br: bufio.NewReaderSize(conf.NConn, _CLIENT_READ_BUFFER_SIZE),
bw: bufio.NewWriterSize(conf.NConn, _CLIENT_WRITE_BUFFER_SIZE),
br: bufio.NewReaderSize(conf.Conn, _CLIENT_READ_BUFFER_SIZE),
bw: bufio.NewWriterSize(conf.Conn, _CLIENT_WRITE_BUFFER_SIZE),
}, nil
}
// NetConn returns the underlying net.Conn.
func (c *ConnClient) NetConn() net.Conn {
return c.conf.NConn
return c.conf.Conn
}
// ReadInterleavedFrameOrResponse reads an InterleavedFrame or a Response.
func (c *ConnClient) ReadInterleavedFrameOrResponse(frame *InterleavedFrame) (interface{}, error) {
c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
// ReadFrame reads an InterleavedFrame.
func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error {
return frame.read(c.br)
}
func (c *ConnClient) readFrameOrResponse(frame *InterleavedFrame) (interface{}, error) {
c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
b, err := c.br.ReadByte()
if err != nil {
return nil, err
@@ -89,13 +86,46 @@ func (c *ConnClient) ReadInterleavedFrameOrResponse(frame *InterleavedFrame) (in
return readResponse(c.br)
}
// ReadInterleavedFrame reads an InterleavedFrame.
func (c *ConnClient) ReadInterleavedFrame(frame *InterleavedFrame) error {
c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
return frame.read(c.br)
// Do writes a Request and reads a Response.
func (c *ConnClient) Do(req *Request) (*Response, error) {
err := c.WriteRequest(req)
if err != nil {
return nil, err
}
c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
res, err := readResponse(c.br)
if err != nil {
return nil, err
}
// get session from response
if sxRaw, ok := res.Header["Session"]; ok && len(sxRaw) == 1 {
sx, err := ReadHeaderSession(sxRaw[0])
if err != nil {
return nil, fmt.Errorf("unable to parse session header: %s", err)
}
c.session = sx.Session
}
// setup authentication
if res.StatusCode == StatusUnauthorized && req.Url.User != nil && c.auth == nil {
pass, _ := req.Url.User.Password()
auth, err := NewAuthClient(res.Header["WWW-Authenticate"], req.Url.User.Username(), pass)
if err != nil {
return nil, fmt.Errorf("unable to setup authentication: %s", err)
}
c.auth = auth
// send request again
return c.Do(req)
}
return res, nil
}
func (c *ConnClient) writeRequest(req *Request) error {
// WriteRequest writes a request and does not wait for a response.
func (c *ConnClient) WriteRequest(req *Request) error {
if req.Header == nil {
req.Header = make(Header)
}
@@ -114,54 +144,172 @@ func (c *ConnClient) writeRequest(req *Request) error {
c.curCSeq += 1
req.Header["CSeq"] = []string{strconv.FormatInt(int64(c.curCSeq), 10)}
c.conf.NConn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return req.write(c.bw)
}
// WriteRequest writes a request and reads a response.
func (c *ConnClient) WriteRequest(req *Request) (*Response, error) {
err := c.writeRequest(req)
// WriteFrame writes an InterleavedFrame.
func (c *ConnClient) WriteFrame(frame *InterleavedFrame) error {
c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.write(c.bw)
}
// Options writes an OPTIONS request and reads a response, that contains
// the methods allowed by the server. Since this method is not implemented by every
// RTSP server, the function does not fail if the returned code is not StatusOK.
func (c *ConnClient) Options(u *url.URL) (*Response, error) {
// strip path
u = &url.URL{
Scheme: "rtsp",
Host: u.Host,
User: u.User,
Path: "/",
}
res, err := c.Do(&Request{
Method: OPTIONS,
Url: u,
})
if err != nil {
return nil, err
}
c.conf.NConn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
res, err := readResponse(c.br)
if err != nil {
return nil, err
}
// get session from response
if sxRaw, ok := res.Header["Session"]; ok && len(sxRaw) == 1 {
sx, err := ReadHeaderSession(sxRaw[0])
if err != nil {
return nil, fmt.Errorf("unable to parse session header: %s", err)
}
c.session = sx.Session
}
// setup authentication
if res.StatusCode == StatusUnauthorized && c.conf.Username != "" && c.auth == nil {
auth, err := NewAuthClient(res.Header["WWW-Authenticate"], c.conf.Username, c.conf.Password)
if err != nil {
return nil, fmt.Errorf("unable to setup authentication: %s", err)
}
c.auth = auth
// send request again
return c.WriteRequest(req)
}
return res, nil
}
// WriteRequestNoResponse writes a request and does not wait for a response.
func (c *ConnClient) WriteRequestNoResponse(req *Request) error {
return c.writeRequest(req)
// Describe writes a DESCRIBE request and reads a response, that contains
// a SDP document that describes the stream available in the given url.
func (c *ConnClient) Describe(u *url.URL) (*sdp.SessionDescription, *Response, error) {
res, err := c.Do(&Request{
Method: DESCRIBE,
Url: u,
})
if err != nil {
return nil, nil, err
}
if res.StatusCode != StatusOK {
return nil, nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
contentType, ok := res.Header["Content-Type"]
if !ok || len(contentType) != 1 {
return nil, nil, fmt.Errorf("Content-Type not provided")
}
if contentType[0] != "application/sdp" {
return nil, nil, fmt.Errorf("wrong Content-Type, expected application/sdp")
}
sdpd := &sdp.SessionDescription{}
err = sdpd.Unmarshal(string(res.Content))
if err != nil {
return nil, nil, err
}
return sdpd, res, nil
}
// WriteInterleavedFrame writes an InterleavedFrame.
func (c *ConnClient) WriteInterleavedFrame(frame *InterleavedFrame) error {
c.conf.NConn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.write(c.bw)
// Setup writes a SETUP request, that indicates that we want to read
// a stream described by the given media, with the given transport,
// and reads a response.
func (c *ConnClient) Setup(u *url.URL, media *sdp.MediaDescription, transport []string) (*Response, error) {
// build an URL with the control attribute from media
u = func() *url.URL {
control := func() string {
for _, attr := range media.Attributes {
if attr.Key == "control" {
return attr.Value
}
}
return ""
}()
// no control attribute, use original url
if control == "" {
return u
}
// control attribute with absolute path
if strings.HasPrefix(control, "rtsp://") {
newu, err := url.Parse(control)
if err != nil {
return u
}
return &url.URL{
Scheme: "rtsp",
Host: u.Host,
User: u.User,
Path: newu.Path,
RawQuery: newu.RawQuery,
}
}
// control attribute with relative path
return &url.URL{
Scheme: "rtsp",
Host: u.Host,
User: u.User,
Path: func() string {
ret := u.Path
if len(ret) == 0 || ret[len(ret)-1] != '/' {
ret += "/"
}
ret += control
return ret
}(),
RawQuery: u.RawQuery,
}
}()
res, err := c.Do(&Request{
Method: SETUP,
Url: u,
Header: Header{
"Transport": []string{strings.Join(transport, ";")},
},
})
if err != nil {
return nil, err
}
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
// Play writes a PLAY request, that indicates that we want to start the
// stream, and reads a response.
func (c *ConnClient) Play(u *url.URL) (*Response, error) {
err := c.WriteRequest(&Request{
Method: PLAY,
Url: u,
})
if err != nil {
return nil, err
}
frame := &InterleavedFrame{
Content: make([]byte, 512*1024),
}
// v4lrtspserver sends frames before the response.
// ignore them and wait for the response.
for {
recv, err := c.readFrameOrResponse(frame)
if err != nil {
return nil, err
}
if res, ok := recv.(*Response); ok {
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
}
}

View File

@@ -14,7 +14,7 @@ const (
// ConnServerConf allows to configure a ConnServer.
type ConnServerConf struct {
// pre-existing TCP connection that will be wrapped
NConn net.Conn
Conn net.Conn
// (optional) timeout for read requests.
// It defaults to 5 seconds
@@ -43,25 +43,25 @@ func NewConnServer(conf ConnServerConf) *ConnServer {
return &ConnServer{
conf: conf,
br: bufio.NewReaderSize(conf.NConn, _SERVER_READ_BUFFER_SIZE),
bw: bufio.NewWriterSize(conf.NConn, _SERVER_WRITE_BUFFER_SIZE),
br: bufio.NewReaderSize(conf.Conn, _SERVER_READ_BUFFER_SIZE),
bw: bufio.NewWriterSize(conf.Conn, _SERVER_WRITE_BUFFER_SIZE),
}
}
// NetConn returns the underlying net.Conn.
func (s *ConnServer) NetConn() net.Conn {
return s.conf.NConn
return s.conf.Conn
}
// ReadRequest reads a Request.
func (s *ConnServer) ReadRequest() (*Request, error) {
s.conf.NConn.SetReadDeadline(time.Time{}) // disable deadline
s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline
return readRequest(s.br)
}
// ReadInterleavedFrameOrRequest reads an InterleavedFrame or a Request.
func (s *ConnServer) ReadInterleavedFrameOrRequest(frame *InterleavedFrame) (interface{}, error) {
s.conf.NConn.SetReadDeadline(time.Time{}) // disable deadline
// ReadFrameOrRequest reads an InterleavedFrame or a Request.
func (s *ConnServer) ReadFrameOrRequest(frame *InterleavedFrame) (interface{}, error) {
s.conf.Conn.SetReadDeadline(time.Time{}) // disable deadline
b, err := s.br.ReadByte()
if err != nil {
return nil, err
@@ -81,12 +81,12 @@ func (s *ConnServer) ReadInterleavedFrameOrRequest(frame *InterleavedFrame) (int
// WriteResponse writes a response.
func (s *ConnServer) WriteResponse(res *Response) error {
s.conf.NConn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
return res.write(s.bw)
}
// WriteInterleavedFrame writes an InterleavedFrame.
func (s *ConnServer) WriteInterleavedFrame(frame *InterleavedFrame) error {
s.conf.NConn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
// WriteFrame writes an InterleavedFrame.
func (s *ConnServer) WriteFrame(frame *InterleavedFrame) error {
s.conf.Conn.SetWriteDeadline(time.Now().Add(s.conf.WriteTimeout))
return frame.write(s.bw)
}

69
examples/client-tcp.go Normal file
View File

@@ -0,0 +1,69 @@
// +build ignore
package main
import (
"fmt"
"net"
"net/url"
"time"
"github.com/aler9/gortsplib"
)
func main() {
u, err := url.Parse("rtsp://user:pass@example.com/mystream")
if err != nil {
panic(err)
}
conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second)
if err != nil {
panic(err)
}
defer conn.Close()
rconn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn})
if err != nil {
panic(err)
}
_, err = rconn.Options(u)
if err != nil {
panic(err)
}
sdpd, _, err := rconn.Describe(u)
if err != nil {
panic(err)
}
for i, media := range sdpd.MediaDescriptions {
_, err := rconn.Setup(u, media, []string{
"RTP/AVP/TCP",
"unicast",
fmt.Sprintf("interleaved=%d-%d", (i * 2), (i*2)+1),
})
if err != nil {
panic(err)
}
}
_, err = rconn.Play(u)
if err != nil {
panic(err)
}
frame := &gortsplib.InterleavedFrame{
Content: make([]byte, 512*1024),
}
for {
err := rconn.ReadFrame(frame)
if err != nil {
panic(err)
}
fmt.Println("incoming", frame.Channel, frame.Content)
}
}

6
go.mod
View File

@@ -2,4 +2,8 @@ module github.com/aler9/gortsplib
go 1.13
require github.com/stretchr/testify v1.4.0
require (
github.com/pion/sdp v1.3.0
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.4.0
)

4
go.sum
View File

@@ -1,5 +1,9 @@
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pion/sdp v1.3.0 h1:21lpgEILHyolpsIrbCBagZaAPj4o057cFjzaFebkVOs=
github.com/pion/sdp v1.3.0/go.mod h1:ceA2lTyftydQTuCIbUNoH77aAt6CiQJaRpssA4Gee8I=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@@ -107,7 +107,15 @@ func readRequest(rb *bufio.Reader) (*Request, error) {
}
func (req *Request) write(bw *bufio.Writer) error {
_, err := bw.Write([]byte(string(req.Method) + " " + req.Url.String() + " " + _RTSP_PROTO + "\r\n"))
// remove credentials
u := &url.URL{
Scheme: req.Url.Scheme,
Host: req.Url.Host,
Path: req.Url.Path,
RawQuery: req.Url.RawQuery,
}
_, err := bw.Write([]byte(string(req.Method) + " " + u.String() + " " + _RTSP_PROTO + "\r\n"))
if err != nil {
return err
}