From 65f6afcd9f5303526e52fd5a1711aed870f1de45 Mon Sep 17 00:00:00 2001 From: aler9 <46489434+aler9@users.noreply.github.com> Date: Sun, 15 Nov 2020 17:21:39 +0100 Subject: [PATCH] new client api --- README.md | 16 +- connclient.go | 415 +++++++-------------------- connclientpublish.go | 154 ++++++++++ connclientread.go | 201 +++++++++++++ connclientudpl.go | 47 +++- connserver.go | 4 +- dialer.go | 52 ++-- dialer_test.go | 496 ++++++++++++++++++++------------- examples/client-publish-tcp.go | 8 +- examples/client-publish-udp.go | 44 +-- examples/client-query.go | 42 +++ examples/client-read-tcp.go | 11 +- examples/client-read-udp.go | 48 +--- 13 files changed, 927 insertions(+), 611 deletions(-) create mode 100644 connclientpublish.go create mode 100644 connclientread.go create mode 100644 examples/client-query.go diff --git a/README.md b/README.md index 7638fc2a..708631a7 100644 --- a/README.md +++ b/README.md @@ -9,17 +9,21 @@ RTSP 1.0 client and server library for the Go programming language, written for Features: -* Read streams from servers with TCP or UDP -* Publish streams to servers with TCP or UDP -* Build servers -* Provides most methods and primitives of the protocol +* Client + * Query servers about published streams + * Read streams from servers with UDP or TCP + * Publish streams to servers with UDP or TCP + * Pause reading or publishing without disconnecting from the server +* Server + * Handle server-side connections ## Examples -* [client-read-tcp](examples/client-read-tcp.go) +* [client-query](examples/client-query.go) * [client-read-udp](examples/client-read-udp.go) -* [client-publish-tcp](examples/client-publish-tcp.go) +* [client-read-tcp](examples/client-read-tcp.go) * [client-publish-udp](examples/client-publish-udp.go) +* [client-publish-tcp](examples/client-publish-tcp.go) ## Documentation diff --git a/connclient.go b/connclient.go index bcb84903..769df882 100644 --- a/connclient.go +++ b/connclient.go @@ -34,7 +34,7 @@ const ( clientUDPFrameReadBufferSize = 2048 ) -type connClientState int +type connClientState int32 const ( connClientStateInitial connClientState = iota @@ -42,8 +42,34 @@ const ( connClientStatePlay connClientStatePreRecord connClientStateRecord + connClientStateUDPError ) +func (s connClientState) String() string { + switch s { + case connClientStateInitial: + return "initial" + case connClientStatePrePlay: + return "prePlay" + case connClientStatePlay: + return "play" + case connClientStatePreRecord: + return "preRecord" + case connClientStateRecord: + return "record" + case connClientStateUDPError: + return "udpError" + } + return "uknown" +} +func (s *connClientState) load() connClientState { + return connClientState(atomic.LoadInt32((*int32)(s))) +} + +func (s *connClientState) store(v connClientState) { + atomic.StoreInt32((*int32)(s), int32(v)) +} + // ConnClient is a client-side RTSP connection. type ConnClient struct { d Dialer @@ -53,7 +79,7 @@ type ConnClient struct { session string cseq int auth *auth.Client - state connClientState + state *connClientState streamUrl *base.URL streamProtocol *StreamProtocol tracks Tracks @@ -64,18 +90,23 @@ type ConnClient struct { response *base.Response frame *base.InterleavedFrame tcpFrameBuffer *multibuffer.MultiBuffer + readFrameFunc func() (int, StreamType, []byte, error) writeFrameFunc func(trackId int, streamType StreamType, content []byte) error getParameterSupported bool + backgroundUDPError error - reportWriterTerminate chan struct{} - reportWriterDone chan struct{} + backgroundTerminate chan struct{} + backgroundDone chan struct{} + udpFrame chan base.InterleavedFrame } // Close closes all the ConnClient resources. func (c *ConnClient) Close() error { - if c.state == connClientStatePlay { - close(c.reportWriterTerminate) - <-c.reportWriterDone + s := c.state.load() + + if s == connClientStatePlay { + close(c.backgroundTerminate) + <-c.backgroundDone c.Do(&base.Request{ Method: base.TEARDOWN, @@ -84,7 +115,14 @@ func (c *ConnClient) Close() error { }) } - err := c.nconn.Close() + if s == connClientStatePlay { + if *c.streamProtocol == StreamProtocolUDP { + go func() { + for range c.udpFrame { + } + }() + } + } for _, l := range c.udpRtpListeners { l.close() @@ -94,16 +132,28 @@ func (c *ConnClient) Close() error { l.close() } + if s == connClientStatePlay { + if *c.streamProtocol == StreamProtocolUDP { + close(c.udpFrame) + } + } + + err := c.nconn.Close() return err } -func (c *ConnClient) checkState(allowed map[connClientState]struct{}) error { - if _, ok := allowed[c.state]; ok { - return nil +func (c *ConnClient) checkState(allowed map[connClientState]struct{}) (connClientState, error) { + s := c.state.load() + if _, ok := allowed[s]; ok { + return s, nil } - return fmt.Errorf("client must be in state %v, while is in state %v", - allowed, c.state) + var allowedList []connClientState + for s := range allowed { + allowedList = append(allowedList, s) + } + return 0, fmt.Errorf("client must be in state %v, while is in state %v", + allowedList, s) } // NetConn returns the underlying net.Conn. @@ -123,68 +173,8 @@ func (c *ConnClient) readFrameTCPOrResponse() (interface{}, error) { return base.ReadInterleavedFrameOrResponse(c.frame, c.response, c.br) } -// ReadFrameUDP reads an UDP frame. -func (c *ConnClient) ReadFrameUDP(trackId int, streamType StreamType) ([]byte, error) { - var buf []byte - var err error - if streamType == StreamTypeRtp { - buf, err = c.udpRtpListeners[trackId].read() - } else { - buf, err = c.udpRtcpListeners[trackId].read() - } - if err != nil { - return nil, err - } - - atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix()) - - c.rtcpReceivers[trackId].OnFrame(streamType, buf) - - return buf, nil -} - -// ReadFrameTCP reads an InterleavedFrame. -// This can't be used when publishing. -func (c *ConnClient) ReadFrameTCP() (int, StreamType, []byte, error) { - c.frame.Content = c.tcpFrameBuffer.Next() - - c.nconn.SetReadDeadline(time.Now().Add(c.d.ReadTimeout)) - err := c.frame.Read(c.br) - if err != nil { - return 0, 0, nil, err - } - - c.rtcpReceivers[c.frame.TrackId].OnFrame(c.frame.StreamType, c.frame.Content) - - return c.frame.TrackId, c.frame.StreamType, c.frame.Content, nil -} - -func (c *ConnClient) writeFrameUDP(trackId int, streamType StreamType, content []byte) error { - if streamType == StreamTypeRtp { - return c.udpRtpListeners[trackId].write(content) - } - return c.udpRtcpListeners[trackId].write(content) -} - -func (c *ConnClient) writeFrameTCP(trackId int, streamType StreamType, content []byte) error { - frame := base.InterleavedFrame{ - TrackId: trackId, - StreamType: streamType, - Content: content, - } - - c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) - return frame.Write(c.bw) -} - -// WriteFrame writes a frame. -// This can be used only after Record(). -func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error { - return c.writeFrameFunc(trackId, streamType, content) -} - // Do writes a Request and reads a Response. -// Interleaved frames sent before the response are ignored. +// Interleaved frames received before the response are ignored. func (c *ConnClient) Do(req *base.Request) (*base.Response, error) { if req.Header == nil { req.Header = make(base.Header) @@ -262,7 +252,7 @@ func (c *ConnClient) Do(req *base.Request) (*base.Response, error) { // Since this method is not implemented by every RTSP server, the function // does not fail if the returned code is StatusNotFound. func (c *ConnClient) Options(u *base.URL) (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ + _, err := c.checkState(map[connClientState]struct{}{ connClientStateInitial: {}, connClientStatePrePlay: {}, connClientStatePreRecord: {}, @@ -302,7 +292,7 @@ func (c *ConnClient) Options(u *base.URL) (*base.Response, error) { // Describe writes a DESCRIBE request and reads a Response. func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ + _, err := c.checkState(map[connClientState]struct{}{ connClientStateInitial: {}, connClientStatePrePlay: {}, connClientStatePreRecord: {}, @@ -400,7 +390,7 @@ func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode, // if rtpPort and rtcpPort are zero, they are chosen automatically. func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.StreamProtocol, track *Track, rtpPort int, rtcpPort int) (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ + s, err := c.checkState(map[connClientState]struct{}{ connClientStateInitial: {}, connClientStatePrePlay: {}, connClientStatePreRecord: {}, @@ -409,12 +399,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S return nil, err } - if mode == headers.TransportModeRecord && c.state != connClientStatePreRecord { + if mode == headers.TransportModeRecord && s != connClientStatePreRecord { return nil, fmt.Errorf("cannot read and publish at the same time") } - if mode == headers.TransportModePlay && c.state != connClientStatePrePlay && - c.state != connClientStateInitial { + if mode == headers.TransportModePlay && s != connClientStatePrePlay && + s != connClientStateInitial { return nil, fmt.Errorf("cannot read and publish at the same time") } @@ -451,12 +441,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S var err error rtpListener, rtcpListener, err = func() (*connClientUDPListener, *connClientUDPListener, error) { if rtpPort != 0 { - rtpListener, err := newConnClientUDPListener(c.d, rtpPort) + rtpListener, err := newConnClientUDPListener(c, rtpPort) if err != nil { return nil, nil, err } - rtcpListener, err := newConnClientUDPListener(c.d, rtcpPort) + rtcpListener, err := newConnClientUDPListener(c, rtcpPort) if err != nil { rtpListener.close() return nil, nil, err @@ -471,12 +461,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000 rtcpPort = rtpPort + 1 - rtpListener, err := newConnClientUDPListener(c.d, rtpPort) + rtpListener, err := newConnClientUDPListener(c, rtpPort) if err != nil { continue } - rtcpListener, err := newConnClientUDPListener(c.d, rtcpPort) + rtcpListener, err := newConnClientUDPListener(c, rtcpPort) if err != nil { rtpListener.close() continue @@ -545,8 +535,7 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S } c.streamUrl = u - streamProtocol := proto - c.streamProtocol = &streamProtocol + c.streamProtocol = &proto c.tracks = append(c.tracks, track) @@ -563,241 +552,31 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S rtpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone rtpListener.remotePort = (*th.ServerPorts)[0] + rtpListener.trackId = track.Id + rtpListener.streamType = StreamTypeRtp c.udpRtpListeners[track.Id] = rtpListener rtcpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP rtcpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone rtcpListener.remotePort = (*th.ServerPorts)[1] + rtcpListener.trackId = track.Id + rtcpListener.streamType = StreamTypeRtcp c.udpRtcpListeners[track.Id] = rtcpListener } if mode == headers.TransportModePlay { - c.state = connClientStatePrePlay + *c.state = connClientStatePrePlay } else { - c.state = connClientStatePreRecord + *c.state = connClientStatePreRecord } return res, nil } -// Play writes a PLAY request and reads a Response. -// This can be called only after Setup(). -func (c *ConnClient) Play() (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ - connClientStatePrePlay: {}, - }) - if err != nil { - return nil, err - } - - res, err := c.Do(&base.Request{ - Method: base.PLAY, - URL: c.streamUrl, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) - } - - if *c.streamProtocol == StreamProtocolUDP { - c.writeFrameFunc = c.writeFrameUDP - } else { - c.writeFrameFunc = c.writeFrameTCP - } - - c.state = connClientStatePlay - - // open the firewall by sending packets to the counterpart - if *c.streamProtocol == StreamProtocolUDP { - for trackId := range c.udpRtpListeners { - c.WriteFrame(trackId, StreamTypeRtp, - []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) - - c.WriteFrame(trackId, StreamTypeRtcp, - []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) - } - } - - c.reportWriterTerminate = make(chan struct{}) - c.reportWriterDone = make(chan struct{}) - - go func() { - defer close(c.reportWriterDone) - - reportWriterTicker := time.NewTicker(clientReceiverReportPeriod) - defer reportWriterTicker.Stop() - - for { - select { - case <-c.reportWriterTerminate: - return - - case <-reportWriterTicker.C: - for trackId := range c.rtcpReceivers { - frame := c.rtcpReceivers[trackId].Report() - c.WriteFrame(trackId, StreamTypeRtcp, frame) - } - } - } - }() - - return res, nil -} - -// Announce writes an ANNOUNCE request and reads a Response. -func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ - connClientStateInitial: {}, - }) - if err != nil { - return nil, err - } - - res, err := c.Do(&base.Request{ - Method: base.ANNOUNCE, - URL: u, - Header: base.Header{ - "Content-Type": base.HeaderValue{"application/sdp"}, - }, - Content: tracks.Write(), - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) - } - - c.streamUrl = u - c.state = connClientStatePreRecord - - return res, nil -} - -// Record writes a RECORD request and reads a Response. -// This can be called only after Announce() and Setup(). -func (c *ConnClient) Record() (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ - connClientStatePreRecord: {}, - }) - if err != nil { - return nil, err - } - - res, err := c.Do(&base.Request{ - Method: base.RECORD, - URL: c.streamUrl, - }) - if err != nil { - return nil, err - } - - if res.StatusCode != base.StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) - } - - if *c.streamProtocol == StreamProtocolUDP { - c.writeFrameFunc = c.writeFrameUDP - } else { - c.writeFrameFunc = c.writeFrameTCP - } - - c.state = connClientStateRecord - - return nil, nil -} - -// LoopUDP must be called after Play() or Record(); it keeps -// the TCP connection open with keepalives, and returns when the TCP -// connection closes. -func (c *ConnClient) LoopUDP() error { - err := c.checkState(map[connClientState]struct{}{ - connClientStatePlay: {}, - connClientStateRecord: {}, - }) - if err != nil { - return err - } - - if *c.streamProtocol != StreamProtocolUDP { - return fmt.Errorf("stream protocol is not UDP") - } - - if c.state == connClientStatePlay { - readDone := make(chan error) - go func() { - for { - c.nconn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.d.ReadTimeout)) - var res base.Response - err := res.Read(c.br) - if err != nil { - readDone <- err - return - } - } - }() - - keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) - defer keepaliveTicker.Stop() - - checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod) - defer checkStreamTicker.Stop() - - for { - select { - case err := <-readDone: - c.nconn.Close() - return err - - case <-keepaliveTicker.C: - _, err := c.Do(&base.Request{ - Method: func() base.Method { - // the vlc integrated rtsp server requires GET_PARAMETER - if c.getParameterSupported { - return base.GET_PARAMETER - } - return base.OPTIONS - }(), - // use the stream path, otherwise some cameras do not reply - URL: c.streamUrl, - SkipResponse: true, - }) - if err != nil { - c.nconn.Close() - <-readDone - return err - } - - case <-checkStreamTicker.C: - now := time.Now() - - for _, lastUnix := range c.udpLastFrameTimes { - last := time.Unix(atomic.LoadInt64(lastUnix), 0) - - if now.Sub(last) >= c.d.ReadTimeout { - c.nconn.Close() - <-readDone - return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") - } - } - } - } - } - - // connClientStateRecord - c.nconn.SetReadDeadline(time.Time{}) // disable deadline - var res base.Response - return res.Read(c.br) -} - // Pause writes a PAUSE request and reads a Response. // This can be called only after Play() or Record(). func (c *ConnClient) Pause() (*base.Response, error) { - err := c.checkState(map[connClientState]struct{}{ + s, err := c.checkState(map[connClientState]struct{}{ connClientStatePlay: {}, connClientStateRecord: {}, }) @@ -805,9 +584,24 @@ func (c *ConnClient) Pause() (*base.Response, error) { return nil, err } - if c.state == connClientStatePlay { - close(c.reportWriterTerminate) - <-c.reportWriterDone + close(c.backgroundTerminate) + <-c.backgroundDone + + if s == connClientStatePlay { + if *c.streamProtocol == StreamProtocolUDP { + ch := c.udpFrame + go func() { + for range ch { + } + }() + + for trackId := range c.udpRtpListeners { + c.udpRtpListeners[trackId].stop() + c.udpRtcpListeners[trackId].stop() + } + + close(ch) + } } res, err := c.Do(&base.Request{ @@ -822,10 +616,11 @@ func (c *ConnClient) Pause() (*base.Response, error) { return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) } - if c.state == connClientStatePlay { - c.state = connClientStatePrePlay - } else { - c.state = connClientStatePreRecord + switch s { + case connClientStatePlay: + c.state.store(connClientStatePrePlay) + case connClientStateRecord: + c.state.store(connClientStatePreRecord) } return res, nil diff --git a/connclientpublish.go b/connclientpublish.go new file mode 100644 index 00000000..32d1879a --- /dev/null +++ b/connclientpublish.go @@ -0,0 +1,154 @@ +package gortsplib + +import ( + "fmt" + "time" + + "github.com/aler9/gortsplib/base" +) + +// Announce writes an ANNOUNCE request and reads a Response. +func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) { + _, err := c.checkState(map[connClientState]struct{}{ + connClientStateInitial: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.Do(&base.Request{ + Method: base.ANNOUNCE, + URL: u, + Header: base.Header{ + "Content-Type": base.HeaderValue{"application/sdp"}, + }, + Content: tracks.Write(), + }) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + c.streamUrl = u + *c.state = connClientStatePreRecord + + return res, nil +} + +// Record writes a RECORD request and reads a Response. +// This can be called only after Announce() and Setup(). +func (c *ConnClient) Record() (*base.Response, error) { + _, err := c.checkState(map[connClientState]struct{}{ + connClientStatePreRecord: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.Do(&base.Request{ + Method: base.RECORD, + URL: c.streamUrl, + }) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + if *c.streamProtocol == StreamProtocolUDP { + c.writeFrameFunc = c.writeFrameUDP + } else { + c.writeFrameFunc = c.writeFrameTCP + } + + c.state.store(connClientStateRecord) + + c.backgroundTerminate = make(chan struct{}) + c.backgroundDone = make(chan struct{}) + + if *c.streamProtocol == StreamProtocolUDP { + go c.backgroundRecordUDP() + } else { + go c.backgroundRecordTCP() + } + + return nil, nil +} + +func (c *ConnClient) backgroundRecordUDP() { + defer close(c.backgroundDone) + + c.nconn.SetReadDeadline(time.Time{}) // disable deadline + + readDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + readDone <- err + return + } + } + }() + + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readDone + c.backgroundUDPError = fmt.Errorf("terminated") + c.state.store(connClientStateUDPError) + return + + case err := <-readDone: + c.backgroundUDPError = err + c.state.store(connClientStateUDPError) + return + } +} + +func (c *ConnClient) backgroundRecordTCP() { + defer close(c.backgroundDone) +} + +func (c *ConnClient) writeFrameUDP(trackId int, streamType StreamType, content []byte) error { + switch c.state.load() { + case connClientStateUDPError: + return c.backgroundUDPError + + case connClientStateRecord: + + default: + return fmt.Errorf("not recording") + } + + if streamType == StreamTypeRtp { + return c.udpRtpListeners[trackId].write(content) + } + return c.udpRtcpListeners[trackId].write(content) +} + +func (c *ConnClient) writeFrameTCP(trackId int, streamType StreamType, content []byte) error { + if c.state.load() != connClientStateRecord { + return fmt.Errorf("not recording") + } + + c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout)) + frame := base.InterleavedFrame{ + TrackId: trackId, + StreamType: streamType, + Content: content, + } + return frame.Write(c.bw) +} + +// WriteFrame writes a frame. +// This can be used only after Record(). +func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error { + return c.writeFrameFunc(trackId, streamType, content) +} diff --git a/connclientread.go b/connclientread.go new file mode 100644 index 00000000..6a83c6d0 --- /dev/null +++ b/connclientread.go @@ -0,0 +1,201 @@ +package gortsplib + +import ( + "fmt" + "sync/atomic" + "time" + + "github.com/aler9/gortsplib/base" +) + +// Play writes a PLAY request and reads a Response. +// This can be called only after Setup(). +func (c *ConnClient) Play() (*base.Response, error) { + _, err := c.checkState(map[connClientState]struct{}{ + connClientStatePrePlay: {}, + }) + if err != nil { + return nil, err + } + + res, err := c.Do(&base.Request{ + Method: base.PLAY, + URL: c.streamUrl, + }) + if err != nil { + return nil, err + } + + if res.StatusCode != base.StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + if *c.streamProtocol == StreamProtocolUDP { + c.readFrameFunc = c.readFrameUDP + c.writeFrameFunc = c.writeFrameUDP + } else { + c.readFrameFunc = c.readFrameTCP + c.writeFrameFunc = c.writeFrameTCP + } + + c.state.store(connClientStatePlay) + + c.backgroundTerminate = make(chan struct{}) + c.backgroundDone = make(chan struct{}) + + if *c.streamProtocol == StreamProtocolUDP { + c.udpFrame = make(chan base.InterleavedFrame) + + for trackId := range c.udpRtpListeners { + c.udpRtpListeners[trackId].start() + c.udpRtcpListeners[trackId].start() + } + + // open the firewall by sending packets to the counterpart + for trackId := range c.udpRtpListeners { + c.WriteFrame(trackId, StreamTypeRtp, + []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) + + c.WriteFrame(trackId, StreamTypeRtcp, + []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) + } + + go c.backgroundPlayUDP() + } else { + go c.backgroundPlayTCP() + } + + return res, nil +} + +func (c *ConnClient) backgroundPlayUDP() { + defer close(c.backgroundDone) + + c.nconn.SetReadDeadline(time.Time{}) // disable deadline + + readDone := make(chan error) + go func() { + for { + var res base.Response + err := res.Read(c.br) + if err != nil { + readDone <- err + return + } + } + }() + + reportTicker := time.NewTicker(clientReceiverReportPeriod) + defer reportTicker.Stop() + + keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod) + defer keepaliveTicker.Stop() + + checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod) + defer checkStreamTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + c.nconn.SetReadDeadline(time.Now()) + <-readDone + c.backgroundUDPError = fmt.Errorf("terminated") + c.state.store(connClientStateUDPError) + return + + case <-reportTicker.C: + for trackId := range c.rtcpReceivers { + frame := c.rtcpReceivers[trackId].Report() + c.WriteFrame(trackId, StreamTypeRtcp, frame) + } + + case <-keepaliveTicker.C: + _, err := c.Do(&base.Request{ + Method: func() base.Method { + // the vlc integrated rtsp server requires GET_PARAMETER + if c.getParameterSupported { + return base.GET_PARAMETER + } + return base.OPTIONS + }(), + // use the stream path, otherwise some cameras do not reply + URL: c.streamUrl, + SkipResponse: true, + }) + if err != nil { + c.nconn.SetReadDeadline(time.Now()) + <-readDone + c.backgroundUDPError = err + c.state.store(connClientStateUDPError) + return + } + + case <-checkStreamTicker.C: + now := time.Now() + + for _, lastUnix := range c.udpLastFrameTimes { + last := time.Unix(atomic.LoadInt64(lastUnix), 0) + + if now.Sub(last) >= c.d.ReadTimeout { + c.nconn.SetReadDeadline(time.Now()) + <-readDone + c.backgroundUDPError = fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)") + c.state.store(connClientStateUDPError) + return + } + } + } + } +} + +func (c *ConnClient) backgroundPlayTCP() { + defer close(c.backgroundDone) + + reportTicker := time.NewTicker(clientReceiverReportPeriod) + defer reportTicker.Stop() + + for { + select { + case <-c.backgroundTerminate: + return + + case <-reportTicker.C: + for trackId := range c.rtcpReceivers { + frame := c.rtcpReceivers[trackId].Report() + c.WriteFrame(trackId, StreamTypeRtcp, frame) + } + } + } +} + +func (c *ConnClient) readFrameUDP() (int, StreamType, []byte, error) { + if c.state.load() != connClientStatePlay { + return 0, 0, nil, fmt.Errorf("not playing") + } + + f := <-c.udpFrame + return f.TrackId, f.StreamType, f.Content, nil +} + +func (c *ConnClient) readFrameTCP() (int, StreamType, []byte, error) { + if c.state.load() != connClientStatePlay { + return 0, 0, nil, fmt.Errorf("not playing") + } + + c.nconn.SetReadDeadline(time.Now().Add(c.d.ReadTimeout)) + c.frame.Content = c.tcpFrameBuffer.Next() + err := c.frame.Read(c.br) + if err != nil { + return 0, 0, nil, err + } + + c.rtcpReceivers[c.frame.TrackId].OnFrame(c.frame.StreamType, c.frame.Content) + + return c.frame.TrackId, c.frame.StreamType, c.frame.Content, nil +} + +// ReadFrame reads a frame. +// This can be used only after Play(). +func (c *ConnClient) ReadFrame() (int, StreamType, []byte, error) { + return c.readFrameFunc() +} diff --git a/connclientudpl.go b/connclientudpl.go index 06891e60..b9a8f173 100644 --- a/connclientudpl.go +++ b/connclientudpl.go @@ -3,40 +3,67 @@ package gortsplib import ( "net" "strconv" + "sync/atomic" + "time" + "github.com/aler9/gortsplib/base" "github.com/aler9/gortsplib/multibuffer" ) type connClientUDPListener struct { + c *ConnClient pc net.PacketConn remoteIp net.IP remoteZone string remotePort int udpFrameBuffer *multibuffer.MultiBuffer + trackId int + streamType StreamType + running bool + + done chan struct{} } -func newConnClientUDPListener(d Dialer, port int) (*connClientUDPListener, error) { - pc, err := d.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) +func newConnClientUDPListener(c *ConnClient, port int) (*connClientUDPListener, error) { + pc, err := c.d.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) if err != nil { return nil, err } return &connClientUDPListener{ + c: c, pc: pc, - udpFrameBuffer: multibuffer.New(d.ReadBufferCount, 2048), + udpFrameBuffer: multibuffer.New(c.d.ReadBufferCount+1, 2048), }, nil } func (l *connClientUDPListener) close() { + if l.running { + l.stop() + } l.pc.Close() } -func (l *connClientUDPListener) read() ([]byte, error) { +func (l *connClientUDPListener) start() { + l.running = true + l.pc.SetReadDeadline(time.Time{}) + l.done = make(chan struct{}) + go l.run() +} + +func (l *connClientUDPListener) stop() { + l.pc.SetReadDeadline(time.Now()) + <-l.done +} + +func (l *connClientUDPListener) run() { + defer close(l.done) + for { buf := l.udpFrameBuffer.Next() n, addr, err := l.pc.ReadFrom(buf) if err != nil { - return nil, err + return } uaddr := addr.(*net.UDPAddr) @@ -45,7 +72,15 @@ func (l *connClientUDPListener) read() ([]byte, error) { continue } - return buf[:n], nil + atomic.StoreInt64(l.c.udpLastFrameTimes[l.trackId], time.Now().Unix()) + + l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) + + l.c.udpFrame <- base.InterleavedFrame{ + TrackId: l.trackId, + StreamType: l.streamType, + Content: buf[:n], + } } } diff --git a/connserver.go b/connserver.go index c1f91834..93526666 100644 --- a/connserver.go +++ b/connserver.go @@ -20,11 +20,11 @@ type ConnServerConf struct { Conn net.Conn // (optional) timeout of read operations. - // It defaults to 5 seconds + // It defaults to 10 seconds ReadTimeout time.Duration // (optional) timeout of write operations. - // It defaults to 5 seconds + // It defaults to 10 seconds WriteTimeout time.Duration // (optional) read buffer count. diff --git a/dialer.go b/dialer.go index 276a5451..4d64a2b3 100644 --- a/dialer.go +++ b/dialer.go @@ -2,6 +2,7 @@ package gortsplib import ( "bufio" + "fmt" "net" "strings" "time" @@ -21,23 +22,27 @@ func Dial(host string) (*ConnClient, error) { } // DialRead connects to a server and starts reading all tracks. -func DialRead(address string, proto StreamProtocol) (*ConnClient, error) { - return DefaultDialer.DialRead(address, proto) +func DialRead(address string) (*ConnClient, error) { + return DefaultDialer.DialRead(address) } // DialPublish connects to a server and starts publishing the tracks. -func DialPublish(address string, proto StreamProtocol, tracks Tracks) (*ConnClient, error) { - return DefaultDialer.DialPublish(address, proto, tracks) +func DialPublish(address string, tracks Tracks) (*ConnClient, error) { + return DefaultDialer.DialPublish(address, tracks) } // Dialer allows to initialize a ConnClient. type Dialer struct { + // (optional) the stream protocol. + // It defaults to StreamProtocolUDP. + StreamProtocol StreamProtocol + // (optional) timeout of read operations. // It defaults to 10 seconds ReadTimeout time.Duration // (optional) timeout of write operations. - // It defaults to 5 seconds + // It defaults to 10 seconds WriteTimeout time.Duration // (optional) read buffer count. @@ -83,22 +88,27 @@ func (d Dialer) Dial(host string) (*ConnClient, error) { } return &ConnClient{ - d: d, - nconn: nconn, - br: bufio.NewReaderSize(nconn, clientReadBufferSize), - bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), - rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), - udpLastFrameTimes: make(map[int]*int64), - udpRtpListeners: make(map[int]*connClientUDPListener), - udpRtcpListeners: make(map[int]*connClientUDPListener), - response: &base.Response{}, - frame: &base.InterleavedFrame{}, - tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize), + d: d, + nconn: nconn, + br: bufio.NewReaderSize(nconn, clientReadBufferSize), + bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), + state: func() *connClientState { + v := connClientState(0) + return &v + }(), + rtcpReceivers: make(map[int]*rtcpreceiver.RtcpReceiver), + udpLastFrameTimes: make(map[int]*int64), + udpRtpListeners: make(map[int]*connClientUDPListener), + udpRtcpListeners: make(map[int]*connClientUDPListener), + response: &base.Response{}, + frame: &base.InterleavedFrame{}, + tcpFrameBuffer: multibuffer.New(d.ReadBufferCount, clientTCPFrameReadBufferSize), + backgroundUDPError: fmt.Errorf("not running"), }, nil } // DialRead connects to the address and starts reading all tracks. -func (d Dialer) DialRead(address string, proto StreamProtocol) (*ConnClient, error) { +func (d Dialer) DialRead(address string) (*ConnClient, error) { u, err := base.ParseURL(address) if err != nil { return nil, err @@ -124,11 +134,11 @@ func (d Dialer) DialRead(address string, proto StreamProtocol) (*ConnClient, err if res.StatusCode >= base.StatusMovedPermanently && res.StatusCode <= base.StatusUseProxy { conn.Close() - return d.DialRead(res.Header["Location"][0], proto) + return d.DialRead(res.Header["Location"][0]) } for _, track := range tracks { - _, err := conn.Setup(u, headers.TransportModePlay, proto, track, 0, 0) + _, err := conn.Setup(u, headers.TransportModePlay, d.StreamProtocol, track, 0, 0) if err != nil { conn.Close() return nil, err @@ -145,7 +155,7 @@ func (d Dialer) DialRead(address string, proto StreamProtocol) (*ConnClient, err } // DialPublish connects to the address and starts publishing the tracks. -func (d Dialer) DialPublish(address string, proto StreamProtocol, tracks Tracks) (*ConnClient, error) { +func (d Dialer) DialPublish(address string, tracks Tracks) (*ConnClient, error) { u, err := base.ParseURL(address) if err != nil { return nil, err @@ -169,7 +179,7 @@ func (d Dialer) DialPublish(address string, proto StreamProtocol, tracks Tracks) } for _, track := range tracks { - _, err = conn.Setup(u, headers.TransportModeRecord, proto, track, 0, 0) + _, err = conn.Setup(u, headers.TransportModeRecord, d.StreamProtocol, track, 0, 0) if err != nil { conn.Close() return nil, err diff --git a/dialer_test.go b/dialer_test.go index 83729581..2772cc4c 100644 --- a/dialer_test.go +++ b/dialer_test.go @@ -58,73 +58,110 @@ func (c *container) wait() int { return int(code) } -func TestDialReadUDP(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() +func TestDialRead(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - cnt2, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "/emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "udp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt2.close() + cnt2, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "udp", + "rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt2.close() - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - loopDone := make(chan struct{}) - defer func() { <-loopDone }() + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() - conn, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolUDP) - require.NoError(t, err) - defer conn.Close() + conn, err := dialer.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) - go func() { - defer close(loopDone) - conn.LoopUDP() - }() + id, typ, _, err := conn.ReadFrame() + require.NoError(t, err) - _, err = conn.ReadFrameUDP(0, StreamTypeRtp) - require.NoError(t, err) + require.Equal(t, 0, id) + require.Equal(t, StreamTypeRtp, typ) + + conn.Close() + + _, _, _, err = conn.ReadFrame() + require.Error(t, err) + }) + } } -func TestDialReadTCP(t *testing.T) { - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() +func TestDialReadClose(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - cnt2, err := newContainer("ffmpeg", "publish", []string{ - "-re", - "-stream_loop", "-1", - "-i", "/emptyvideo.ts", - "-c", "copy", - "-f", "rtsp", - "-rtsp_transport", "udp", - "rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt2.close() + cnt2, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", + "-rtsp_transport", "udp", + "rtsp://localhost:8554/teststream", + }) + require.NoError(t, err) + defer cnt2.close() - time.Sleep(1 * time.Second) + time.Sleep(1 * time.Second) - conn, err := DialRead("rtsp://localhost:8554/teststream", StreamProtocolTCP) - require.NoError(t, err) - defer conn.Close() + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() - id, typ, _, err := conn.ReadFrameTCP() - require.NoError(t, err) + conn, err := dialer.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) - require.Equal(t, 0, id) - require.Equal(t, StreamTypeRtp, typ) + readDone := make(chan struct{}) + go func() { + defer close(readDone) + + for { + _, _, _, err := conn.ReadFrame() + if err != nil { + break + } + } + }() + + time.Sleep(1 * time.Second) + + conn.Close() + <-readDone + }) + } } func TestDialReadRedirect(t *testing.T) { @@ -154,141 +191,78 @@ func TestDialReadRedirect(t *testing.T) { time.Sleep(1 * time.Second) - loopDone := make(chan struct{}) - defer func() { <-loopDone }() - - conn, err := DialRead("rtsp://localhost:8554/path1", StreamProtocolUDP) + conn, err := DialRead("rtsp://localhost:8554/path1") require.NoError(t, err) defer conn.Close() - go func() { - defer close(loopDone) - conn.LoopUDP() - }() - - _, err = conn.ReadFrameUDP(0, StreamTypeRtp) + _, _, _, err = conn.ReadFrame() require.NoError(t, err) } -func TestDialPublishUDP(t *testing.T) { - for _, server := range []string{ - "rtsp-simple-server", - "ffmpeg", +func TestDialReadPause(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", } { - t.Run(server, func(t *testing.T) { - switch server { - case "rtsp-simple-server": - cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) - require.NoError(t, err) - defer cnt1.close() - - default: - cnt0, err := newContainer("rtsp-simple-server", "server0", []string{"{}"}) - require.NoError(t, err) - defer cnt0.close() - - cnt1, err := newContainer("ffmpeg", "server", []string{ - "-fflags nobuffer -re -rtsp_flags listen -i rtsp://localhost:8555/teststream -c copy -f rtsp rtsp://localhost:8554/teststream", - }) - require.NoError(t, err) - defer cnt1.close() - } + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() time.Sleep(1 * time.Second) - publishDone := make(chan struct{}) - defer func() { <-publishDone }() - - var conn *ConnClient - - go func() { - defer close(publishDone) - - var writerDone chan struct{} - defer func() { - if writerDone != nil { - <-writerDone - } - }() - - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoderFromPacketConn(pc) - sps, pps, err := decoder.ReadSPSPPS() - require.NoError(t, err) - - track, err := NewTrackH264(0, sps, pps) - require.NoError(t, err) - - port := "8554" - if server == "ffmpeg" { - port = "8555" - } - conn, err = DialPublish("rtsp://localhost:"+port+"/teststream", - StreamProtocolUDP, Tracks{track}) - require.NoError(t, err) - - writerDone = make(chan struct{}) - - go func() { - defer close(writerDone) - - buf := make([]byte, 2048) - for { - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } - - err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) - if err != nil { - break - } - } - }() - - conn.LoopUDP() - }() - - if server == "ffmpeg" { - time.Sleep(5 * time.Second) - } - time.Sleep(1 * time.Second) - - cnt3, err := newContainer("ffmpeg", "read", []string{ + cnt2, err := newContainer("ffmpeg", "publish", []string{ + "-re", + "-stream_loop", "-1", + "-i", "/emptyvideo.ts", + "-c", "copy", + "-f", "rtsp", "-rtsp_transport", "udp", - "-i", "rtsp://localhost:8554/teststream", - "-vframes", "1", - "-f", "image2", - "-y", "/dev/null", + "rtsp://localhost:8554/teststream", }) require.NoError(t, err) - defer cnt3.close() + defer cnt2.close() - code := cnt3.wait() - require.Equal(t, 0, code) + time.Sleep(1 * time.Second) - conn.Close() + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() + + conn, err := dialer.DialRead("rtsp://localhost:8554/teststream") + require.NoError(t, err) + defer conn.Close() + + _, _, _, err = conn.ReadFrame() + require.NoError(t, err) + + _, err = conn.Pause() + require.NoError(t, err) + + _, err = conn.Play() + require.NoError(t, err) + + _, _, _, err = conn.ReadFrame() + require.NoError(t, err) }) } } -func TestDialPublishTCP(t *testing.T) { - for _, server := range []string{ - "rtsp-simple-server", - "ffmpeg", +func TestDialPublish(t *testing.T) { + for _, ca := range []struct { + proto string + server string + }{ + {"udp", "rtsp-simple-server"}, + {"udp", "ffmpeg"}, + {"tcp", "rtsp-simple-server"}, + {"tcp", "ffmpeg"}, } { - t.Run(server, func(t *testing.T) { - switch server { + t.Run(ca.proto+"_"+ca.server, func(t *testing.T) { + switch ca.server { case "rtsp-simple-server": cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) require.NoError(t, err) @@ -308,38 +282,47 @@ func TestDialPublishTCP(t *testing.T) { time.Sleep(1 * time.Second) + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + publishDone := make(chan struct{}) defer func() { <-publishDone }() var conn *ConnClient + defer func() { conn.Close() }() + + dialer := func() Dialer { + if ca.proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() go func() { defer close(publishDone) - pc, err := net.ListenPacket("udp4", "127.0.0.1:0") - require.NoError(t, err) - defer pc.Close() - - cnt2, err := newContainer("gstreamer", "source", []string{ - "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + - " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), - }) - require.NoError(t, err) - defer cnt2.close() - - decoder := rtph264.NewDecoderFromPacketConn(pc) - sps, pps, err := decoder.ReadSPSPPS() - require.NoError(t, err) - - track, err := NewTrackH264(0, sps, pps) - require.NoError(t, err) - port := "8554" - if server == "ffmpeg" { + if ca.server == "ffmpeg" { port = "8555" } - conn, err = DialPublish("rtsp://localhost:"+port+"/teststream", - StreamProtocolTCP, Tracks{track}) + var err error + conn, err = dialer.DialPublish("rtsp://localhost:"+port+"/teststream", + Tracks{track}) require.NoError(t, err) buf := make([]byte, 2048) @@ -356,7 +339,7 @@ func TestDialPublishTCP(t *testing.T) { } }() - if server == "ffmpeg" { + if ca.server == "ffmpeg" { time.Sleep(5 * time.Second) } time.Sleep(1 * time.Second) @@ -373,8 +356,139 @@ func TestDialPublishTCP(t *testing.T) { code := cnt3.wait() require.Equal(t, 0, code) - - conn.Close() + }) + } +} + +func TestDialPublishClose(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() + + conn, err := dialer.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + + writeDone := make(chan struct{}) + go func() { + defer close(writeDone) + + buf := make([]byte, 2048) + for { + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + if err != nil { + break + } + } + }() + + time.Sleep(1 * time.Second) + + conn.Close() + <-writeDone + }) + } +} + +func TestDialPublishPause(t *testing.T) { + for _, proto := range []string{ + "udp", + "tcp", + } { + t.Run(proto, func(t *testing.T) { + cnt1, err := newContainer("rtsp-simple-server", "server", []string{"{}"}) + require.NoError(t, err) + defer cnt1.close() + + time.Sleep(1 * time.Second) + + pc, err := net.ListenPacket("udp4", "127.0.0.1:0") + require.NoError(t, err) + defer pc.Close() + + cnt2, err := newContainer("gstreamer", "source", []string{ + "filesrc location=emptyvideo.ts ! tsdemux ! video/x-h264" + + " ! h264parse config-interval=1 ! rtph264pay ! udpsink host=127.0.0.1 port=" + strconv.FormatInt(int64(pc.LocalAddr().(*net.UDPAddr).Port), 10), + }) + require.NoError(t, err) + defer cnt2.close() + + decoder := rtph264.NewDecoderFromPacketConn(pc) + sps, pps, err := decoder.ReadSPSPPS() + require.NoError(t, err) + + track, err := NewTrackH264(0, sps, pps) + require.NoError(t, err) + + dialer := func() Dialer { + if proto == "udp" { + return Dialer{} + } + return Dialer{StreamProtocol: StreamProtocolTCP} + }() + + conn, err := dialer.DialPublish("rtsp://localhost:8554/teststream", + Tracks{track}) + require.NoError(t, err) + defer conn.Close() + + buf := make([]byte, 2048) + + n, _, err := pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + require.NoError(t, err) + + _, err = conn.Pause() + require.NoError(t, err) + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + require.Error(t, err) + + _, err = conn.Record() + require.NoError(t, err) + + n, _, err = pc.ReadFrom(buf) + require.NoError(t, err) + err = conn.WriteFrame(track.Id, StreamTypeRtp, buf[:n]) + require.NoError(t, err) }) } } diff --git a/examples/client-publish-tcp.go b/examples/client-publish-tcp.go index 02245cd1..10becf5b 100644 --- a/examples/client-publish-tcp.go +++ b/examples/client-publish-tcp.go @@ -41,8 +41,10 @@ func main() { } // connect to the server and start publishing the track - conn, err := gortsplib.DialPublish("rtsp://localhost:8554/mystream", - gortsplib.StreamProtocolTCP, gortsplib.Tracks{track}) + dialer := gortsplib.Dialer{ + StreamProtocol: gortsplib.StreamProtocolTCP, + } + conn, err := dialer.DialPublish("rtsp://localhost:8554/mystream", gortsplib.Tracks{track}) if err != nil { panic(err) } @@ -59,7 +61,7 @@ func main() { // write frames to the server err = conn.WriteFrame(track.Id, gortsplib.StreamTypeRtp, buf[:n]) if err != nil { - fmt.Println("connection is closed (%s)", err) + fmt.Printf("connection is closed (%s)\n", err) break } } diff --git a/examples/client-publish-udp.go b/examples/client-publish-udp.go index 2135cee7..177305ab 100644 --- a/examples/client-publish-udp.go +++ b/examples/client-publish-udp.go @@ -15,13 +15,6 @@ import ( // the frames with the UDP protocol. func main() { - var writerDone chan struct{} - defer func() { - if writerDone != nil { - <-writerDone - } - }() - // open a listener to receive RTP/H264 frames pc, err := net.ListenPacket("udp4", "127.0.0.1:9000") if err != nil { @@ -49,34 +42,25 @@ func main() { // connect to the server and start publishing the track conn, err := gortsplib.DialPublish("rtsp://localhost:8554/mystream", - gortsplib.StreamProtocolUDP, gortsplib.Tracks{track}) + gortsplib.Tracks{track}) if err != nil { panic(err) } defer conn.Close() - writerDone = make(chan struct{}) - - go func() { - defer close(writerDone) - - buf := make([]byte, 2048) - for { - // read frames from the source - n, _, err := pc.ReadFrom(buf) - if err != nil { - break - } - - // write frames to the server - err = conn.WriteFrame(track.Id, gortsplib.StreamTypeRtp, buf[:n]) - if err != nil { - break - } + buf := make([]byte, 2048) + for { + // read frames from the source + n, _, err := pc.ReadFrom(buf) + if err != nil { + break } - }() - // wait until the connection is closed - err = conn.LoopUDP() - fmt.Println("connection is closed (%s)", err) + // write frames to the server + err = conn.WriteFrame(track.Id, gortsplib.StreamTypeRtp, buf[:n]) + if err != nil { + fmt.Printf("connection is closed (%s)\n", err) + break + } + } } diff --git a/examples/client-query.go b/examples/client-query.go new file mode 100644 index 00000000..49c1050d --- /dev/null +++ b/examples/client-query.go @@ -0,0 +1,42 @@ +// +build ignore + +package main + +import ( + "fmt" + + "github.com/aler9/gortsplib" + "github.com/aler9/gortsplib/base" +) + +// This example shows how to connect to a server and print informations about +// tracks published on a certain path. + +func main() { + u, err := base.ParseURL("rtsp://myserver/mypath") + if err != nil { + panic(err) + } + + conn, err := gortsplib.Dial(u.Host) + if err != nil { + panic(err) + } + defer conn.Close() + + _, err = conn.Options(u) + if err != nil { + panic(err) + } + + tracks, res, err := conn.Describe(u) + if err != nil { + panic(err) + } + + if res.StatusCode != base.StatusOK { + panic(fmt.Errorf("server returned status %d", res.StatusCode)) + } + + fmt.Println("tracks: %v\n", tracks) +} diff --git a/examples/client-read-tcp.go b/examples/client-read-tcp.go index cf4f6ca3..3ee042a8 100644 --- a/examples/client-read-tcp.go +++ b/examples/client-read-tcp.go @@ -13,17 +13,20 @@ import ( func main() { // connect to the server and start reading all tracks - conn, err := gortsplib.DialRead("rtsp://localhost:8554/mystream", gortsplib.StreamProtocolTCP) + dialer := gortsplib.Dialer{ + StreamProtocol: gortsplib.StreamProtocolTCP, + } + conn, err := dialer.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } defer conn.Close() + // read frames for { - // read frames - id, typ, buf, err := conn.ReadFrameTCP() + id, typ, buf, err := conn.ReadFrame() if err != nil { - fmt.Println("connection is closed (%s)", err) + fmt.Printf("connection is closed (%s)\n", err) break } diff --git a/examples/client-read-udp.go b/examples/client-read-udp.go index acd65232..3f2fa0ff 100644 --- a/examples/client-read-udp.go +++ b/examples/client-read-udp.go @@ -4,7 +4,6 @@ package main import ( "fmt" - "sync" "github.com/aler9/gortsplib" ) @@ -13,49 +12,22 @@ import ( // read all tracks with the UDP protocol. func main() { - var wg sync.WaitGroup - defer wg.Wait() - // connect to the server and start reading all tracks - conn, err := gortsplib.DialRead("rtsp://localhost:8554/mystream", gortsplib.StreamProtocolUDP) + conn, err := gortsplib.DialRead("rtsp://localhost:8554/mystream") if err != nil { panic(err) } defer conn.Close() - for _, track := range conn.Tracks() { - // read RTP frames - wg.Add(1) - go func(trackId int) { - defer wg.Done() + // read frames + for { + id, typ, buf, err := conn.ReadFrame() + if err != nil { + fmt.Printf("connection is closed (%s)\n", err) + break + } - for { - buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtp) - if err != nil { - break - } - - fmt.Printf("frame from track %d, type RTP: %v\n", trackId, buf) - } - }(track.Id) - - // read RTCP frames - wg.Add(1) - go func(trackId int) { - defer wg.Done() - - for { - buf, err := conn.ReadFrameUDP(trackId, gortsplib.StreamTypeRtcp) - if err != nil { - break - } - - fmt.Printf("frame from track %d, type RTCP: %v\n", trackId, buf) - } - }(track.Id) + fmt.Printf("frame from track %d, type %v: %v\n", + id, typ, buf) } - - // wait until the connection is closed - err = conn.LoopUDP() - fmt.Println("connection is closed (%s)", err) }