diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 83ccbe07..29493537 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -2,9 +2,9 @@ name: lint on: push: - branches: [ main, v4 ] + branches: [ main ] pull_request: - branches: [ main, v4 ] + branches: [ main ] jobs: golangci-lint: diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6e666782..ebd8726d 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -2,9 +2,9 @@ name: test on: push: - branches: [ main, v4 ] + branches: [ main ] pull_request: - branches: [ main, v4 ] + branches: [ main ] jobs: test: diff --git a/README.md b/README.md index 893b2be0..2481eed6 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ Features: * Client * Support secure protocol variants (RTSPS, TLS, SRTP, MIKEY) + * Support RTSP-over-HTTP, RTSP-over-HTTPS * Query servers about available media streams * Read media streams from a server ("play") * Read streams with the UDP, UDP-multicast or TCP transport protocol @@ -29,6 +30,7 @@ Features: * Pause without disconnecting from the server * Server * Support secure protocol variants (RTSPS, TLS, SRTP, MIKEY) + * Support RTSP-over-HTTP, RTSP-over-HTTPS * Handle requests from clients * Validate client credentials * Read media streams from clients ("record") @@ -178,6 +180,7 @@ In RTSP, media streams are transmitted by using RTP packets, which are encoded i * [MediaMTX](https://github.com/bluenviron/mediamtx) * [gohlslib](https://github.com/bluenviron/gohlslib) +* [gortmplib](https://github.com/bluenviron/gortmplib) * [mediacommon](https://github.com/bluenviron/mediacommon) * [pion/sdp (SDP library used internally)](https://github.com/pion/sdp) * [pion/rtp (RTP library used internally)](https://github.com/pion/rtp) diff --git a/async_processor.go b/async_processor.go deleted file mode 100644 index 7a5e12d6..00000000 --- a/async_processor.go +++ /dev/null @@ -1,58 +0,0 @@ -package gortsplib - -import ( - "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" -) - -// this is an asynchronous queue processor -// that allows to detach the routine that is reading a stream -// from the routine that is writing a stream. -type asyncProcessor struct { - bufferSize int - - running bool - buffer *ringbuffer.RingBuffer - stopError error - - chStopped chan struct{} -} - -func (w *asyncProcessor) initialize() { - w.buffer, _ = ringbuffer.New(uint64(w.bufferSize)) -} - -func (w *asyncProcessor) close() { - if w.running { - w.buffer.Close() - <-w.chStopped - } -} - -func (w *asyncProcessor) start() { - w.running = true - w.chStopped = make(chan struct{}) - go w.run() -} - -func (w *asyncProcessor) run() { - w.stopError = w.runInner() - close(w.chStopped) -} - -func (w *asyncProcessor) runInner() error { - for { - tmp, ok := w.buffer.Pull() - if !ok { - return nil - } - - err := tmp.(func() error)() - if err != nil { - return err - } - } -} - -func (w *asyncProcessor) push(cb func() error) bool { - return w.buffer.Push(cb) -} diff --git a/async_processor_test.go b/async_processor_test.go deleted file mode 100644 index e2e8456f..00000000 --- a/async_processor_test.go +++ /dev/null @@ -1,24 +0,0 @@ -package gortsplib - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/require" -) - -func TestAsyncProcessorCloseAfterError(t *testing.T) { - p := &asyncProcessor{bufferSize: 8} - p.initialize() - - p.push(func() error { - return fmt.Errorf("ok") - }) - - p.start() - - <-p.chStopped - require.EqualError(t, p.stopError, "ok") - - p.close() -} diff --git a/client.go b/client.go index 864b365e..e5116287 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,7 @@ Examples are available at https://github.com/bluenviron/gortsplib/tree/main/exam package gortsplib import ( + "bufio" "context" "crypto/rand" "crypto/tls" @@ -22,6 +23,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/asyncprocessor" "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/bytecounter" @@ -460,15 +462,17 @@ type Client struct { // a TLS configuration to connect to TLS (RTSPS) servers. // It defaults to nil. TLSConfig *tls.Config + // tunnel. + Tunnel Tunnel + // transport protocol (UDP, Multicast or TCP). + // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). + // It defaults to nil. + Transport *TransportProtocol // enable communication with servers which don't provide UDP server ports // or use different server ports than the announced ones. // This can be a security issue. // It defaults to false. AnyPortEnable bool - // transport protocol (UDP, Multicast or TCP). - // If nil, it is chosen automatically (first UDP, then, if it fails, TCP). - // It defaults to nil. - Transport *TransportProtocol // If the client is reading with UDP, it must receive // at least a packet within this timeout, otherwise it switches to TCP. // It defaults to 3 seconds. @@ -569,7 +573,7 @@ type Client struct { keepAliveTimer *time.Timer closeError error writerMutex sync.RWMutex - writer *asyncProcessor + writer *asyncprocessor.Processor reader *clientReader timeDecoder *rtptime.GlobalDecoder2 mustClose bool @@ -579,13 +583,17 @@ type Client struct { bytesSent *uint64 // in - chOptions chan optionsReq - chDescribe chan describeReq - chAnnounce chan announceReq - chSetup chan setupReq - chPlay chan playReq - chRecord chan recordReq - chPause chan pauseReq + chOptions chan optionsReq + chDescribe chan describeReq + chAnnounce chan announceReq + chSetup chan setupReq + chPlay chan playReq + chRecord chan recordReq + chPause chan pauseReq + chResponse chan *base.Response + chRequest chan *base.Request + chReadError chan error + chWriterError chan error // out done chan struct{} @@ -720,6 +728,10 @@ func (c *Client) Start2() error { c.chPlay = make(chan playReq) c.chRecord = make(chan recordReq) c.chPause = make(chan pauseReq) + c.chResponse = make(chan *base.Response) + c.chRequest = make(chan *base.Request) + c.chReadError = make(chan error) + c.chWriterError = make(chan error) c.done = make(chan struct{}) go c.run() @@ -788,34 +800,6 @@ func (c *Client) run() { func (c *Client) runInner() error { for { - chReaderResponse := func() chan *base.Response { - if c.reader != nil { - return c.reader.chResponse - } - return nil - }() - - chReaderRequest := func() chan *base.Request { - if c.reader != nil { - return c.reader.chRequest - } - return nil - }() - - chReaderError := func() chan error { - if c.reader != nil { - return c.reader.chError - } - return nil - }() - - chWriterError := func() chan struct{} { - if c.writer != nil { - return c.writer.chStopped - } - return nil - }() - select { case req := <-c.chOptions: res, err := c.doOptions(req.url) @@ -887,23 +871,23 @@ func (c *Client) runInner() error { } c.keepAliveTimer = time.NewTimer(c.keepAlivePeriod) - case <-chWriterError: - return c.writer.stopError - - case err := <-chReaderError: - c.reader = nil - return err - - case res := <-chReaderResponse: + case res := <-c.chResponse: c.OnResponse(res) // these are responses to keepalives, ignore them. - case req := <-chReaderRequest: + case req := <-c.chRequest: err := c.handleServerRequest(req) if err != nil { return err } + case err := <-c.chReadError: + c.reader = nil + return err + + case err := <-c.chWriterError: + return err + case <-c.ctx.Done(): return liberrors.ErrClientTerminated{} } @@ -919,11 +903,13 @@ func (c *Client) waitResponse(requestCseqStr string) (*base.Response, error) { case <-t.C: return nil, liberrors.ErrClientRequestTimedOut{} - case err := <-c.reader.chError: - c.reader = nil - return nil, err + case req := <-c.chRequest: + err := c.handleServerRequest(req) + if err != nil { + return nil, err + } - case res := <-c.reader.chResponse: + case res := <-c.chResponse: c.OnResponse(res) // accept response if CSeq equals request CSeq, or if CSeq is not present @@ -931,11 +917,9 @@ func (c *Client) waitResponse(requestCseqStr string) (*base.Response, error) { return res, nil } - case req := <-c.reader.chRequest: - err := c.handleServerRequest(req) - if err != nil { - return nil, err - } + case err := <-c.chReadError: + c.reader = nil + return nil, err case <-c.ctx.Done(): return nil, liberrors.ErrClientTerminated{} @@ -1133,8 +1117,8 @@ func (c *Client) stopTransportRoutines() { func (c *Client) createWriter() { c.writerMutex.Lock() - c.writer = &asyncProcessor{ - bufferSize: func() int { + c.writer = &asyncprocessor.Processor{ + BufferSize: func() int { if c.state == clientStateRecord || c.backChannelSetupped { return c.WriteQueueSize } @@ -1144,19 +1128,25 @@ func (c *Client) createWriter() { // decrease RAM consumption by allocating less buffers. return 8 }(), + OnError: func(ctx context.Context, err error) { + select { + case <-ctx.Done(): + case <-c.ctx.Done(): + case c.chWriterError <- err: + } + }, } - - c.writer.initialize() + c.writer.Initialize() c.writerMutex.Unlock() } func (c *Client) startWriter() { - c.writer.start() + c.writer.Start() } func (c *Client) destroyWriter() { - c.writer.close() + c.writer.Close() c.writerMutex.Lock() c.writer = nil @@ -1175,25 +1165,45 @@ func (c *Client) connOpen() error { dialCtx, dialCtxCancel := context.WithTimeout(c.ctx, c.ReadTimeout) defer dialCtxCancel() - nconn, err := c.DialContext(dialCtx, "tcp", canonicalAddr(&base.URL{ + addr := canonicalAddr(&base.URL{ Scheme: c.Scheme, Host: c.Host, - })) - if err != nil { - return err + }) + + var tlsConfig *tls.Config + if c.Scheme == "rtsps" { + tlsConfig = c.TLSConfig + if tlsConfig == nil { + host, _, _ := net.SplitHostPort(addr) + tlsConfig = &tls.Config{ + ServerName: host, + } + } } - if c.Scheme == "rtsps" { - tlsConfig := c.TLSConfig - if tlsConfig == nil { - tlsConfig = &tls.Config{} + var nconn net.Conn + + if c.Tunnel == TunnelHTTP { + var err error + nconn, err = newClientHTTPTunnel(dialCtx, c.DialContext, addr, tlsConfig) + if err != nil { + return err + } + } else { + var err error + nconn, err = c.DialContext(dialCtx, "tcp", addr) + if err != nil { + return err + } + + if tlsConfig != nil { + nconn = tls.Client(nconn, tlsConfig) } - nconn = tls.Client(nconn, tlsConfig) } c.nconn = nconn bc := bytecounter.New(c.nconn, c.bytesReceived, c.bytesSent) - c.conn = conn.NewConn(bc) + c.conn = conn.NewConn(bufio.NewReader(bc), bc) c.reader = &clientReader{ c: c, } @@ -1648,7 +1658,7 @@ func (c *Client) doSetup( th.Profile = headers.TransportProfileAVP } - if th.Profile == headers.TransportProfileSAVP || c.Scheme == "rtsp" { + if c.Tunnel == TunnelNone && (th.Profile == headers.TransportProfileSAVP || c.Scheme == "rtsp") { protocol = TransportUDP } else { protocol = TransportTCP @@ -2462,7 +2472,9 @@ func (c *Client) Transport2() *ClientTransport { defer c.propsMutex.RUnlock() return &ClientTransport{ - Conn: ConnTransport{}, + Conn: ConnTransport{ + Tunnel: c.Tunnel, + }, Session: c.setuppedTransport, } } diff --git a/client_format.go b/client_format.go index 068f298f..4e8bdb06 100644 --- a/client_format.go +++ b/client_format.go @@ -145,7 +145,7 @@ func (cf *clientFormat) writePacketRTP(pkt *rtp.Packet, ntp time.Time) error { return nil } - ok := cf.cm.c.writer.push(func() error { + ok := cf.cm.c.writer.Push(func() error { return cf.writePacketRTPInQueue(buf) }) if !ok { diff --git a/client_http_tunnel.go b/client_http_tunnel.go new file mode 100644 index 00000000..e9732ed4 --- /dev/null +++ b/client_http_tunnel.go @@ -0,0 +1,182 @@ +package gortsplib + +import ( + "bufio" + "context" + "crypto/tls" + "encoding/base64" + "fmt" + "net" + "net/http" + "strings" + "time" + + "github.com/google/uuid" +) + +type clientHTTPTunnel struct { + readChan net.Conn + readBuf *bufio.Reader + writeChan net.Conn +} + +func (c *clientHTTPTunnel) Read(p []byte) (n int, err error) { + return c.readBuf.Read(p) +} + +func (c *clientHTTPTunnel) Write(p []byte) (n int, err error) { + return c.writeChan.Write([]byte(base64.StdEncoding.EncodeToString(p))) +} + +func (c *clientHTTPTunnel) Close() error { + c.readChan.Close() + c.writeChan.Close() + return nil +} + +func (c *clientHTTPTunnel) LocalAddr() net.Addr { + return c.readChan.LocalAddr() +} + +func (c *clientHTTPTunnel) RemoteAddr() net.Addr { + return c.readChan.RemoteAddr() +} + +func (c *clientHTTPTunnel) SetDeadline(_ time.Time) error { + panic("unimplemented") +} + +func (c *clientHTTPTunnel) SetReadDeadline(t time.Time) error { + return c.readChan.SetReadDeadline(t) +} + +func (c *clientHTTPTunnel) SetWriteDeadline(t time.Time) error { + return c.writeChan.SetWriteDeadline(t) +} + +func newClientHTTPTunnel( + ctx context.Context, + dialContext func(ctx context.Context, network, address string) (net.Conn, error), + addr string, + tlsConfig *tls.Config, +) (net.Conn, error) { + c := &clientHTTPTunnel{} + + var err error + c.readChan, err = dialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + + if tlsConfig != nil { + c.readChan = tls.Client(c.readChan, tlsConfig) + } + + ok := false + + defer func() { + if !ok { + c.readChan.Close() + } + }() + + ctxCheckerReadDone := make(chan struct{}) + defer func() { <-ctxCheckerReadDone }() + + ctxCheckerReadTerminate := make(chan struct{}) + defer close(ctxCheckerReadTerminate) + + go func() { + defer close(ctxCheckerReadDone) + select { + case <-ctx.Done(): + c.readChan.Close() + case <-ctxCheckerReadTerminate: + } + }() + + tunnelID := strings.ReplaceAll(uuid.New().String(), "-", "") + + // do not use http.Request + // since Content-Length requires a Body of same size + _, err = c.readChan.Write([]byte( + "GET / HTTP/1.1\r\n" + + "Host: " + addr + "\r\n" + + "X-Sessioncookie: " + tunnelID + "\r\n" + + "Accept: application/x-rtsp-tunnelled\r\n" + + "Content-Length: 30000\r\n" + + "\r\n", + )) + if err != nil { + return nil, err + } + + c.readBuf = bufio.NewReader(c.readChan) + res, err := http.ReadResponse(c.readBuf, nil) + if err != nil { + return nil, err + } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + c.writeChan, err = dialContext(ctx, "tcp", addr) + if err != nil { + return nil, err + } + + if tlsConfig != nil { + c.writeChan = tls.Client(c.writeChan, tlsConfig) + } + + defer func() { + if !ok { + c.writeChan.Close() + } + }() + + ctxCheckerWriteDone := make(chan struct{}) + defer func() { <-ctxCheckerWriteDone }() + + ctxCheckerWriteTerminate := make(chan struct{}) + defer close(ctxCheckerWriteTerminate) + + go func() { + defer close(ctxCheckerWriteDone) + select { + case <-ctx.Done(): + c.writeChan.Close() + case <-ctxCheckerWriteTerminate: + } + }() + + // do not use http.Request + // since Content-Length requires a Body of same size + _, err = c.writeChan.Write([]byte( + "POST / HTTP/1.1\r\n" + + "Host: " + addr + "\r\n" + + "X-Sessioncookie: " + tunnelID + "\r\n" + + "Content-Type: application/x-rtsp-tunnelled\r\n" + + "Content-Length: 30000\r\n" + + "\r\n", + )) + if err != nil { + return nil, err + } + + writeBuf := bufio.NewReader(c.writeChan) + res, err = http.ReadResponse(writeBuf, nil) + if err != nil { + return nil, err + } + res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("bad status code: %v", res.StatusCode) + } + + ok = true + return c, nil +} diff --git a/client_media.go b/client_media.go index 0a376a72..9e7b75ed 100644 --- a/client_media.go +++ b/client_media.go @@ -423,7 +423,7 @@ func (cm *clientMedia) writePacketRTCP(pkt rtcp.Packet) error { return nil } - ok := cm.c.writer.push(func() error { + ok := cm.c.writer.Push(func() error { return cm.writePacketRTCPInQueue(buf) }) if !ok { diff --git a/client_play_test.go b/client_play_test.go index 1c4f6511..53e7d7a1 100644 --- a/client_play_test.go +++ b/client_play_test.go @@ -1,6 +1,7 @@ package gortsplib import ( + "bufio" "bytes" "crypto/rand" "crypto/tls" @@ -146,7 +147,7 @@ func TestClientPlayFormats(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -307,7 +308,7 @@ func TestClientPlay(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -717,7 +718,7 @@ func TestClientPlaySRTPVariants(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -921,7 +922,7 @@ func TestClientPlayPartial(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1076,7 +1077,7 @@ func TestClientPlayContentBase(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1207,7 +1208,7 @@ func TestClientPlayAnyPort(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1364,7 +1365,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1482,7 +1483,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - co := conn.NewConn(nconn) + co := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := co.ReadRequest() require.NoError(t, err2) @@ -1548,7 +1549,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - co := conn.NewConn(nconn) + co := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := co.ReadRequest() require.NoError(t, err2) @@ -1656,7 +1657,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1753,7 +1754,7 @@ func TestClientPlayAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1889,7 +1890,7 @@ func TestClientPlayDifferentInterleavedIDs(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2011,7 +2012,7 @@ func TestClientPlayRedirect(t *testing.T) { nconn, err = l.Accept() require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2046,7 +2047,7 @@ func TestClientPlayRedirect(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2197,7 +2198,7 @@ func TestClientPlayRedirectPreventDecrypt(t *testing.T) { nconn, err = l.Accept() require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2297,7 +2298,7 @@ func TestClientPlayPausePlay(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2462,7 +2463,7 @@ func TestClientPlayRTCPReport(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2633,7 +2634,7 @@ func TestClientPlayErrorTimeout(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2776,7 +2777,7 @@ func TestClientPlayIgnoreTCPInvalidMedia(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -2895,7 +2896,7 @@ func TestClientPlayKeepAlive(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -3067,7 +3068,7 @@ func TestClientPlayDifferentSource(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -3212,7 +3213,7 @@ func TestClientPlayDecodeErrors(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -3473,7 +3474,7 @@ func TestClientPlayPacketNTP(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -3652,7 +3653,7 @@ func TestClientPlayBackChannel(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) diff --git a/client_reader.go b/client_reader.go index f64d115c..d68fa9a0 100644 --- a/client_reader.go +++ b/client_reader.go @@ -12,17 +12,9 @@ type clientReader struct { mutex sync.Mutex allowInterleavedFrames bool - - chResponse chan *base.Response - chRequest chan *base.Request - chError chan error } func (r *clientReader) start() { - r.chResponse = make(chan *base.Response) - r.chRequest = make(chan *base.Request) - r.chError = make(chan error) - go r.run() } @@ -35,17 +27,16 @@ func (r *clientReader) setAllowInterleavedFrames(v bool) { func (r *clientReader) wait() { for { select { - case <-r.chError: + case <-r.c.chResponse: + case <-r.c.chRequest: + case <-r.c.chReadError: return - - case <-r.chResponse: - case <-r.chRequest: } } } func (r *clientReader) run() { - r.chError <- r.runInner() + r.c.chReadError <- r.runInner() } func (r *clientReader) runInner() error { @@ -57,10 +48,10 @@ func (r *clientReader) runInner() error { switch what := what.(type) { case *base.Response: - r.chResponse <- what + r.c.chResponse <- what case *base.Request: - r.chRequest <- what + r.c.chRequest <- what case *base.InterleavedFrame: r.mutex.Lock() diff --git a/client_record_test.go b/client_record_test.go index a4013d1e..6da2cec5 100644 --- a/client_record_test.go +++ b/client_record_test.go @@ -1,6 +1,7 @@ package gortsplib import ( + "bufio" "bytes" "crypto/rand" "crypto/tls" @@ -180,7 +181,7 @@ func TestClientRecord(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -516,7 +517,7 @@ func TestClientRecordSocketError(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -635,7 +636,7 @@ func TestClientRecordPauseRecordSerial(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -784,7 +785,7 @@ func TestClientRecordPauseRecordParallel(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -963,7 +964,7 @@ func TestClientRecordAutomaticProtocol(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1095,7 +1096,7 @@ func TestClientRecordDecodeErrors(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1266,7 +1267,7 @@ func TestClientRecordRTCPReport(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -1452,7 +1453,7 @@ func TestClientRecordIgnoreTCPRTPPackets(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) diff --git a/client_test.go b/client_test.go index 15201a35..0e11abe8 100644 --- a/client_test.go +++ b/client_test.go @@ -1,12 +1,18 @@ package gortsplib import ( + "bufio" + "bytes" + "crypto/tls" "net" + "net/http" + "net/url" "strings" "testing" "github.com/stretchr/testify/require" + "github.com/bluenviron/gortsplib/v4/internal/base64streamreader" "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/conn" @@ -103,7 +109,7 @@ func TestClientCloseDuringRequest(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -150,7 +156,7 @@ func TestClientSession(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) defer nconn.Close() req, err2 := conn.ReadRequest() @@ -215,7 +221,7 @@ func TestClientAuth(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) defer nconn.Close() req, err2 := conn.ReadRequest() @@ -301,7 +307,7 @@ func TestClientCSeq(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -377,7 +383,7 @@ func TestClientDescribeCharset(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -441,7 +447,7 @@ func TestClientReplyToServerRequest(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) defer nconn.Close() req, err2 := conn.ReadRequest() @@ -535,7 +541,7 @@ func TestClientRelativeContentBase(t *testing.T) { nconn, err2 := l.Accept() require.NoError(t, err2) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) req, err2 := conn.ReadRequest() require.NoError(t, err2) @@ -586,3 +592,184 @@ func TestClientRelativeContentBase(t *testing.T) { require.Equal(t, "rtsp://localhost:8554/relative-content-base", desc.BaseURL.String()) } + +func TestClientHTTPTunnel(t *testing.T) { + for _, ca := range []string{"http", "https"} { + t.Run(ca, func(t *testing.T) { + var l net.Listener + var err error + + if ca == "http" { + l, err = net.Listen("tcp", "localhost:8554") + require.NoError(t, err) + defer l.Close() + } else { + var cert tls.Certificate + cert, err = tls.X509KeyPair(serverCert, serverKey) + require.NoError(t, err) + + l, err = tls.Listen("tcp", "localhost:8554", &tls.Config{Certificates: []tls.Certificate{cert}}) + require.NoError(t, err) + defer l.Close() + } + + var scheme string + if ca == "http" { + scheme = "rtsp" + } else { + scheme = "rtsps" + } + + serverDone := make(chan struct{}) + defer func() { <-serverDone }() + + go func() { + defer close(serverDone) + + nconn1, err2 := l.Accept() + require.NoError(t, err2) + defer nconn1.Close() + + buf1 := bufio.NewReader(nconn1) + req1, err2 := http.ReadRequest(buf1) + require.NoError(t, err2) + + require.Equal(t, &http.Request{ + Method: http.MethodGet, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + URL: &url.URL{ + Path: "/", + }, + Host: "localhost:8554", + RequestURI: "/", + Header: http.Header{ + "Accept": []string{"application/x-rtsp-tunnelled"}, + "Content-Length": []string{"30000"}, + "X-Sessioncookie": req1.Header["X-Sessioncookie"], + }, + ContentLength: 30000, + Body: req1.Body, + }, req1) + + require.NotEmpty(t, req1.Header.Get("X-Sessioncookie")) + + h := http.Header{} + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "close") + h.Set("Content-Type", "application/x-rtsp-tunnelled") + h.Set("Pragma", "no-cache") + res := http.Response{ + StatusCode: http.StatusOK, + ProtoMajor: 1, + ProtoMinor: req1.ProtoMinor, + Header: h, + ContentLength: -1, + } + var resBuf bytes.Buffer + res.Write(&resBuf) //nolint:errcheck + _, err = nconn1.Write(resBuf.Bytes()) + require.NoError(t, err) + + nconn2, err2 := l.Accept() + require.NoError(t, err2) + defer nconn2.Close() + + buf2 := bufio.NewReader(nconn2) + req2, err2 := http.ReadRequest(buf2) + require.NoError(t, err2) + + require.Equal(t, &http.Request{ + Method: http.MethodPost, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + URL: &url.URL{ + Path: "/", + }, + Host: "localhost:8554", + RequestURI: "/", + Header: http.Header{ + "Content-Type": []string{"application/x-rtsp-tunnelled"}, + "Content-Length": []string{"30000"}, + "X-Sessioncookie": req2.Header["X-Sessioncookie"], + }, + ContentLength: 30000, + Body: req2.Body, + }, req2) + + require.Equal(t, req1.Header.Get("X-Sessioncookie"), req2.Header.Get("X-Sessioncookie")) + + h = http.Header{} + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "close") + h.Set("Content-Type", "application/x-rtsp-tunnelled") + h.Set("Pragma", "no-cache") + res = http.Response{ + StatusCode: http.StatusOK, + ProtoMajor: 1, + ProtoMinor: req1.ProtoMinor, + Header: h, + ContentLength: -1, + } + resBuf = bytes.Buffer{} + res.Write(&resBuf) //nolint:errcheck + _, err = nconn2.Write(resBuf.Bytes()) + require.NoError(t, err) + + conn := conn.NewConn(bufio.NewReader(base64streamreader.New(buf2)), nconn1) + + req, err2 := conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Options, req.Method) + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Public": base.HeaderValue{strings.Join([]string{ + string(base.Describe), + }, ", ")}, + }, + }) + require.NoError(t, err2) + + req, err2 = conn.ReadRequest() + require.NoError(t, err2) + require.Equal(t, base.Describe, req.Method) + require.Equal(t, mustParseURL(scheme+"://localhost:8554/teststream"), req.URL) + + medias := []*description.Media{testH264Media} + + err2 = conn.WriteResponse(&base.Response{ + StatusCode: base.StatusOK, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp; charset=utf-8"}, + "Content-Base": base.HeaderValue{"/relative-content-base"}, + }, + Body: mediasToSDP(medias), + }) + require.NoError(t, err2) + }() + + u, err := base.ParseURL(scheme + "://localhost:8554/teststream") + require.NoError(t, err) + + c := Client{ + Scheme: u.Scheme, + Host: u.Host, + Tunnel: TunnelHTTP, + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + + err = c.Start2() + require.NoError(t, err) + defer c.Close() + + _, _, err = c.Describe(u) + require.NoError(t, err) + }) + } +} diff --git a/conn_transport.go b/conn_transport.go index bc8a14a6..1735c74c 100644 --- a/conn_transport.go +++ b/conn_transport.go @@ -1,4 +1,6 @@ package gortsplib // ConnTransport contains details about the transport of a connection. -type ConnTransport struct{} +type ConnTransport struct { + Tunnel Tunnel +} diff --git a/examples/client-play-backchannel/main.go b/examples/client-play-backchannel/main.go index 59681ac1..6443dfd5 100644 --- a/examples/client-play-backchannel/main.go +++ b/examples/client-play-backchannel/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. generate a dummy G711 audio stream. // 2. connect to a RTSP server, find a back channel that supports G711. // 3. route the G711 stream to the channel. diff --git a/examples/client-play-format-av1-to-jpeg/main.go b/examples/client-play-format-av1-to-jpeg/main.go index b40f93c3..9b7c1a12 100644 --- a/examples/client-play-format-av1-to-jpeg/main.go +++ b/examples/client-play-format-av1-to-jpeg/main.go @@ -19,7 +19,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a AV1 stream. // 3. decode the AV1 stream into RGBA frames. diff --git a/examples/client-play-format-av1/main.go b/examples/client-play-format-av1/main.go index 9305b980..422bd7e7 100644 --- a/examples/client-play-format-av1/main.go +++ b/examples/client-play-format-av1/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a AV1 stream. // 3. decode the AV1 stream into RGBA frames. diff --git a/examples/client-play-format-g711/main.go b/examples/client-play-format-g711/main.go index d2dbe62b..5bc01d42 100644 --- a/examples/client-play-format-g711/main.go +++ b/examples/client-play-format-g711/main.go @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a G711 stream. // 3. decode the G711 stream into audio samples. diff --git a/examples/client-play-format-h264-mpeg4audio-to-disk/main.go b/examples/client-play-format-h264-mpeg4audio-to-disk/main.go index 8c82c628..d24a5e3d 100644 --- a/examples/client-play-format-h264-mpeg4audio-to-disk/main.go +++ b/examples/client-play-format-h264-mpeg4audio-to-disk/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H264 stream and a MPEG-4 audio stream. // 3. save the content of these streams in a file in MPEG-TS format. diff --git a/examples/client-play-format-h264-to-disk/main.go b/examples/client-play-format-h264-to-disk/main.go index d19ce8bf..500f0af4 100644 --- a/examples/client-play-format-h264-to-disk/main.go +++ b/examples/client-play-format-h264-to-disk/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H264 stream. // 3. save the content of the format in a file in MPEG-TS format. diff --git a/examples/client-play-format-h264-to-jpeg/main.go b/examples/client-play-format-h264-to-jpeg/main.go index 855e4967..57b93243 100644 --- a/examples/client-play-format-h264-to-jpeg/main.go +++ b/examples/client-play-format-h264-to-jpeg/main.go @@ -19,7 +19,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H264 stream. // 3. decode the H264 stream into RGBA frames. diff --git a/examples/client-play-format-h264/main.go b/examples/client-play-format-h264/main.go index 142ba85a..066cb272 100644 --- a/examples/client-play-format-h264/main.go +++ b/examples/client-play-format-h264/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's an H264 stream. // 3. decode the H264 stream into RGBA frames. diff --git a/examples/client-play-format-h265-to-disk/main.go b/examples/client-play-format-h265-to-disk/main.go index dae8f3b0..f85ce094 100644 --- a/examples/client-play-format-h265-to-disk/main.go +++ b/examples/client-play-format-h265-to-disk/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H265 stream. // 3. save the content of the format in a file in MPEG-TS format. diff --git a/examples/client-play-format-h265-to-jpeg/main.go b/examples/client-play-format-h265-to-jpeg/main.go index 2cafc673..f94d24d2 100644 --- a/examples/client-play-format-h265-to-jpeg/main.go +++ b/examples/client-play-format-h265-to-jpeg/main.go @@ -19,7 +19,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H265 stream. // 3. decode the H265 stream into RGBA frames. diff --git a/examples/client-play-format-h265/main.go b/examples/client-play-format-h265/main.go index 02674fbe..9396a2bc 100644 --- a/examples/client-play-format-h265/main.go +++ b/examples/client-play-format-h265/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a H265 stream. // 3. decode the H265 stream into RGBA frames. diff --git a/examples/client-play-format-lpcm/main.go b/examples/client-play-format-lpcm/main.go index 8f326072..04481057 100644 --- a/examples/client-play-format-lpcm/main.go +++ b/examples/client-play-format-lpcm/main.go @@ -10,7 +10,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a LPCM stream. // 3. get LPCM samples of that format. diff --git a/examples/client-play-format-mjpeg/main.go b/examples/client-play-format-mjpeg/main.go index c60e48e9..19bc5437 100644 --- a/examples/client-play-format-mjpeg/main.go +++ b/examples/client-play-format-mjpeg/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a M-JPEG stream. // 3. get JPEG images of that format. diff --git a/examples/client-play-format-mpeg4audio-to-disk/main.go b/examples/client-play-format-mpeg4audio-to-disk/main.go index a4b309b1..d0e4d816 100644 --- a/examples/client-play-format-mpeg4audio-to-disk/main.go +++ b/examples/client-play-format-mpeg4audio-to-disk/main.go @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a MPEG-4 audio stream. // 3. save the content of the format in a file in MPEG-TS format. diff --git a/examples/client-play-format-mpeg4audio/main.go b/examples/client-play-format-mpeg4audio/main.go index 00d500e4..8dd54bc9 100644 --- a/examples/client-play-format-mpeg4audio/main.go +++ b/examples/client-play-format-mpeg4audio/main.go @@ -10,7 +10,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a MPEG-4 audio stream. // 3. get access units of that format. diff --git a/examples/client-play-format-opus-to-disk/main.go b/examples/client-play-format-opus-to-disk/main.go index 71d513f9..840c04c9 100644 --- a/examples/client-play-format-opus-to-disk/main.go +++ b/examples/client-play-format-opus-to-disk/main.go @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a Opus stream. // 3. save the content of the format in a file in MPEG-TS format. diff --git a/examples/client-play-format-opus/main.go b/examples/client-play-format-opus/main.go index 0e36a327..92e977ca 100644 --- a/examples/client-play-format-opus/main.go +++ b/examples/client-play-format-opus/main.go @@ -10,7 +10,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's an Opus stream. // 3. get Opus packets of that format. diff --git a/examples/client-play-format-vp8/main.go b/examples/client-play-format-vp8/main.go index a02bc1d6..3a5abdb1 100644 --- a/examples/client-play-format-vp8/main.go +++ b/examples/client-play-format-vp8/main.go @@ -13,7 +13,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a VP8 stream. // 3. decode the VP8 stream into RGBA frames. diff --git a/examples/client-play-format-vp9/main.go b/examples/client-play-format-vp9/main.go index c382f1f1..aa786325 100644 --- a/examples/client-play-format-vp9/main.go +++ b/examples/client-play-format-vp9/main.go @@ -13,7 +13,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. check if there's a VP9 stream. // 3. decode the VP9 stream into RGBA frames. diff --git a/examples/client-play-options/main.go b/examples/client-play-options/main.go index 0aa728f3..50379502 100644 --- a/examples/client-play-options/main.go +++ b/examples/client-play-options/main.go @@ -13,7 +13,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. set additional client options. // 2. connect to a RTSP server and read all medias on a path. diff --git a/examples/client-play-pause/main.go b/examples/client-play-pause/main.go index 5337ea69..2e20eb95 100644 --- a/examples/client-play-pause/main.go +++ b/examples/client-play-pause/main.go @@ -13,7 +13,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server and read all medias on a path. // 2. wait for 5 seconds. // 3. pause for 5 seconds. diff --git a/examples/client-play-timestamp/main.go b/examples/client-play-timestamp/main.go index 445bf87d..1596853e 100644 --- a/examples/client-play-timestamp/main.go +++ b/examples/client-play-timestamp/main.go @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. read all media streams on a path. // 3. Get PTS and NTP of incoming RTP packets. diff --git a/examples/client-play-to-record/main.go b/examples/client-play-to-record/main.go index bbae9e29..3f61f267 100644 --- a/examples/client-play-to-record/main.go +++ b/examples/client-play-to-record/main.go @@ -11,7 +11,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. read all medias on a path. // 3. re-publish all medias on another path. diff --git a/examples/client-play/main.go b/examples/client-play/main.go index 482be3ba..1b0fea75 100644 --- a/examples/client-play/main.go +++ b/examples/client-play/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. read all media streams on a path. diff --git a/examples/client-query/main.go b/examples/client-query/main.go index a3f287d2..0ea401eb 100644 --- a/examples/client-query/main.go +++ b/examples/client-query/main.go @@ -8,7 +8,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/base" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server. // 2. get and print informations about medias published on a path. diff --git a/examples/client-record-format-av1/main.go b/examples/client-record-format-av1/main.go index 4869be8b..7d274ffb 100644 --- a/examples/client-record-format-av1/main.go +++ b/examples/client-record-format-av1/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce an AV1 format. // 2. generate dummy RGBA images. // 3. encode images with AV1. diff --git a/examples/client-record-format-g711/main.go b/examples/client-record-format-g711/main.go index cd42af8e..175dce57 100644 --- a/examples/client-record-format-g711/main.go +++ b/examples/client-record-format-g711/main.go @@ -13,7 +13,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a G711 format. // 2. generate dummy LPCM audio samples. // 3. encode audio samples with G711. diff --git a/examples/client-record-format-h264-from-disk/main.go b/examples/client-record-format-h264-from-disk/main.go index dda71989..cc3af984 100644 --- a/examples/client-record-format-h264-from-disk/main.go +++ b/examples/client-record-format-h264-from-disk/main.go @@ -17,7 +17,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. read H264 frames from a video file in MPEG-TS format. // 2. connect to a RTSP server, announce a H264 format. // 3. wrap frames into RTP packets. diff --git a/examples/client-record-format-h264/main.go b/examples/client-record-format-h264/main.go index 60168e4f..aa4d99f1 100644 --- a/examples/client-record-format-h264/main.go +++ b/examples/client-record-format-h264/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce an H264 format. // 2. generate dummy RGBA images. // 3. encode images with H264. diff --git a/examples/client-record-format-h265/main.go b/examples/client-record-format-h265/main.go index 832f1140..a03dd483 100644 --- a/examples/client-record-format-h265/main.go +++ b/examples/client-record-format-h265/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce an H265 format. // 2. generate dummy RGBA images. // 3. encode images with H265. diff --git a/examples/client-record-format-lpcm/main.go b/examples/client-record-format-lpcm/main.go index 23fd8189..0b9f114f 100644 --- a/examples/client-record-format-lpcm/main.go +++ b/examples/client-record-format-lpcm/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce an LPCM format. // 2. generate dummy LPCM audio samples. // 3. generate RTP packets from LPCM audio samples. diff --git a/examples/client-record-format-mjpeg/main.go b/examples/client-record-format-mjpeg/main.go index f7c6a5bf..193c33d5 100644 --- a/examples/client-record-format-mjpeg/main.go +++ b/examples/client-record-format-mjpeg/main.go @@ -14,7 +14,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a M-JPEG format. // 2. generate dummy RGBA images. // 3. encode images with JPEG. diff --git a/examples/client-record-format-mpeg4audio/main.go b/examples/client-record-format-mpeg4audio/main.go index b3a82fb8..79b6d5b6 100644 --- a/examples/client-record-format-mpeg4audio/main.go +++ b/examples/client-record-format-mpeg4audio/main.go @@ -14,7 +14,7 @@ import ( "github.com/bluenviron/mediacommon/v2/pkg/codecs/mpeg4audio" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a MPEG-4 Audio (AAC) format. // 2. generate dummy LPCM audio samples. // 3. encode audio samples with MPEG-4 Audio (AAC). diff --git a/examples/client-record-format-opus/main.go b/examples/client-record-format-opus/main.go index bd10bf2a..49377140 100644 --- a/examples/client-record-format-opus/main.go +++ b/examples/client-record-format-opus/main.go @@ -16,7 +16,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a Opus format. // 2. generate dummy LPCM audio samples. // 3. encode audio samples with Opus. diff --git a/examples/client-record-format-vp8/main.go b/examples/client-record-format-vp8/main.go index d6d80444..95de1e06 100644 --- a/examples/client-record-format-vp8/main.go +++ b/examples/client-record-format-vp8/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a VP8 format. // 2. generate dummy RGBA images. // 3. encode images with VP8. diff --git a/examples/client-record-format-vp9/main.go b/examples/client-record-format-vp9/main.go index 76c2d9ea..436313fe 100644 --- a/examples/client-record-format-vp9/main.go +++ b/examples/client-record-format-vp9/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce a VP9 format. // 2. generate dummy RGBA images. // 3. encode images with VP9. diff --git a/examples/client-record-options/main.go b/examples/client-record-options/main.go index 54b95a3e..f28eb918 100644 --- a/examples/client-record-options/main.go +++ b/examples/client-record-options/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. set additional client options. // 2. connect to a RTSP server, announce an H264 format. // 3. generate dummy RGBA images. diff --git a/examples/client-record-pause/main.go b/examples/client-record-pause/main.go index bac670ca..1b271453 100644 --- a/examples/client-record-pause/main.go +++ b/examples/client-record-pause/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. connect to a RTSP server, announce an H264 format. // 2. generate dummy RGBA images. // 3. encode images with H264. diff --git a/examples/proxy-backchannel/main.go b/examples/proxy-backchannel/main.go index 313b345a..95a82f03 100644 --- a/examples/proxy-backchannel/main.go +++ b/examples/proxy-backchannel/main.go @@ -3,7 +3,7 @@ package main import "log" -// This example shows how to +// This example shows how to: // 1. create a server that serves a single stream. // 2. create a client, that reads an existing stream from another server or camera, containing a back channel. // 3. route the stream from the client to the server, and from the server to all connected readers. diff --git a/examples/proxy/main.go b/examples/proxy/main.go index 01824c1f..7f419d01 100644 --- a/examples/proxy/main.go +++ b/examples/proxy/main.go @@ -3,7 +3,7 @@ package main import "log" -// This example shows how to +// This example shows how to: // 1. create a server that serves a single stream. // 2. create a client, that reads an existing stream from another server or camera. // 3. route the stream from the client to the server, and from the server to all connected readers. diff --git a/examples/server-auth/main.go b/examples/server-auth/main.go index 43fac880..b004790b 100644 --- a/examples/server-auth/main.go +++ b/examples/server-auth/main.go @@ -14,7 +14,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which accepts plain connections. // 2. allow a single client to publish a stream, if it provides credentials. // 3. allow several clients to read the stream, if they provide credentials. diff --git a/examples/server-play-backchannel/main.go b/examples/server-play-backchannel/main.go index 323a2fea..885159b0 100644 --- a/examples/server-play-backchannel/main.go +++ b/examples/server-play-backchannel/main.go @@ -12,7 +12,7 @@ import ( "github.com/pion/rtp" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which accepts plain connections. // 2. create a stream with an audio direct channel and an audio back channel. // 3. write the audio direct channel to readers, read the back channel from readers. diff --git a/examples/server-play-format-h264-from-disk/main.go b/examples/server-play-format-h264-from-disk/main.go index 9c30c4af..6101b626 100644 --- a/examples/server-play-format-h264-from-disk/main.go +++ b/examples/server-play-format-h264-from-disk/main.go @@ -11,7 +11,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which accepts plain connections. // 2. read from disk a MPEG-TS file which contains a H264 track. // 3. serve the content of the file to all connected readers. diff --git a/examples/server-record-format-h264-to-disk/main.go b/examples/server-record-format-h264-to-disk/main.go index 0f4ad1b1..17f1d5fd 100644 --- a/examples/server-record-format-h264-to-disk/main.go +++ b/examples/server-record-format-h264-to-disk/main.go @@ -15,7 +15,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format/rtph264" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which accepts plain connections. // 2. allow a single client to publish a stream, containing a H264 format. // 3. save the content of the H264 media in a file in MPEG-TS format. diff --git a/examples/server-secure/main.go b/examples/server-secure/main.go index bd86aa92..1f2b0201 100644 --- a/examples/server-secure/main.go +++ b/examples/server-secure/main.go @@ -14,7 +14,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which uses secure protocols only (RTSPS, TLS, SRTP). // 2. allow a single client to publish a stream. // 3. allow several clients to read the stream. diff --git a/examples/server/main.go b/examples/server/main.go index 10d57ede..1f138410 100644 --- a/examples/server/main.go +++ b/examples/server/main.go @@ -13,7 +13,7 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/format" ) -// This example shows how to +// This example shows how to: // 1. create a RTSP server which accepts plain connections. // 2. allow a single client to publish a stream. // 3. allow several clients to read the stream. diff --git a/internal/asyncprocessor/async_processor.go b/internal/asyncprocessor/async_processor.go new file mode 100644 index 00000000..9731bf10 --- /dev/null +++ b/internal/asyncprocessor/async_processor.go @@ -0,0 +1,72 @@ +// Package asyncprocessor contains an asynchronous processor. +package asyncprocessor + +import ( + "context" + + "github.com/bluenviron/gortsplib/v4/pkg/ringbuffer" +) + +// Processor is an asynchronous queue processor +// that allows to detach the routine that is reading a stream +// from the routine that is writing a stream. +type Processor struct { + BufferSize int + OnError func(context.Context, error) + + running bool + buffer *ringbuffer.RingBuffer + ctx context.Context + ctxCancel func() + + done chan struct{} +} + +// Initialize initializes the processor. +func (w *Processor) Initialize() { + w.buffer, _ = ringbuffer.New(uint64(w.BufferSize)) + w.ctx, w.ctxCancel = context.WithCancel(context.Background()) + w.done = make(chan struct{}) +} + +// Close closes the processor. +func (w *Processor) Close() { + w.ctxCancel() + w.buffer.Close() + + if w.running { + <-w.done + } +} + +// Start starts the processor. +func (w *Processor) Start() { + w.running = true + go w.run() +} + +func (w *Processor) run() { + defer close(w.done) + + err := w.runInner() + w.OnError(w.ctx, err) +} + +func (w *Processor) runInner() error { + for { + tmp, ok := w.buffer.Pull() + if !ok { + return nil + } + + err := tmp.(func() error)() + if err != nil { + return err + } + } +} + +// Push pushes data to the queue. +func (w *Processor) Push(cb func() error) bool { + return w.buffer.Push(cb) +} diff --git a/internal/asyncprocessor/async_processor_test.go b/internal/asyncprocessor/async_processor_test.go new file mode 100644 index 00000000..38651ad3 --- /dev/null +++ b/internal/asyncprocessor/async_processor_test.go @@ -0,0 +1,71 @@ +package asyncprocessor + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCloseBeforeStart(_ *testing.T) { + p := &Processor{ + BufferSize: 8, + } + p.Initialize() + defer p.Close() +} + +func TestCloseAfterError(t *testing.T) { + done := make(chan struct{}) + + p := &Processor{ + BufferSize: 8, + OnError: func(_ context.Context, err error) { + require.EqualError(t, err, "ok") + close(done) + }, + } + p.Initialize() + defer p.Close() + + p.Push(func() error { + return fmt.Errorf("ok") + }) + + p.Start() + + <-done +} + +func TestCloseBeforeError(_ *testing.T) { + p := &Processor{ + BufferSize: 8, + OnError: func(_ context.Context, _ error) {}, + } + p.Initialize() + defer p.Close() + + p.Push(func() error { + return nil + }) + + p.Start() +} + +func TestCloseDuringError(_ *testing.T) { + p := &Processor{ + BufferSize: 8, + OnError: func(ctx context.Context, _ error) { + <-ctx.Done() + }, + } + p.Initialize() + defer p.Close() + + p.Push(func() error { + return fmt.Errorf("ok") + }) + + p.Start() +} diff --git a/internal/base64streamreader/reader.go b/internal/base64streamreader/reader.go new file mode 100644 index 00000000..c0335791 --- /dev/null +++ b/internal/base64streamreader/reader.go @@ -0,0 +1,65 @@ +// Package base64streamreader contains a base64 reader for a stream-based connection. +package base64streamreader + +import ( + "bytes" + "encoding/base64" + "io" +) + +const ( + readSize = 1024 +) + +type reader struct { + r io.Reader + predec []byte + postdec []byte +} + +func (r *reader) Read(p []byte) (int, error) { + for len(r.postdec) == 0 { + todec := r.predec + + if len(todec)%4 != 0 { + todec = todec[:(len(todec)/4)*4] + } + + if i := bytes.IndexByte(todec, '='); i >= 0 { + if len(todec) > (i+1) && todec[i+1] == '=' { + i++ + } + todec = todec[:i+1] + } + + if len(todec) == 0 { + buf := make([]byte, readSize) + n, err := r.r.Read(buf) + if err != nil && n == 0 { + return 0, err + } + + r.predec = append(r.predec, buf[:n]...) + continue + } + + r.predec = r.predec[len(todec):] + + out, err := base64.StdEncoding.DecodeString(string(todec)) + if err != nil { + return 0, err + } + + r.postdec = append(r.postdec, out...) + } + + n := copy(p, r.postdec) + r.postdec = r.postdec[n:] + + return n, nil +} + +// New allocates a base64 stream reader. +func New(r io.Reader) io.Reader { + return &reader{r: r} +} diff --git a/internal/base64streamreader/reader_test.go b/internal/base64streamreader/reader_test.go new file mode 100644 index 00000000..5f8fe0f0 --- /dev/null +++ b/internal/base64streamreader/reader_test.go @@ -0,0 +1,115 @@ +package base64streamreader + +import ( + "io" + "testing" + + "github.com/stretchr/testify/require" +) + +type dummyReader struct { + input []string + pos int +} + +func (r *dummyReader) Read(p []byte) (int, error) { + if r.pos >= len(r.input) { + return 0, io.EOF + } + + n := copy(p, r.input[r.pos]) + r.pos++ + + return n, nil +} + +func TestReader(t *testing.T) { + for _, ca := range []struct { + name string + input []string + output []string + }{ + { + "standard", + []string{ + "dGVzdGluZyAxIDIgMw==", + }, + []string{"testing 1 2 3"}, + }, + { + "concatenated", + []string{ + "dGVzdGluZyAxIDIgMw==b3RoZXIgdGVzdA==", + }, + []string{ + "testing 1 2 3", + "other test", + }, + }, + { + "splitted evenly", + []string{ + "dGVz", + "dGluZyAxIDIgMw==", + }, + []string{ + "tes", + "ting 1 2 3", + }, + }, + { + "splitted unevenly", + []string{ + "dGV", + "zdGluZyAxIDIgMw==", + }, + []string{ + "testing 1 2 3", + }, + }, + { + "concatenated and splitted evenly", + []string{ + "dGVzdGluZyAxIDIgMw==b3RoZXIgdGVz", + "dA==", + }, + []string{ + "testing 1 2 3", + "other tes", + "t", + }, + }, + { + "concatenated and splitted unevenly", + []string{ + "dGVzdGluZyAxIDIgMw==b3RoZXIgdGVzdA=", + "=", + }, + []string{ + "testing 1 2 3", + "other tes", + "t", + }, + }, + } { + t.Run(ca.name, func(t *testing.T) { + dr := &dummyReader{input: ca.input} + r := New(dr) + + var output []string + + for { + buf := make([]byte, 512) + n, err := r.Read(buf) + if err == io.EOF { + break + } + require.NoError(t, err) + + output = append(output, string(buf[:n])) + } + + require.Equal(t, ca.output, output) + }) + } +} diff --git a/internal/teste2e/client_vs_server_test.go b/internal/teste2e/client_vs_server_test.go index bb816db0..f00f2af6 100644 --- a/internal/teste2e/client_vs_server_test.go +++ b/internal/teste2e/client_vs_server_test.go @@ -5,6 +5,7 @@ package teste2e import ( "crypto/tls" "net" + "strings" "testing" "time" @@ -46,60 +47,100 @@ func TestClientVsServer(t *testing.T) { for _, ca := range []struct { publisherScheme string publisherProto string + publisherTunnel string readerScheme string readerProto string + readerTunnel string }{ { publisherScheme: "rtsp", publisherProto: "udp", + publisherTunnel: "none", readerScheme: "rtsp", readerProto: "udp", + readerTunnel: "none", }, { publisherScheme: "rtsp", publisherProto: "tcp", + publisherTunnel: "none", readerScheme: "rtsp", readerProto: "udp", + readerTunnel: "none", }, { publisherScheme: "rtsp", publisherProto: "tcp", + publisherTunnel: "none", readerScheme: "rtsp", readerProto: "tcp", + readerTunnel: "none", }, { publisherScheme: "rtsp", publisherProto: "udp", + publisherTunnel: "none", readerScheme: "rtsp", readerProto: "tcp", + readerTunnel: "none", }, { publisherScheme: "rtsp", publisherProto: "udp", + publisherTunnel: "none", readerScheme: "rtsp", readerProto: "multicast", + readerTunnel: "none", }, { publisherScheme: "rtsps", publisherProto: "tcp", + publisherTunnel: "none", readerScheme: "rtsps", readerProto: "tcp", + readerTunnel: "none", }, { publisherScheme: "rtsps", publisherProto: "udp", + publisherTunnel: "none", readerScheme: "rtsps", readerProto: "tcp", + readerTunnel: "none", }, { publisherScheme: "rtsps", publisherProto: "udp", + publisherTunnel: "none", readerScheme: "rtsps", readerProto: "multicast", + readerTunnel: "none", + }, + { + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherTunnel: "http", + readerScheme: "rtsp", + readerProto: "udp", + readerTunnel: "none", + }, + { + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherTunnel: "none", + readerScheme: "rtsp", + readerProto: "tcp", + readerTunnel: "http", }, } { - t.Run(ca.publisherScheme+"_"+ca.publisherProto+"_"+ - ca.readerScheme+"_"+ca.readerProto, func(t *testing.T) { + t.Run(strings.Join([]string{ + ca.publisherScheme, + ca.publisherProto, + ca.publisherTunnel, + ca.readerScheme, + ca.readerProto, + ca.readerTunnel, + }, "_"), func(t *testing.T) { ss := &sampleServer{} if ca.publisherScheme == "rtsps" { @@ -124,6 +165,13 @@ func TestClientVsServer(t *testing.T) { }, } + var publisherTunnel gortsplib.Tunnel + if ca.publisherTunnel == "http" { + publisherTunnel = gortsplib.TunnelHTTP + } else { + publisherTunnel = gortsplib.TunnelNone + } + var publisherProto gortsplib.Transport switch ca.publisherProto { case "udp": @@ -134,6 +182,7 @@ func TestClientVsServer(t *testing.T) { publisher := &gortsplib.Client{ TLSConfig: &tls.Config{InsecureSkipVerify: true}, + Tunnel: publisherTunnel, Transport: &publisherProto, } err = publisher.StartRecording(ca.publisherScheme+"://127.0.0.1:8554/test/stream?key=val", desc) @@ -142,6 +191,13 @@ func TestClientVsServer(t *testing.T) { time.Sleep(1 * time.Second) + var readerTunnel gortsplib.Tunnel + if ca.readerTunnel == "http" { + readerTunnel = gortsplib.TunnelHTTP + } else { + readerTunnel = gortsplib.TunnelNone + } + var readerProto gortsplib.Transport switch ca.readerProto { case "udp": @@ -159,6 +215,7 @@ func TestClientVsServer(t *testing.T) { Scheme: u.Scheme, Host: u.Host, TLSConfig: &tls.Config{InsecureSkipVerify: true}, + Tunnel: readerTunnel, Transport: &readerProto, } err = reader.Start2() diff --git a/internal/teste2e/server_vs_external_test.go b/internal/teste2e/server_vs_external_test.go index 0c0e11e3..bf5f88a3 100644 --- a/internal/teste2e/server_vs_external_test.go +++ b/internal/teste2e/server_vs_external_test.go @@ -8,6 +8,7 @@ import ( "os/exec" "path/filepath" "strconv" + "strings" "sync" "testing" "time" @@ -89,208 +90,294 @@ func TestServerVsExternal(t *testing.T) { wg.Wait() for _, ca := range []struct { - publisherSoft string - publisherScheme string - publisherProto string - publisherSecure string - readerSoft string - readerScheme string - readerProto string - readerSecure string + publisherSoft string + publisherScheme string + publisherProto string + publisherProfile string + publisherTunnel string + readerSoft string + readerScheme string + readerProto string + readerProfile string + readerTunnel string }{ { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "udp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "udp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsp", - readerProto: "udp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "udp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "udp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "udp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsp", - readerProto: "udp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "udp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "multicast", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "multicast", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsp", - readerProto: "multicast", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "multicast", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsp", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsp", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsp", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsp", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "udp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "udp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsp", - publisherProto: "udp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsp", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsp", + publisherProto: "udp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsps", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsps", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsps", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsps", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsps", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsps", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsps", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsps", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsps", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "ffmpeg", - readerScheme: "rtsps", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsps", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsps", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsps", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsps", - readerProto: "tcp", - readerSecure: "unsecure", + publisherSoft: "gstreamer", + publisherScheme: "rtsps", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsps", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", }, { - publisherSoft: "ffmpeg", - publisherScheme: "rtsps", - publisherProto: "tcp", - publisherSecure: "unsecure", - readerSoft: "gstreamer", - readerScheme: "rtsps", - readerProto: "udp", - readerSecure: "secure", + publisherSoft: "ffmpeg", + publisherScheme: "rtsps", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsps", + readerProto: "udp", + readerProfile: "savp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsps", - publisherProto: "udp", - publisherSecure: "secure", - readerSoft: "gstreamer", - readerScheme: "rtsps", - readerProto: "udp", - readerSecure: "secure", + publisherSoft: "gstreamer", + publisherScheme: "rtsps", + publisherProto: "udp", + publisherProfile: "savp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsps", + readerProto: "udp", + readerProfile: "savp", + readerTunnel: "none", }, { - publisherSoft: "gstreamer", - publisherScheme: "rtsps", - publisherProto: "udp", - publisherSecure: "secure", - readerSoft: "gstreamer", - readerScheme: "rtsps", - readerProto: "multicast", - readerSecure: "secure", + publisherSoft: "gstreamer", + publisherScheme: "rtsps", + publisherProto: "udp", + publisherProfile: "savp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsps", + readerProto: "multicast", + readerProfile: "savp", + readerTunnel: "none", + }, + { + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "http", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "none", + }, + { + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "ffmpeg", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "http", + }, + { + publisherSoft: "gstreamer", + publisherScheme: "rtsp", + publisherProto: "tcp", + publisherProfile: "avp", + publisherTunnel: "none", + readerSoft: "gstreamer", + readerScheme: "rtsp", + readerProto: "tcp", + readerProfile: "avp", + readerTunnel: "http", }, } { - t.Run(ca.publisherSoft+"_"+ca.publisherScheme+"_"+ca.publisherProto+"_"+ca.publisherSecure+"_"+ - ca.readerSoft+"_"+ca.readerScheme+"_"+ca.readerProto+"_"+ca.readerSecure, func(t *testing.T) { + t.Run(strings.Join([]string{ + ca.publisherSoft, + ca.publisherScheme, + ca.publisherProto, + ca.publisherProfile, + ca.publisherTunnel, + ca.readerSoft, + ca.readerScheme, + ca.readerProto, + ca.readerProfile, + ca.readerTunnel, + }, "_"), func(t *testing.T) { ss := &sampleServer{} if ca.publisherScheme == "rtsps" { @@ -318,8 +405,15 @@ func TestServerVsExternal(t *testing.T) { defer cnt1.close() case "gstreamer": + var scheme string + if ca.publisherTunnel == "http" { + scheme = "rtsph" + } else { + scheme = ca.publisherScheme + } + var profile string - if ca.publisherSecure == "secure" { + if ca.publisherProfile == "savp" { profile = "GST_RTSP_PROFILE_SAVP" } else { profile = "GST_RTSP_PROFILE_AVP" @@ -327,7 +421,7 @@ func TestServerVsExternal(t *testing.T) { cnt1, err := newContainer("gstreamer", "publish", []string{ "filesrc location=emptyvideo.mkv ! matroskademux ! video/x-h264 ! rtspclientsink " + - "location=" + ca.publisherScheme + "://127.0.0.1:8554/test/stream?key=val" + + "location=" + scheme + "://127.0.0.1:8554/test/stream?key=val" + " protocols=" + ca.publisherProto + " profiles=" + profile + " tls-validation-flags=0 latency=0 timeout=0 rtx-time=0", @@ -343,9 +437,14 @@ func TestServerVsExternal(t *testing.T) { switch ca.readerSoft { case "ffmpeg": var proto string - if ca.readerProto == "multicast" { + switch { + case ca.readerTunnel == "http": + proto = "http" + + case ca.readerProto == "multicast": proto = "udp_multicast" - } else { + + default: proto = ca.readerProto } @@ -361,6 +460,13 @@ func TestServerVsExternal(t *testing.T) { require.Equal(t, 0, cnt2.wait()) case "gstreamer": + var scheme string + if ca.readerTunnel == "http" { + scheme = "rtsph" + } else { + scheme = ca.readerScheme + } + var proto string if ca.readerProto == "multicast" { proto = "udp-mcast" @@ -369,7 +475,7 @@ func TestServerVsExternal(t *testing.T) { } cnt2, err := newContainer("gstreamer", "read", []string{ - "rtspsrc location=" + ca.readerScheme + "://127.0.0.1:8554/test/stream?key=val" + + "rtspsrc location=" + scheme + "://127.0.0.1:8554/test/stream?key=val" + " protocols=" + proto + " tls-validation-flags=0 latency=0 " + "! application/x-rtp,media=video ! decodebin ! video/x-raw ! fakesink num-buffers=1", diff --git a/pkg/conn/conn.go b/pkg/conn/conn.go index 5bdc9186..235338ff 100644 --- a/pkg/conn/conn.go +++ b/pkg/conn/conn.go @@ -8,24 +8,20 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/base" ) -const ( - readBufferSize = 4096 -) - // Conn is a RTSP connection. type Conn struct { - w io.Writer br *bufio.Reader + w io.Writer // reuse interleaved frames. they should never be passed to secondary routines fr base.InterleavedFrame } // NewConn allocates a Conn. -func NewConn(rw io.ReadWriter) *Conn { +func NewConn(br *bufio.Reader, w io.Writer) *Conn { return &Conn{ - w: rw, - br: bufio.NewReaderSize(rw, readBufferSize), + br: br, + w: w, } } diff --git a/pkg/conn/conn_test.go b/pkg/conn/conn_test.go index e0518620..cc4efa22 100644 --- a/pkg/conn/conn_test.go +++ b/pkg/conn/conn_test.go @@ -1,6 +1,7 @@ package conn import ( + "bufio" "bytes" "testing" @@ -68,7 +69,7 @@ func TestRead(t *testing.T) { } { t.Run(ca.name, func(t *testing.T) { buf := bytes.NewBuffer(ca.enc) - conn := NewConn(buf) + conn := NewConn(bufio.NewReader(buf), buf) dec, err := conn.Read() require.NoError(t, err) require.Equal(t, ca.dec, dec) @@ -85,7 +86,7 @@ func TestReadConsecutiveFrameMagicBytes(t *testing.T) { // another interleaved frame 0x24, 0x6, 0x0, 0x4, 0x1, 0x2, 0x3, 0x4, }) - conn := NewConn(buf) + conn := NewConn(bufio.NewReader(buf), buf) dec1, err := conn.Read() require.NoError(t, err) require.Equal(t, @@ -104,14 +105,14 @@ func TestReadConsecutiveFrameMagicBytes(t *testing.T) { func TestReadError(t *testing.T) { var buf bytes.Buffer - conn := NewConn(&buf) + conn := NewConn(bufio.NewReader(&buf), &buf) _, err := conn.Read() require.Error(t, err) } func TestWriteRequest(t *testing.T) { var buf bytes.Buffer - conn := NewConn(&buf) + conn := NewConn(bufio.NewReader(&buf), &buf) err := conn.WriteRequest(&base.Request{ Method: "OPTIONS", URL: mustParseURL("rtsp://example.com/media.mp4"), @@ -126,7 +127,7 @@ func TestWriteRequest(t *testing.T) { func TestWriteResponse(t *testing.T) { var buf bytes.Buffer - conn := NewConn(&buf) + conn := NewConn(bufio.NewReader(&buf), &buf) err := conn.WriteResponse(&base.Response{ StatusCode: base.StatusOK, StatusMessage: "OK", @@ -145,7 +146,7 @@ func TestWriteResponse(t *testing.T) { func TestWriteInterleavedFrame(t *testing.T) { var buf bytes.Buffer - conn := NewConn(&buf) + conn := NewConn(bufio.NewReader(&buf), &buf) err := conn.WriteInterleavedFrame(&base.InterleavedFrame{ Channel: 6, Payload: []byte{0x01, 0x02, 0x03, 0x04}, diff --git a/pkg/multibuffer/multibuffer.go b/pkg/multibuffer/multibuffer.go index 0e30dd34..aa7c4bb2 100644 --- a/pkg/multibuffer/multibuffer.go +++ b/pkg/multibuffer/multibuffer.go @@ -3,6 +3,8 @@ package multibuffer // MultiBuffer implements software multi buffering, that allows to reuse // existing buffers without creating new ones, improving performance. +// +// Deprecated: not used anymore, will be removed in next version. type MultiBuffer struct { count uint64 buffers [][]byte diff --git a/scripts/test.mk b/scripts/test.mk index 74ef27b9..f5d8f6e8 100644 --- a/scripts/test.mk +++ b/scripts/test.mk @@ -6,13 +6,16 @@ endif test-examples: go build -o /dev/null ./examples/... +test-internal: + go test -v $(RACE) -coverprofile=coverage-internal.txt ./internal/... + test-pkg: go test -v $(RACE) -coverprofile=coverage-pkg.txt ./pkg/... test-root: go test -v $(RACE) -coverprofile=coverage-root.txt . -test-nodocker: test-examples test-pkg test-root +test-nodocker: test-examples test-internal test-pkg test-root define DOCKERFILE_TEST ARG ARCH diff --git a/server.go b/server.go index da7c2c7b..54ac8d0a 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package gortsplib import ( "context" "crypto/tls" + "errors" "fmt" "net" "strconv" @@ -19,6 +20,8 @@ const ( serverAuthRealm = "ipcam" ) +var errHTTPUpgraded = errors.New("upgraded to HTTP conn") + func extractPort(address string) (int, error) { _, tmp, err := net.SplitHostPort(address) if err != nil { @@ -47,6 +50,13 @@ type sessionRequestReq struct { res chan sessionRequestRes } +type sessionHandleHTTPChannelReq struct { + sc *ServerConn + write bool + tunnelID string + res chan error +} + type chGetMulticastIPReq struct { res chan net.IP } @@ -129,25 +139,27 @@ type Server struct { sessionTimeout time.Duration checkStreamPeriod time.Duration - ctx context.Context - ctxCancel func() - wg sync.WaitGroup - multicastNet *net.IPNet - multicastNextIP net.IP - tcpListener *serverTCPListener - udpRTPListener *serverUDPListener - udpRTCPListener *serverUDPListener - sessions map[string]*ServerSession - conns map[*ServerConn]struct{} - closeError error + ctx context.Context + ctxCancel func() + wg sync.WaitGroup + multicastNet *net.IPNet + multicastNextIP net.IP + tcpListener *serverTCPListener + udpRTPListener *serverUDPListener + udpRTCPListener *serverUDPListener + conns map[*ServerConn]struct{} + httpReadChannels map[*ServerConn]chan error + sessions map[string]*ServerSession + closeError error // in - chNewConn chan net.Conn - chAcceptErr chan error - chCloseConn chan *ServerConn - chHandleRequest chan sessionRequestReq - chCloseSession chan *ServerSession - chGetMulticastIP chan chGetMulticastIPReq + chNewConn chan net.Conn + chAcceptErr chan error + chCloseConn chan *ServerConn + chHandleHTTPChannel chan sessionHandleHTTPChannelReq + chHandleRequest chan sessionRequestReq + chCloseSession chan *ServerSession + chGetMulticastIP chan chGetMulticastIPReq } // Start starts the server. @@ -305,18 +317,18 @@ func (s *Server) Start() error { s.ctx, s.ctxCancel = context.WithCancel(context.Background()) - s.sessions = make(map[string]*ServerSession) s.conns = make(map[*ServerConn]struct{}) + s.httpReadChannels = make(map[*ServerConn]chan error) + s.sessions = make(map[string]*ServerSession) s.chNewConn = make(chan net.Conn) s.chAcceptErr = make(chan error) s.chCloseConn = make(chan *ServerConn) + s.chHandleHTTPChannel = make(chan sessionHandleHTTPChannelReq) s.chHandleRequest = make(chan sessionRequestReq) s.chCloseSession = make(chan *ServerSession) s.chGetMulticastIP = make(chan chGetMulticastIPReq) - s.tcpListener = &serverTCPListener{ - s: s, - } + s.tcpListener = &serverTCPListener{s: s} err := s.tcpListener.initialize() if err != nil { if s.udpRTPListener != nil { @@ -355,6 +367,8 @@ func (s *Server) run() { s.ctxCancel() + s.tcpListener.close() + if s.udpRTCPListener != nil { s.udpRTCPListener.close() } @@ -362,8 +376,6 @@ func (s *Server) run() { if s.udpRTPListener != nil { s.udpRTPListener.close() } - - s.tcpListener.close() } func (s *Server) runInner() error { @@ -385,10 +397,36 @@ func (s *Server) runInner() error { continue } delete(s.conns, sc) + delete(s.httpReadChannels, sc) sc.Close() + case req := <-s.chHandleHTTPChannel: + if !req.write { + req.sc.httpReadTunnelID = req.tunnelID + s.httpReadChannels[req.sc] = req.res + } else { + readChan, readChanRes := s.findHTTPReadChannel(req.sc, req.tunnelID) + if readChan == nil { + req.res <- fmt.Errorf("did not found a corresponding HTTP GET request") + } else { + delete(s.httpReadChannels, readChan) + close(readChanRes) + req.res <- errHTTPUpgraded + + nconn := newServerHTTPTunnel(req.sc.nconn, req.sc.httpReadBuf, readChan.nconn) + sc := &ServerConn{ + s: s, + nconn: nconn, + isHTTP: true, + } + sc.initialize() + s.conns[sc] = struct{}{} + } + } + case req := <-s.chHandleRequest: - if ss, ok := s.sessions[req.id]; ok { + ss, ok := s.sessions[req.id] + if ok { if !req.sc.ip().Equal(ss.author.ip()) || req.sc.zone() != ss.author.zone() { req.res <- sessionRequestRes{ @@ -399,17 +437,6 @@ func (s *Server) runInner() error { } continue } - - select { - case ss.chHandleRequest <- req: - case <-ss.ctx.Done(): - req.res <- sessionRequestRes{ - res: &base.Response{ - StatusCode: base.StatusBadRequest, - }, - err: liberrors.ErrServerTerminated{}, - } - } } else { if !req.create { req.res <- sessionRequestRes{ @@ -427,19 +454,10 @@ func (s *Server) runInner() error { } ss.initialize() s.sessions[ss.secretID] = ss - - select { - case ss.chHandleRequest <- req: - case <-ss.ctx.Done(): - req.res <- sessionRequestRes{ - res: &base.Response{ - StatusCode: base.StatusBadRequest, - }, - err: liberrors.ErrServerTerminated{}, - } - } } + ss.handleRequestNoWait(req) + case ss := <-s.chCloseSession: if sss, ok := s.sessions[ss.secretID]; !ok || sss != ss { continue @@ -478,6 +496,16 @@ func (s *Server) StartAndWait() error { return s.Wait() } +func (s *Server) findHTTPReadChannel(writeChan *ServerConn, tunnelID string) (*ServerConn, chan error) { + for readChan, readChanRes := range s.httpReadChannels { + if readChan.remoteAddr.IP.Equal(writeChan.remoteAddr.IP) && + readChan.httpReadTunnelID == tunnelID { + return readChan, readChanRes + } + } + return nil, nil +} + func (s *Server) getMulticastIP() (net.IP, error) { res := make(chan net.IP) select { @@ -518,6 +546,34 @@ func (s *Server) closeSession(ss *ServerSession) { } } +func (s *Server) handleHTTPChannel(req sessionHandleHTTPChannelReq) error { + req.res = make(chan error) + + select { + case s.chHandleHTTPChannel <- req: + case <-req.sc.ctx.Done(): + return fmt.Errorf("terminated") + case <-s.ctx.Done(): + return fmt.Errorf("terminated") + } + + if !req.write { + t := time.NewTimer(5 * time.Second) + defer t.Stop() + + select { + case <-req.res: + case <-req.sc.ctx.Done(): + return fmt.Errorf("terminated") + case <-t.C: + return fmt.Errorf("did not found a corresponding HTTP POST request") + } + return errHTTPUpgraded + } + + return <-req.res +} + func (s *Server) handleRequest(req sessionRequestReq) (*base.Response, *ServerSession, error) { select { case s.chHandleRequest <- req: diff --git a/server_conn.go b/server_conn.go index 7ab95a80..da57efc1 100644 --- a/server_conn.go +++ b/server_conn.go @@ -1,6 +1,7 @@ package gortsplib import ( + "bufio" "context" "crypto/rand" "crypto/tls" @@ -15,7 +16,6 @@ import ( "github.com/bluenviron/gortsplib/v4/pkg/auth" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/bytecounter" - "github.com/bluenviron/gortsplib/v4/pkg/conn" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/headers" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" @@ -195,21 +195,25 @@ type readReq struct { // ServerConn is a server-side RTSP connection. type ServerConn struct { - s *Server - nconn net.Conn + s *Server + nconn net.Conn + isHTTP bool - ctx context.Context - ctxCancel func() - propsMutex sync.RWMutex - userData interface{} - remoteAddr *net.TCPAddr - bc *bytecounter.ByteCounter - conn *conn.Conn - session *ServerSession - reader *serverConnReader - authNonce string + ctx context.Context + ctxCancel func() + propsMutex sync.RWMutex + userData interface{} + remoteAddr *net.TCPAddr + bc *bytecounter.ByteCounter + session *ServerSession + reader *serverConnReader + authNonce string + httpReadBuf *bufio.Reader + httpReadTunnelID string // in + chRequest chan readReq + chReadError chan error chRemoveSession chan *ServerSession // out @@ -219,7 +223,7 @@ type ServerConn struct { func (sc *ServerConn) initialize() { ctx, ctxCancel := context.WithCancel(sc.s.ctx) - if sc.s.TLSConfig != nil { + if sc.s.TLSConfig != nil && !sc.isHTTP { sc.nconn = tls.Server(sc.nconn, sc.s.TLSConfig) } @@ -227,6 +231,8 @@ func (sc *ServerConn) initialize() { sc.ctx = ctx sc.ctxCancel = ctxCancel sc.remoteAddr = sc.nconn.RemoteAddr().(*net.TCPAddr) + sc.chRequest = make(chan readReq) + sc.chReadError = make(chan error) sc.chRemoveSession = make(chan *ServerSession) sc.done = make(chan struct{}) @@ -278,7 +284,14 @@ func (sc *ServerConn) Session() *ServerSession { // Transport returns transport details. func (sc *ServerConn) Transport() *ConnTransport { - return &ConnTransport{} + return &ConnTransport{ + Tunnel: func() Tunnel { + if sc.isHTTP { + return TunnelHTTP + } + return TunnelNone + }(), + } } // Stats returns connection statistics. @@ -349,7 +362,6 @@ func (sc *ServerConn) run() { }) } - sc.conn = conn.NewConn(sc.bc) sc.reader = &serverConnReader{ sc: sc, } @@ -359,7 +371,9 @@ func (sc *ServerConn) run() { sc.ctxCancel() - sc.nconn.Close() + if !errors.Is(err, errHTTPUpgraded) { + sc.nconn.Close() + } if sc.reader != nil { sc.reader.wait() @@ -382,10 +396,10 @@ func (sc *ServerConn) run() { func (sc *ServerConn) runInner() error { for { select { - case req := <-sc.reader.chRequest: + case req := <-sc.chRequest: req.res <- sc.handleRequestOuter(req.req) - case err := <-sc.reader.chError: + case err := <-sc.chReadError: sc.reader = nil return err @@ -615,7 +629,7 @@ func (sc *ServerConn) handleRequestOuter(req *base.Request) error { } sc.nconn.SetWriteDeadline(time.Now().Add(sc.s.WriteTimeout)) - err2 := sc.conn.WriteResponse(res) + err2 := sc.reader.conn.WriteResponse(res) if err == nil && err2 != nil { err = err2 } diff --git a/server_conn_reader.go b/server_conn_reader.go index ddd4ed24..595d5d86 100644 --- a/server_conn_reader.go +++ b/server_conn_reader.go @@ -1,14 +1,27 @@ package gortsplib import ( + "bufio" + "bytes" "errors" "fmt" + "io" + "net/http" "time" "github.com/bluenviron/gortsplib/v4/pkg/base" + "github.com/bluenviron/gortsplib/v4/pkg/conn" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" + "github.com/bluenviron/mediacommon/v2/pkg/rewindablereader" ) +func makeReadWriter(r io.Reader, w io.Writer) io.ReadWriter { + return struct { + io.Reader + io.Writer + }{r, w} +} + type switchReadFuncError struct { tcp bool } @@ -23,32 +36,43 @@ func isSwitchReadFuncError(err error) bool { } type serverConnReader struct { - sc *ServerConn - - chRequest chan readReq - chError chan error + sc *ServerConn + conn *conn.Conn } func (cr *serverConnReader) initialize() { - cr.chRequest = make(chan readReq) - cr.chError = make(chan error) - go cr.run() } func (cr *serverConnReader) wait() { for { select { - case <-cr.chError: + case <-cr.sc.chReadError: return - case req := <-cr.chRequest: + case req := <-cr.sc.chRequest: req.res <- fmt.Errorf("terminated") } } } func (cr *serverConnReader) run() { + cr.sc.chReadError <- cr.runInner() +} + +func (cr *serverConnReader) runInner() error { + var rw io.ReadWriter = cr.sc.bc + + if !cr.sc.isHTTP { + var err error + rw, err = cr.upgradeToHTTP(rw) + if err != nil { + return err + } + } + + cr.conn = conn.NewConn(bufio.NewReader(rw), rw) + readFunc := cr.readFuncStandard for { @@ -64,17 +88,90 @@ func (cr *serverConnReader) run() { continue } - cr.chError <- err - break + return err } } +func (cr *serverConnReader) upgradeToHTTP(in io.ReadWriter) (io.ReadWriter, error) { + rr := &rewindablereader.Reader{R: in} + + buf := make([]byte, 4) + _, err := io.ReadFull(rr, buf) + if err != nil { + return nil, err + } + + rr.Rewind() + + if bytes.Equal(buf, []byte{'G', 'E', 'T', ' '}) || + bytes.Equal(buf, []byte{'P', 'O', 'S', 'T'}) { + buf := bufio.NewReader(rr) + var req *http.Request + req, err = http.ReadRequest(buf) + if err != nil { + return nil, err + } + + if (req.Method != http.MethodGet && req.Method != http.MethodPost) || + (req.Method == http.MethodGet && req.Header.Get("Accept") != "application/x-rtsp-tunnelled") || + (req.Method == http.MethodPost && req.Header.Get("Content-Type") != "application/x-rtsp-tunnelled") || + req.Header.Get("X-Sessioncookie") == "" { + res := http.Response{ + StatusCode: http.StatusBadRequest, + ProtoMajor: req.ProtoMajor, + ProtoMinor: req.ProtoMinor, + Request: req, + } + var buf2 bytes.Buffer + res.Write(&buf2) //nolint:errcheck + cr.sc.nconn.SetWriteDeadline(time.Now().Add(cr.sc.s.WriteTimeout)) + _, err = in.Write(buf2.Bytes()) + if err != nil { + return nil, err + } + + return nil, fmt.Errorf("invalid HTTP request") + } + + h := http.Header{} + h.Set("Cache-Control", "no-cache") + h.Set("Connection", "close") + h.Set("Content-Type", "application/x-rtsp-tunnelled") + h.Set("Pragma", "no-cache") + res := http.Response{ + StatusCode: http.StatusOK, + ProtoMajor: 1, + ProtoMinor: req.ProtoMinor, + Header: h, + ContentLength: -1, + } + var buf2 bytes.Buffer + res.Write(&buf2) //nolint:errcheck + cr.sc.nconn.SetWriteDeadline(time.Now().Add(cr.sc.s.WriteTimeout)) + _, err = in.Write(buf2.Bytes()) + if err != nil { + return nil, err + } + + cr.sc.httpReadBuf = buf + + err = cr.sc.s.handleHTTPChannel(sessionHandleHTTPChannelReq{ + sc: cr.sc, + write: (req.Method == http.MethodPost), + tunnelID: req.Header.Get("X-Sessioncookie"), + }) + return nil, err + } + + return makeReadWriter(rr, in), nil +} + func (cr *serverConnReader) readFuncStandard() error { // reset deadline cr.sc.nconn.SetReadDeadline(time.Time{}) for { - what, err := cr.sc.conn.Read() + what, err := cr.conn.Read() if err != nil { return err } @@ -83,7 +180,7 @@ func (cr *serverConnReader) readFuncStandard() error { case *base.Request: cres := make(chan error) req := readReq{req: what, res: cres} - cr.chRequest <- req + cr.sc.chRequest <- req err = <-cres if err != nil { @@ -110,7 +207,7 @@ func (cr *serverConnReader) readFuncTCP() error { cr.sc.nconn.SetReadDeadline(time.Now().Add(cr.sc.s.ReadTimeout)) } - what, err := cr.sc.conn.Read() + what, err := cr.conn.Read() if err != nil { return err } @@ -119,7 +216,7 @@ func (cr *serverConnReader) readFuncTCP() error { case *base.Request: cres := make(chan error) req := readReq{req: what, res: cres} - cr.chRequest <- req + cr.sc.chRequest <- req err = <-cres if err != nil { diff --git a/server_http_tunnel.go b/server_http_tunnel.go new file mode 100644 index 00000000..32b77381 --- /dev/null +++ b/server_http_tunnel.go @@ -0,0 +1,58 @@ +package gortsplib + +import ( + "bufio" + "io" + "net" + "time" + + "github.com/bluenviron/gortsplib/v4/internal/base64streamreader" +) + +type serverHTTPTunnel struct { + r net.Conn + rb io.Reader + w net.Conn +} + +func (m *serverHTTPTunnel) Read(p []byte) (int, error) { + return m.rb.Read(p) +} + +func (m *serverHTTPTunnel) Write(p []byte) (int, error) { + return m.w.Write(p) +} + +func (m *serverHTTPTunnel) Close() error { + m.r.Close() + m.w.Close() + return nil +} + +func (m *serverHTTPTunnel) LocalAddr() net.Addr { + return m.r.LocalAddr() +} + +func (m *serverHTTPTunnel) RemoteAddr() net.Addr { + return m.r.RemoteAddr() +} + +func (m *serverHTTPTunnel) SetDeadline(_ time.Time) error { + panic("unimplemented") +} + +func (m *serverHTTPTunnel) SetReadDeadline(t time.Time) error { + return m.r.SetReadDeadline(t) +} + +func (m *serverHTTPTunnel) SetWriteDeadline(t time.Time) error { + return m.w.SetWriteDeadline(t) +} + +func newServerHTTPTunnel(r net.Conn, rb *bufio.Reader, w net.Conn) net.Conn { + return &serverHTTPTunnel{ + r: r, + rb: base64streamreader.New(rb), + w: w, + } +} diff --git a/server_multicast_writer.go b/server_multicast_writer.go index 52645aa3..3dcd8834 100644 --- a/server_multicast_writer.go +++ b/server_multicast_writer.go @@ -1,8 +1,10 @@ package gortsplib import ( + "context" "net" + "github.com/bluenviron/gortsplib/v4/internal/asyncprocessor" "github.com/bluenviron/gortsplib/v4/pkg/liberrors" ) @@ -11,7 +13,7 @@ type serverMulticastWriter struct { rtpl *serverUDPListener rtcpl *serverUDPListener - writer *asyncProcessor + writer *asyncprocessor.Processor rtpAddr *net.UDPAddr rtcpAddr *net.UDPAddr } @@ -49,11 +51,12 @@ func (h *serverMulticastWriter) initialize() error { h.rtpAddr = rtpAddr h.rtcpAddr = rtcpAddr - h.writer = &asyncProcessor{ - bufferSize: h.s.WriteQueueSize, + h.writer = &asyncprocessor.Processor{ + BufferSize: h.s.WriteQueueSize, + OnError: func(_ context.Context, _ error) {}, } - h.writer.initialize() - h.writer.start() + h.writer.Initialize() + h.writer.Start() return nil } @@ -61,7 +64,7 @@ func (h *serverMulticastWriter) initialize() error { func (h *serverMulticastWriter) close() { h.rtpl.close() h.rtcpl.close() - h.writer.close() + h.writer.Close() } func (h *serverMulticastWriter) ip() net.IP { @@ -69,7 +72,7 @@ func (h *serverMulticastWriter) ip() net.IP { } func (h *serverMulticastWriter) writePacketRTP(byts []byte) error { - ok := h.writer.push(func() error { + ok := h.writer.Push(func() error { return h.rtpl.write(byts, h.rtpAddr) }) if !ok { @@ -80,7 +83,7 @@ func (h *serverMulticastWriter) writePacketRTP(byts []byte) error { } func (h *serverMulticastWriter) writePacketRTCP(byts []byte) error { - ok := h.writer.push(func() error { + ok := h.writer.Push(func() error { return h.rtcpl.write(byts, h.rtcpAddr) }) if !ok { diff --git a/server_play_test.go b/server_play_test.go index 5ccaba03..25f1d32c 100644 --- a/server_play_test.go +++ b/server_play_test.go @@ -1,6 +1,7 @@ package gortsplib import ( + "bufio" "bytes" "crypto/rand" "crypto/tls" @@ -277,7 +278,7 @@ func TestServerPlayPath(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -369,7 +370,7 @@ func TestServerPlaySetupErrors(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) var desc *description.Session var th *headers.Transport @@ -539,7 +540,7 @@ func TestServerPlaySetupErrorSameUDPPortsAndIP(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) inTH := &headers.Transport{ Delivery: ptrOf(headers.TransportDeliveryUnicast), @@ -777,7 +778,7 @@ func TestServerPlay(t *testing.T) { } return nconn }() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) <-nconnOpened @@ -1166,7 +1167,7 @@ func TestServerPlaySocketError(t *testing.T) { require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1330,7 +1331,7 @@ func TestServerPlayDecodeErrors(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1453,7 +1454,7 @@ func TestServerPlayRTCPReport(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1578,7 +1579,7 @@ func TestServerPlayVLCMulticast(t *testing.T) { nconn, err := net.Dial("tcp", listenIP+":8554") require.NoError(t, err) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) defer nconn.Close() res, err := writeReqReadRes(conn, base.Request{ @@ -1665,7 +1666,7 @@ func TestServerPlayTCPResponseBeforeFrames(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1763,7 +1764,7 @@ func TestServerPlayPause(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1861,7 +1862,7 @@ func TestServerPlayPlayPausePausePlay(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1949,7 +1950,7 @@ func TestServerPlayTimeout(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2040,7 +2041,7 @@ func TestServerPlayWithoutTeardown(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2121,7 +2122,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2146,7 +2147,7 @@ func TestServerPlayUDPChangeConn(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) var res *base.Response res, err = writeReqReadRes(conn, base.Request{ @@ -2209,7 +2210,7 @@ func TestServerPlayPartialMedias(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2242,7 +2243,7 @@ func TestServerPlayAdditionalInfos(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2465,7 +2466,7 @@ func TestServerPlayNoInterleavedIDs(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2548,7 +2549,7 @@ func TestServerPlayStreamStats(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -2664,7 +2665,7 @@ func TestServerPlayBackChannel(t *testing.T) { require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, true) diff --git a/server_record_test.go b/server_record_test.go index ae8c64d5..63eda07a 100644 --- a/server_record_test.go +++ b/server_record_test.go @@ -1,6 +1,7 @@ package gortsplib import ( + "bufio" "bytes" "crypto/rand" "crypto/tls" @@ -142,7 +143,7 @@ func TestServerRecordErrorAnnounce(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) _, err = writeReqReadRes(conn, ca.req) require.NoError(t, err) @@ -201,7 +202,7 @@ func TestServerRecordErrorSetup(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -335,7 +336,7 @@ func TestServerRecordPath(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) media := testH264Media media.Control = ca.control @@ -416,7 +417,7 @@ func TestServerRecordErrorSetupMediaTwice(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -490,7 +491,7 @@ func TestServerRecordErrorRecordPartialMedias(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) forma := &format.Generic{ PayloadTyp: 96, @@ -692,7 +693,7 @@ func TestServerRecord(t *testing.T) { } return nconn }() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) <-nconnOpened @@ -979,7 +980,7 @@ func TestServerRecordErrorInvalidProtocol(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1039,7 +1040,7 @@ func TestServerRecordRTCPReport(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1173,7 +1174,7 @@ func TestServerRecordTimeout(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1255,7 +1256,7 @@ func TestServerRecordWithoutTeardown(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1328,7 +1329,7 @@ func TestServerRecordUDPChangeConn(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1355,7 +1356,7 @@ func TestServerRecordUDPChangeConn(t *testing.T) { nconn, err = net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) var res *base.Response res, err = writeReqReadRes(conn, base.Request{ @@ -1454,7 +1455,7 @@ func TestServerRecordDecodeErrors(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{{ Type: description.MediaTypeApplication, @@ -1642,7 +1643,7 @@ func TestServerRecordPacketNTP(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1763,7 +1764,7 @@ func TestServerRecordPausePause(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{{ Type: description.MediaTypeApplication, diff --git a/server_session.go b/server_session.go index 3d8f6a2d..75fd35ee 100644 --- a/server_session.go +++ b/server_session.go @@ -17,6 +17,7 @@ import ( "github.com/pion/rtcp" "github.com/pion/rtp" + "github.com/bluenviron/gortsplib/v4/internal/asyncprocessor" "github.com/bluenviron/gortsplib/v4/pkg/base" "github.com/bluenviron/gortsplib/v4/pkg/description" "github.com/bluenviron/gortsplib/v4/pkg/format" @@ -445,7 +446,7 @@ type ServerSession struct { udpLastPacketTime *int64 // record udpCheckStreamTimer *time.Timer writerMutex sync.RWMutex - writer *asyncProcessor + writer *asyncprocessor.Processor timeDecoder *rtptime.GlobalDecoder2 tcpFrame *base.InterleavedFrame tcpBuffer []byte @@ -454,6 +455,7 @@ type ServerSession struct { chHandleRequest chan sessionRequestReq chRemoveConn chan *ServerConn chAsyncStartWriter chan struct{} + chWriterError chan error } func (ss *ServerSession) initialize() { @@ -472,6 +474,7 @@ func (ss *ServerSession) initialize() { ss.chHandleRequest = make(chan sessionRequestReq) ss.chRemoveConn = make(chan *ServerConn) ss.chAsyncStartWriter = make(chan struct{}) + ss.chWriterError = make(chan error) ss.s.wg.Add(1) go ss.run() @@ -841,8 +844,8 @@ func (ss *ServerSession) checkState(allowed map[ServerSessionState]struct{}) err func (ss *ServerSession) createWriter() { ss.writerMutex.Lock() - ss.writer = &asyncProcessor{ - bufferSize: func() int { + ss.writer = &asyncprocessor.Processor{ + BufferSize: func() int { if ss.state == ServerSessionStatePrePlay { return ss.s.WriteQueueSize } @@ -852,19 +855,25 @@ func (ss *ServerSession) createWriter() { // decrease RAM consumption by allocating less buffers. return 8 }(), + OnError: func(ctx context.Context, err error) { + select { + case <-ctx.Done(): + case <-ss.ctx.Done(): + case ss.chWriterError <- err: + } + }, } - - ss.writer.initialize() + ss.writer.Initialize() ss.writerMutex.Unlock() } func (ss *ServerSession) startWriter() { - ss.writer.start() + ss.writer.Start() } func (ss *ServerSession) destroyWriter() { - ss.writer.close() + ss.writer.Close() ss.writerMutex.Lock() ss.writer = nil @@ -926,13 +935,6 @@ func (ss *ServerSession) run() { func (ss *ServerSession) runInner() error { for { - chWriterError := func() chan struct{} { - if ss.writer != nil { - return ss.writer.chStopped - } - return nil - }() - select { case req := <-ss.chHandleRequest: ss.lastRequestTime = ss.s.timeNow() @@ -1029,8 +1031,8 @@ func (ss *ServerSession) runInner() error { ss.udpCheckStreamTimer = time.NewTimer(ss.s.checkStreamPeriod) - case <-chWriterError: - return ss.writer.stopError + case err := <-ss.chWriterError: + return err case <-ss.ctx.Done(): return liberrors.ErrServerTerminated{} @@ -1954,6 +1956,19 @@ func (ss *ServerSession) handleRequest(req sessionRequestReq) (*base.Response, * } } +func (ss *ServerSession) handleRequestNoWait(req sessionRequestReq) { + select { + case ss.chHandleRequest <- req: + case <-ss.ctx.Done(): + req.res <- sessionRequestRes{ + res: &base.Response{ + StatusCode: base.StatusBadRequest, + }, + err: liberrors.ErrServerTerminated{}, + } + } +} + func (ss *ServerSession) removeConn(sc *ServerConn) { select { case ss.chRemoveConn <- sc: diff --git a/server_session_format.go b/server_session_format.go index 93a067ac..dc2677c2 100644 --- a/server_session_format.go +++ b/server_session_format.go @@ -154,7 +154,7 @@ func (sf *serverSessionFormat) writePacketRTPEncoded(payload []byte) error { return nil } - ok := sf.sm.ss.writer.push(func() error { + ok := sf.sm.ss.writer.Push(func() error { return sf.writePacketRTPInQueue(payload) }) if !ok { @@ -179,7 +179,7 @@ func (sf *serverSessionFormat) writePacketRTPInQueueTCP(payload []byte) error { sf.sm.ss.tcpFrame.Channel = sf.sm.tcpChannel sf.sm.ss.tcpFrame.Payload = payload sf.sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sf.sm.ss.s.WriteTimeout)) - err := sf.sm.ss.tcpConn.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer) + err := sf.sm.ss.tcpConn.reader.conn.WriteInterleavedFrame(sf.sm.ss.tcpFrame, sf.sm.ss.tcpBuffer) if err != nil { return err } diff --git a/server_session_media.go b/server_session_media.go index 6708bb06..4d9939a5 100644 --- a/server_session_media.go +++ b/server_session_media.go @@ -458,7 +458,7 @@ func (sm *serverSessionMedia) writePacketRTCPEncoded(payload []byte) error { return nil } - ok := sm.ss.writer.push(func() error { + ok := sm.ss.writer.Push(func() error { return sm.writePacketRTCPInQueue(payload) }) if !ok { @@ -483,7 +483,7 @@ func (sm *serverSessionMedia) writePacketRTCPInQueueTCP(payload []byte) error { sm.ss.tcpFrame.Channel = sm.tcpChannel + 1 sm.ss.tcpFrame.Payload = payload sm.ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(sm.ss.s.WriteTimeout)) - err := sm.ss.tcpConn.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) + err := sm.ss.tcpConn.reader.conn.WriteInterleavedFrame(sm.ss.tcpFrame, sm.ss.tcpBuffer) if err != nil { return err } diff --git a/server_test.go b/server_test.go index d435f0a2..a241d1f1 100644 --- a/server_test.go +++ b/server_test.go @@ -1,9 +1,13 @@ package gortsplib import ( + "bufio" + "crypto/tls" + "encoding/base64" "fmt" "net" "net/http" + "sync/atomic" "testing" "time" @@ -280,7 +284,7 @@ func TestServerConnClose(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) <-nconnClosed @@ -305,7 +309,7 @@ func TestServerCSeq(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) res, err := writeReqReadRes(conn, base.Request{ Method: base.Options, @@ -339,7 +343,7 @@ func TestServerErrorCSeqMissing(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) res, err := writeReqReadRes(conn, base.Request{ Method: base.Options, @@ -370,7 +374,7 @@ func TestServerErrorNilURL(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) res, err := writeReqReadRes(conn, base.Request{ Method: base.Describe, @@ -406,7 +410,7 @@ func TestServerDescribeNonNilBody(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) res, err := writeReqReadRes(conn, base.Request{ Method: base.Describe, @@ -467,7 +471,7 @@ func TestServerErrorMethodNotImplemented(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -563,7 +567,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { nconn1, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn1.Close() - conn1 := conn.NewConn(nconn1) + conn1 := conn.NewConn(bufio.NewReader(nconn1), nconn1) desc1 := doDescribe(t, conn1, false) @@ -583,7 +587,7 @@ func TestServerErrorTCPTwoConnOneSession(t *testing.T) { nconn2, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn2.Close() - conn2 := conn.NewConn(nconn2) + conn2 := conn.NewConn(bufio.NewReader(nconn2), nconn2) desc2 := doDescribe(t, conn2, false) @@ -649,7 +653,7 @@ func TestServerErrorTCPOneConnTwoSessions(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -717,7 +721,7 @@ func TestServerSetupMultipleTransports(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -819,7 +823,7 @@ func TestServerGetSetParameter(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -910,7 +914,7 @@ func TestServerErrorInvalidSession(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) res, err := writeReqReadRes(conn, base.Request{ Method: method, @@ -967,7 +971,7 @@ func TestServerAuth(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1030,7 +1034,7 @@ func TestServerAuthFail(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) medias := []*description.Media{testH264Media} @@ -1108,7 +1112,7 @@ func TestServerSessionClose(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1187,7 +1191,7 @@ func TestServerSessionAutoClose(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1255,7 +1259,7 @@ func TestServerSessionTeardown(t *testing.T) { nconn, err := net.Dial("tcp", "localhost:8554") require.NoError(t, err) defer nconn.Close() - conn := conn.NewConn(nconn) + conn := conn.NewConn(bufio.NewReader(nconn), nconn) desc := doDescribe(t, conn, false) @@ -1290,3 +1294,135 @@ func TestServerStreamErrorNoServer(t *testing.T) { err := stream.Initialize() require.Error(t, err) } + +func TestServerHTTPTunnel(t *testing.T) { + for _, ca := range []string{"http", "https"} { + t.Run(ca, func(t *testing.T) { + done := make(chan struct{}) + n := new(uint64) + + s := &Server{ + Handler: &testServerHandler{ + onConnClose: func(ctx *ServerHandlerOnConnCloseCtx) { + switch atomic.AddUint64(n, 1) { + case 1: + require.EqualError(t, ctx.Error, "upgraded to HTTP conn") + + case 2: + require.EqualError(t, ctx.Error, "upgraded to HTTP conn") + close(done) + } + }, + onDescribe: func(_ *ServerHandlerOnDescribeCtx) (*base.Response, *ServerStream, error) { + return &base.Response{ + StatusCode: base.StatusNotFound, + }, nil, nil + }, + }, + RTSPAddress: "localhost:8554", + } + + if ca == "https" { + cert, err := tls.X509KeyPair(serverCert, serverKey) + require.NoError(t, err) + s.TLSConfig = &tls.Config{Certificates: []tls.Certificate{cert}} + } + + err := s.Start() + require.NoError(t, err) + defer s.Close() + + nconn1, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn1.Close() + + if ca == "https" { + nconn1 = tls.Client(nconn1, &tls.Config{InsecureSkipVerify: true}) + } + + _, err = nconn1.Write([]byte( + "GET / HTTP/1.1\r\n" + + "Host: localhost:8554\r\n" + + "X-Sessioncookie: testtunid\r\n" + + "Accept: application/x-rtsp-tunnelled\r\n" + + "Content-Length: 30000\r\n" + + "\r\n", + )) + require.NoError(t, err) + + buf1 := bufio.NewReader(nconn1) + res, err := http.ReadResponse(buf1, nil) + require.NoError(t, err) + res.Body.Close() + + require.Equal(t, &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: -1, + Close: true, + Header: http.Header{ + "Cache-Control": []string{"no-cache"}, + "Content-Type": []string{"application/x-rtsp-tunnelled"}, + "Pragma": []string{"no-cache"}, + }, + Body: res.Body, + }, res) + + nconn2, err := net.Dial("tcp", "localhost:8554") + require.NoError(t, err) + defer nconn2.Close() + + if ca == "https" { + nconn2 = tls.Client(nconn2, &tls.Config{InsecureSkipVerify: true}) + } + + _, err = nconn2.Write([]byte( + "POST / HTTP/1.1\r\n" + + "Host: localhost:8554\r\n" + + "X-Sessioncookie: testtunid\r\n" + + "Content-Type: application/x-rtsp-tunnelled\r\n" + + "Content-Length: 30000\r\n" + + "\r\n", + )) + require.NoError(t, err) + + buf2 := bufio.NewReader(nconn2) + res, err = http.ReadResponse(buf2, nil) + require.NoError(t, err) + res.Body.Close() + + require.Equal(t, &http.Response{ + Status: "200 OK", + StatusCode: http.StatusOK, + Proto: "HTTP/1.1", + ProtoMajor: 1, + ProtoMinor: 1, + ContentLength: -1, + Close: true, + Header: http.Header{ + "Cache-Control": []string{"no-cache"}, + "Content-Type": []string{"application/x-rtsp-tunnelled"}, + "Pragma": []string{"no-cache"}, + }, + Body: res.Body, + }, res) + + conn := conn.NewConn(bufio.NewReader(buf1), base64.NewEncoder(base64.StdEncoding, nconn2)) + + rres, err := writeReqReadRes(conn, base.Request{ + Method: base.Describe, + URL: mustParseURL("rtsp://localhost:8554/teststream?param=value"), + Header: base.Header{ + "CSeq": base.HeaderValue{"1"}, + }, + }) + require.NoError(t, err) + require.Equal(t, base.StatusNotFound, rres.StatusCode) + + <-done + }) + } +} diff --git a/tunnel.go b/tunnel.go new file mode 100644 index 00000000..f3f58648 --- /dev/null +++ b/tunnel.go @@ -0,0 +1,10 @@ +package gortsplib + +// Tunnel is a tunnel method. +type Tunnel int + +// tunnel methods. +const ( + TunnelNone Tunnel = iota + TunnelHTTP +)