diff --git a/README.md b/README.md index 16dcbd99..16897c32 100644 --- a/README.md +++ b/README.md @@ -10,13 +10,15 @@ RTSP 1.0 client and server library for the Go programming language, written for Features: * Client - * Query servers about published tracks - * Read tracks from servers with UDP or TCP - * Read only selected tracks - * Publish tracks to servers with UDP or TCP + * Read streams from servers with UDP or TCP + * Publish streams to servers with UDP or TCP + * Query servers about published streams + * Read only selected tracks of a stream * Pause reading or publishing without disconnecting from the server * Server - * Build servers and handle publishers and readers + * Handle requests from clients + * Read and write streams with TCP + * Encrypt streams with TLS (RTSPS) ## Table of contents @@ -35,6 +37,7 @@ Features: * [client-publish-options](examples/client-publish-options.go) * [client-publish-pause](examples/client-publish-pause.go) * [server](examples/server.go) +* [server-tls](examples/server-tls.go) ## API Documentation diff --git a/examples/server-tls.go b/examples/server-tls.go new file mode 100644 index 00000000..10ff29b8 --- /dev/null +++ b/examples/server-tls.go @@ -0,0 +1,194 @@ +// +build ignore + +package main + +import ( + "crypto/tls" + "fmt" + "log" + "sync" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/pkg/base" + "github.com/aler9/gortsplib/pkg/headers" +) + +// This example shows how to +// 1. create a RTSP server which accept only connections encrypted with TLS (RTSPS) +// 2. allow a single client to publish a stream with TCP +// 3. allow multiple clients to read that stream with TCP + +var mutex sync.Mutex +var publisher *gortsplib.ServerConn +var sdp []byte +var readers = make(map[*gortsplib.ServerConn]struct{}) + +// this is called for each incoming connection +func handleConn(conn *gortsplib.ServerConn) { + defer conn.Close() + + log.Printf("client connected") + + // called after receiving a DESCRIBE request. + onDescribe := func(req *base.Request) (*base.Response, error) { + mutex.Lock() + defer mutex.Unlock() + + // no one is publishing yet + if publisher == nil { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Base": base.HeaderValue{req.URL.String() + "/"}, + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Content: sdp, + }, nil + } + + // called after receiving an ANNOUNCE request. + onAnnounce := func(req *base.Request, tracks gortsplib.Tracks) (*base.Response, error) { + mutex.Lock() + defer mutex.Unlock() + + if publisher != nil { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("someone is already publishing") + } + + publisher = conn + sdp = tracks.Write() + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": base.HeaderValue{"12345678"}, + }, + }, nil + } + + // called after receiving a SETUP request. + onSetup := func(req *base.Request, th *headers.Transport) (*base.Response, error) { + // support TCP only + if th.Protocol == gortsplib.StreamProtocolUDP { + return &base.Response{ + StatusCode: base.StatusUnsupportedTransport, + }, nil + } + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Transport": req.Header["Transport"], + "Session": base.HeaderValue{"12345678"}, + }, + }, nil + } + + // called after receiving a PLAY request. + onPlay := func(req *base.Request) (*base.Response, error) { + mutex.Lock() + defer mutex.Unlock() + + readers[conn] = struct{}{} + + conn.EnableReadFrames(true) + conn.EnableReadTimeout(false) + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": base.HeaderValue{"12345678"}, + }, + }, nil + } + + // called after receiving a RECORD request. + onRecord := func(req *base.Request) (*base.Response, error) { + mutex.Lock() + defer mutex.Unlock() + + if conn != publisher { + return &base.Response{ + StatusCode: base.StatusBadRequest, + }, fmt.Errorf("someone is already publishing") + } + + conn.EnableReadFrames(true) + conn.EnableReadTimeout(true) + + return &base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Session": base.HeaderValue{"12345678"}, + }, + }, nil + } + + // called after receiving a Frame. + onFrame := func(trackID int, typ gortsplib.StreamType, buf []byte) { + mutex.Lock() + defer mutex.Unlock() + + // if we are the publisher, route frames to readers + if conn == publisher { + for r := range readers { + r.WriteFrame(trackID, typ, buf) + } + } + } + + err := <-conn.Read(gortsplib.ServerConnReadHandlers{ + OnDescribe: onDescribe, + OnAnnounce: onAnnounce, + OnSetup: onSetup, + OnPlay: onPlay, + OnRecord: onRecord, + OnFrame: onFrame, + }) + log.Printf("client disconnected (%s)", err) + + mutex.Lock() + defer mutex.Unlock() + + if conn == publisher { + publisher = nil + sdp = nil + } +} + +func main() { + // setup certificates - they can be generated with + // openssl genrsa -out server.key 2048 + // openssl req -new -x509 -sha256 -key server.key -out server.crt -days 3650 + cert, err := tls.LoadX509KeyPair("server.crt", "server.key") + if err != nil { + panic(err) + } + conf := gortsplib.ServerConf{ + TLSConfig: &tls.Config{Certificates: []tls.Certificate{cert}}, + } + + // create server + s, err := conf.Serve(":8554") + if err != nil { + panic(err) + } + log.Printf("server is ready") + + // accept connections + for { + conn, err := s.Accept() + if err != nil { + panic(err) + } + + go handleConn(conn) + } +} diff --git a/pkg/base/url.go b/pkg/base/url.go index 199b5913..5f98ebbe 100644 --- a/pkg/base/url.go +++ b/pkg/base/url.go @@ -27,8 +27,8 @@ func ParseURL(s string) (*URL, error) { return nil, err } - if u.Scheme != "rtsp" { - return nil, fmt.Errorf("wrong scheme") + if u.Scheme != "rtsp" && u.Scheme != "rtsps" { + return nil, fmt.Errorf("unsupported scheme '%s'", u.Scheme) } return (*URL)(u), nil diff --git a/server.go b/server.go index 836a0765..3a5aa5e4 100644 --- a/server.go +++ b/server.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "crypto/tls" "net" ) @@ -23,10 +24,15 @@ func (s *Server) Accept() (*ServerConn, error) { return nil, err } + conn := nconn + if s.conf.TLSConfig != nil { + conn = tls.Server(conn, s.conf.TLSConfig) + } + return &ServerConn{ s: s, nconn: nconn, - br: bufio.NewReaderSize(nconn, serverReadBufferSize), - bw: bufio.NewWriterSize(nconn, serverWriteBufferSize), + br: bufio.NewReaderSize(conn, serverReadBufferSize), + bw: bufio.NewWriterSize(conn, serverWriteBufferSize), }, nil } diff --git a/serverconf.go b/serverconf.go index fc7d06b5..87a6cfbe 100644 --- a/serverconf.go +++ b/serverconf.go @@ -1,6 +1,7 @@ package gortsplib import ( + "crypto/tls" "net" "time" ) @@ -16,6 +17,9 @@ func Serve(address string) (*Server, error) { // ServerConf allows to configure a Server. // All fields are optional. type ServerConf struct { + // a TLS configuration to accept TLS (RTSPS) connections. + TLSConfig *tls.Config + // timeout of read operations. // It defaults to 10 seconds ReadTimeout time.Duration diff --git a/serverconf_test.go b/serverconf_test.go index 61db26d3..35670dfd 100644 --- a/serverconf_test.go +++ b/serverconf_test.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "crypto/tls" "fmt" "io" "net" @@ -24,8 +25,12 @@ type testServ struct { readers map[*ServerConn]struct{} } -func newTestServ() (*testServ, error) { - s, err := Serve(":8554") +func newTestServ(tlsConf *tls.Config) (*testServ, error) { + conf := ServerConf{ + TLSConfig: tlsConf, + } + + s, err := conf.Serve(":8554") if err != nil { return nil, err } @@ -182,8 +187,38 @@ func (ts *testServ) handleConn(conn *ServerConn) { } } +func TestServerTeardownResponse(t *testing.T) { + ts, err := newTestServ(nil) + require.NoError(t, err) + defer ts.close() + + conn, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer conn.Close() + bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + + req := base.Request{ + Method: base.Teardown, + URL: base.MustParseURL("rtsp://localhost:8554/"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + }, + } + err = req.Write(bconn.Writer) + require.NoError(t, err) + + var res base.Response + err = res.Read(bconn.Reader) + require.NoError(t, err) + require.Equal(t, base.StatusOK, res.StatusCode) + + buf := make([]byte, 2048) + _, err = bconn.Read(buf) + require.Equal(t, io.EOF, err) +} + func TestServerPublishReadTCP(t *testing.T) { - ts, err := newTestServ() + ts, err := newTestServ(nil) require.NoError(t, err) defer ts.close() @@ -214,32 +249,90 @@ func TestServerPublishReadTCP(t *testing.T) { require.Equal(t, 0, cnt2.wait()) } -func TestServerTeardownResponse(t *testing.T) { - ts, err := newTestServ() +var serverCert = []byte(`-----BEGIN CERTIFICATE----- +MIIDazCCAlOgAwIBAgIUXw1hEC3LFpTsllv7D3ARJyEq7sIwDQYJKoZIhvcNAQEL +BQAwRTELMAkGA1UEBhMCQVUxEzARBgNVBAgMClNvbWUtU3RhdGUxITAfBgNVBAoM +GEludGVybmV0IFdpZGdpdHMgUHR5IEx0ZDAeFw0yMDEyMTMxNzQ0NThaFw0zMDEy +MTExNzQ0NThaMEUxCzAJBgNVBAYTAkFVMRMwEQYDVQQIDApTb21lLVN0YXRlMSEw +HwYDVQQKDBhJbnRlcm5ldCBXaWRnaXRzIFB0eSBMdGQwggEiMA0GCSqGSIb3DQEB +AQUAA4IBDwAwggEKAoIBAQDG8DyyS51810GsGwgWr5rjJK7OE1kTTLSNEEKax8Bj +zOyiaz8rA2JGl2VUEpi2UjDr9Cm7nd+YIEVs91IIBOb7LGqObBh1kGF3u5aZxLkv +NJE+HrLVvUhaDobK2NU+Wibqc/EI3DfUkt1rSINvv9flwTFu1qHeuLWhoySzDKEp +OzYxpFhwjVSokZIjT4Red3OtFz7gl2E6OAWe2qoh5CwLYVdMWtKR0Xuw3BkDPk9I +qkQKx3fqv97LPEzhyZYjDT5WvGrgZ1WDAN3booxXF3oA1H3GHQc4m/vcLatOtb8e +nI59gMQLEbnp08cl873bAuNuM95EZieXTHNbwUnq5iybAgMBAAGjUzBRMB0GA1Ud +DgQWBBQBKhJh8eWu0a4au9X/2fKhkFX2vjAfBgNVHSMEGDAWgBQBKhJh8eWu0a4a +u9X/2fKhkFX2vjAPBgNVHRMBAf8EBTADAQH/MA0GCSqGSIb3DQEBCwUAA4IBAQBj +3aCW0YPKukYgVK9cwN0IbVy/D0C1UPT4nupJcy/E0iC7MXPZ9D/SZxYQoAkdptdO +xfI+RXkpQZLdODNx9uvV+cHyZHZyjtE5ENu/i5Rer2cWI/mSLZm5lUQyx+0KZ2Yu +tEI1bsebDK30msa8QSTn0WidW9XhFnl3gRi4wRdimcQapOWYVs7ih+nAlSvng7NI +XpAyRs8PIEbpDDBMWnldrX4TP6EWYUi49gCp8OUDRREKX3l6Ls1vZ02F34yHIt/7 +7IV/XSKG096bhW+icKBWV0IpcEsgTzPK1J1hMxgjhzIMxGboAeUU+kidthOob6Sd +XQxaORfgM//NzX9LhUPk +-----END CERTIFICATE----- +`) + +var serverKey = []byte(`-----BEGIN RSA PRIVATE KEY----- +MIIEogIBAAKCAQEAxvA8skudfNdBrBsIFq+a4ySuzhNZE0y0jRBCmsfAY8zsoms/ +KwNiRpdlVBKYtlIw6/Qpu53fmCBFbPdSCATm+yxqjmwYdZBhd7uWmcS5LzSRPh6y +1b1IWg6GytjVPlom6nPxCNw31JLda0iDb7/X5cExbtah3ri1oaMkswyhKTs2MaRY +cI1UqJGSI0+EXndzrRc+4JdhOjgFntqqIeQsC2FXTFrSkdF7sNwZAz5PSKpECsd3 +6r/eyzxM4cmWIw0+Vrxq4GdVgwDd26KMVxd6ANR9xh0HOJv73C2rTrW/HpyOfYDE +CxG56dPHJfO92wLjbjPeRGYnl0xzW8FJ6uYsmwIDAQABAoIBACi0BKcyQ3HElSJC +kaAao+Uvnzh4yvPg8Nwf5JDIp/uDdTMyIEWLtrLczRWrjGVZYbsVROinP5VfnPTT +kYwkfKINj2u+gC6lsNuPnRuvHXikF8eO/mYvCTur1zZvsQnF5kp4GGwIqr+qoPUP +bB0UMndG1PdpoMryHe+JcrvTrLHDmCeH10TqOwMsQMLHYLkowvxwJWsmTY7/Qr5S +Wm3PPpOcW2i0uyPVuyuv4yD1368fqnqJ8QFsQp1K6QtYsNnJ71Hut1/IoxK/e6hj +5Z+byKtHVtmcLnABuoOT7BhleJNFBksX9sh83jid4tMBgci+zXNeGmgqo2EmaWAb +agQslkECgYEA8B1rzjOHVQx/vwSzDa4XOrpoHQRfyElrGNz9JVBvnoC7AorezBXQ +M9WTHQIFTGMjzD8pb+YJGi3gj93VN51r0SmJRxBaBRh1ZZI9kFiFzngYev8POgD3 +ygmlS3kTHCNxCK/CJkB+/jMBgtPj5ygDpCWVcTSuWlQFphePkW7jaaECgYEA1Blz +ulqgAyJHZaqgcbcCsI2q6m527hVr9pjzNjIVmkwu38yS9RTCgdlbEVVDnS0hoifl ++jVMEGXjF3xjyMvL50BKbQUH+KAa+V4n1WGlnZOxX9TMny8MBjEuSX2+362vQ3BX +4vOlX00gvoc+sY+lrzvfx/OdPCHQGVYzoKCxhLsCgYA07HcviuIAV/HsO2/vyvhp +xF5gTu+BqNUHNOZDDDid+ge+Jre2yfQLCL8VPLXIQW3Jff53IH/PGl+NtjphuLvj +7UDJvgvpZZuymIojP6+2c3gJ3CASC9aR3JBnUzdoE1O9s2eaoMqc4scpe+SWtZYf +3vzSZ+cqF6zrD/Rf/M35IQKBgHTU4E6ShPm09CcoaeC5sp2WK8OevZw/6IyZi78a +r5Oiy18zzO97U/k6xVMy6F+38ILl/2Rn31JZDVJujniY6eSkIVsUHmPxrWoXV1HO +y++U32uuSFiXDcSLarfIsE992MEJLSAynbF1Rsgsr3gXbGiuToJRyxbIeVy7gwzD +94TpAoGAY4/PejWQj9psZfAhyk5dRGra++gYRQ/gK1IIc1g+Dd2/BxbT/RHr05GK +6vwrfjsoRyMWteC1SsNs/CurjfQ/jqCfHNP5XPvxgd5Ec8sRJIiV7V5RTuWJsPu1 ++3K6cnKEyg+0ekYmLertRFIY6SwWmY1fyKgTvxudMcsBY7dC4xs= +-----END RSA PRIVATE KEY----- +`) + +func TestServerPublishReadTLS(t *testing.T) { + cert, err := tls.X509KeyPair(serverCert, serverKey) + require.NoError(t, err) + tlsConf := &tls.Config{Certificates: []tls.Certificate{cert}} + + ts, err := newTestServ(tlsConf) require.NoError(t, err) defer ts.close() - conn, err := net.Dial("tcp", "localhost:8554") + cnt1, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "tcp", + "rtsps://localhost:8554/teststream", + }) require.NoError(t, err) - defer conn.Close() - bconn := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) + defer cnt1.close() - req := base.Request{ - Method: base.Teardown, - URL: base.MustParseURL("rtsp://localhost:8554/"), - Header: base.Header{ - "CSeq": base.HeaderValue{"1"}, - }, - } - err = req.Write(bconn.Writer) + time.Sleep(1 * time.Second) + + cnt2, err := newContainer("ffmpeg", "read", []string{ + "-rtsp_transport", "tcp", + "-i", "rtsps://localhost:8554/teststream", + "-vframes", "1", + "-f", "image2", + "-y", "/dev/null", + }) require.NoError(t, err) + defer cnt2.close() - var res base.Response - err = res.Read(bconn.Reader) - require.NoError(t, err) - require.Equal(t, base.StatusOK, res.StatusCode) - - buf := make([]byte, 2048) - _, err = bconn.Read(buf) - require.Equal(t, io.EOF, err) + require.Equal(t, 0, cnt2.wait()) } diff --git a/serverconn.go b/serverconn.go index 5c551a0c..287f1316 100644 --- a/serverconn.go +++ b/serverconn.go @@ -349,12 +349,11 @@ func (sc *ServerConn) WriteFrame(trackID int, streamType StreamType, content []b sc.mutex.Lock() defer sc.mutex.Unlock() + sc.nconn.SetWriteDeadline(time.Now().Add(sc.s.conf.WriteTimeout)) frame := base.InterleavedFrame{ TrackID: trackID, StreamType: streamType, Content: content, } - - sc.nconn.SetWriteDeadline(time.Now().Add(sc.s.conf.WriteTimeout)) return frame.Write(sc.bw) }