diff --git a/conn-client-udpl.go b/conn-client-udpl.go new file mode 100644 index 00000000..bbef0c33 --- /dev/null +++ b/conn-client-udpl.go @@ -0,0 +1,51 @@ +package gortsplib + +import ( + "net" + "strconv" +) + +type ConnClientUdpListener struct { + c *ConnClient + pc net.PacketConn + trackId int + streamType StreamType + publisherIp net.IP + publisherPort int +} + +func newConnClientUdpListener(c *ConnClient, port int, trackId int, streamType StreamType) (*ConnClientUdpListener, error) { + pc, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10)) + if err != nil { + return nil, err + } + + return &ConnClientUdpListener{ + c: c, + pc: pc, + trackId: trackId, + streamType: streamType, + }, nil +} + +func (l *ConnClientUdpListener) Close() { + l.pc.Close() +} + +func (l *ConnClientUdpListener) Read(buf []byte) (int, error) { + for { + n, addr, err := l.pc.ReadFrom(buf) + if err != nil { + return 0, err + } + + uaddr := addr.(*net.UDPAddr) + + if !l.publisherIp.Equal(uaddr.IP) || l.publisherPort != uaddr.Port { + continue + } + + l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n]) + return n, nil + } +} diff --git a/conn-client.go b/conn-client.go index 13534efb..df31fc0a 100644 --- a/conn-client.go +++ b/conn-client.go @@ -20,9 +20,11 @@ import ( ) const ( - clientReadBufferSize = 4096 - clientWriteBufferSize = 4096 - clientTcpKeepalivePeriod = 30 * time.Second + clientReadBufferSize = 4096 + clientWriteBufferSize = 4096 + clientReceiverReportPeriod = 10 * time.Second + clientUdpCheckStreamPeriod = 5 * time.Second + clientTcpKeepalivePeriod = 30 * time.Second ) // Track is a track available in a certain URL. @@ -36,8 +38,8 @@ type Track struct { // ConnClientConf allows to configure a ConnClient. type ConnClientConf struct { - // pre-existing TCP connection that will be wrapped - Conn net.Conn + // target address in format hostname:port + Host string // (optional) timeout for read requests. // It defaults to 5 seconds @@ -50,16 +52,24 @@ type ConnClientConf struct { // ConnClient is a client-side RTSP connection. type ConnClient struct { - conf ConnClientConf - br *bufio.Reader - bw *bufio.Writer - session string - curCSeq int - auth *authClient + conf ConnClientConf + nconn net.Conn + br *bufio.Reader + bw *bufio.Writer + session string + cseq int + auth *authClient + streamProtocol *StreamProtocol + rtcpReceivers map[int]*RtcpReceiver + rtpListeners map[int]*ConnClientUdpListener + rtcpListeners map[int]*ConnClientUdpListener + + receiverReportTerminate chan struct{} + receiverReportDone chan struct{} } // NewConnClient allocates a ConnClient. See ConnClientConf for the options. -func NewConnClient(conf ConnClientConf) *ConnClient { +func NewConnClient(conf ConnClientConf) (*ConnClient, error) { if conf.ReadTimeout == time.Duration(0) { conf.ReadTimeout = 5 * time.Second } @@ -67,27 +77,59 @@ func NewConnClient(conf ConnClientConf) *ConnClient { conf.WriteTimeout = 5 * time.Second } - return &ConnClient{ - conf: conf, - br: bufio.NewReaderSize(conf.Conn, clientReadBufferSize), - bw: bufio.NewWriterSize(conf.Conn, clientWriteBufferSize), + nconn, err := net.DialTimeout("tcp", conf.Host, conf.ReadTimeout) + if err != nil { + return nil, err } + + return &ConnClient{ + conf: conf, + nconn: nconn, + br: bufio.NewReaderSize(nconn, clientReadBufferSize), + bw: bufio.NewWriterSize(nconn, clientWriteBufferSize), + rtcpReceivers: make(map[int]*RtcpReceiver), + rtpListeners: make(map[int]*ConnClientUdpListener), + rtcpListeners: make(map[int]*ConnClientUdpListener), + receiverReportTerminate: make(chan struct{}), + receiverReportDone: make(chan struct{}), + }, nil } -// NetConn returns the underlying net.Conn. -func (c *ConnClient) NetConn() net.Conn { - return c.conf.Conn +// Close closes all the ConnClient resources +func (c *ConnClient) Close() error { + close(c.receiverReportTerminate) + <-c.receiverReportDone + + for _, rr := range c.rtcpReceivers { + rr.Close() + } + + for _, l := range c.rtpListeners { + l.Close() + } + + for _, l := range c.rtcpListeners { + l.Close() + } + + return c.Close() } // ReadFrame reads an InterleavedFrame. func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { - c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) - return frame.Read(c.br) + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + err := frame.Read(c.br) + if err != nil { + return err + } + + c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content) + return nil } // ReadFrameOrResponse reads an InterleavedFrame or a Response. func (c *ConnClient) ReadFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { - c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) b, err := c.br.ReadByte() if err != nil { return nil, err @@ -129,10 +171,10 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { } // insert cseq - c.curCSeq += 1 - req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.curCSeq), 10)} + c.cseq += 1 + req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.cseq), 10)} - c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) + c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) err := req.Write(c.bw) if err != nil { return nil, err @@ -142,7 +184,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { return nil, nil } - c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) + c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) res, err := ReadResponse(c.br) if err != nil { return nil, err @@ -175,7 +217,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) { // WriteFrame writes an InterleavedFrame. func (c *ConnClient) WriteFrame(frame *InterleavedFrame) error { - c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) + c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) return frame.Write(c.bw) } @@ -319,7 +361,24 @@ func (c *ConnClient) setup(u *url.URL, media *sdp.MediaDescription, transport [] // SetupUdp writes a SETUP request, that means that we want to read // a given track with the UDP transport. It then reads a Response. func (c *ConnClient) SetupUdp(u *url.URL, track *Track, rtpPort int, - rtcpPort int) (int, int, *Response, error) { + rtcpPort int) (*ConnClientUdpListener, *ConnClientUdpListener, *Response, error) { + if c.streamProtocol != nil && *c.streamProtocol != StreamProtocolUdp { + return nil, nil, nil, fmt.Errorf("cannot setup tracks with different protocols") + } + + if _, ok := c.rtcpReceivers[track.Id]; ok { + return nil, nil, nil, fmt.Errorf("track has already been setup") + } + + rtpListener, err := newConnClientUdpListener(c, rtpPort, track.Id, StreamTypeRtp) + if err != nil { + return nil, nil, nil, err + } + + rtcpListener, err := newConnClientUdpListener(c, rtcpPort, track.Id, StreamTypeRtcp) + if err != nil { + return nil, nil, nil, err + } res, err := c.setup(u, track.Media, []string{ "RTP/AVP/UDP", @@ -327,27 +386,46 @@ func (c *ConnClient) SetupUdp(u *url.URL, track *Track, rtpPort int, fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), }) if err != nil { - return 0, 0, nil, err + return nil, nil, nil, err } th, err := ReadHeaderTransport(res.Header["Transport"]) if err != nil { - return 0, 0, nil, fmt.Errorf("SETUP: transport header: %s", err) + return nil, nil, nil, fmt.Errorf("SETUP: transport header: %s", err) } rtpServerPort, rtcpServerPort := th.Ports("server_port") if rtpServerPort == 0 { - return 0, 0, nil, fmt.Errorf("SETUP: server ports not provided") + return nil, nil, nil, fmt.Errorf("SETUP: server ports not provided") } - return rtpServerPort, rtcpServerPort, res, nil + c.rtcpReceivers[track.Id] = NewRtcpReceiver() + streamProtocol := StreamProtocolUdp + c.streamProtocol = &streamProtocol + + rtpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtpListener.publisherPort = rtpServerPort + c.rtpListeners[track.Id] = rtpListener + + rtcpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP + rtcpListener.publisherPort = rtcpServerPort + c.rtcpListeners[track.Id] = rtcpListener + + return rtpListener, rtcpListener, res, nil } // SetupTcp writes a SETUP request, that means that we want to read // a given track with the TCP transport. It then reads a Response. func (c *ConnClient) SetupTcp(u *url.URL, track *Track) (*Response, error) { - interleaved := fmt.Sprintf("interleaved=%d-%d", (track.Id * 2), (track.Id*2)+1) + if c.streamProtocol != nil && *c.streamProtocol != StreamProtocolTcp { + return nil, fmt.Errorf("cannot setup tracks with different protocols") + } + if _, ok := c.rtcpReceivers[track.Id]; ok { + return nil, fmt.Errorf("track has already been setup") + } + + interleaved := fmt.Sprintf("interleaved=%d-%d", (track.Id * 2), (track.Id*2)+1) res, err := c.setup(u, track.Media, []string{ "RTP/AVP/TCP", "unicast", @@ -364,14 +442,19 @@ func (c *ConnClient) SetupTcp(u *url.URL, track *Track) (*Response, error) { _, ok := th[interleaved] if !ok { - return nil, fmt.Errorf("SETUP: transport header does not have %s (%s)", interleaved, res.Header["Transport"]) + return nil, fmt.Errorf("SETUP: transport header does not contain '%s' (%s)", + interleaved, res.Header["Transport"]) } + c.rtcpReceivers[track.Id] = NewRtcpReceiver() + streamProtocol := StreamProtocolTcp + c.streamProtocol = &streamProtocol + return res, nil } -// Play writes a PLAY request, that means that we want to start the -// stream. It then reads a Response. +// Play must be called after SetupUDP() or SetupTCP(), and writes a PLAY request, +// that means that we want to start the stream. It then reads a Response. func (c *ConnClient) Play(u *url.URL) (*Response, error) { _, err := c.Do(&Request{ Method: PLAY, @@ -382,41 +465,93 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) { return nil, err } - frame := &InterleavedFrame{ - Content: make([]byte, 0, 512*1024), - } - - // v4lrtspserver sends frames before the response. - // ignore them and wait for the response. - for { - frame.Content = frame.Content[:cap(frame.Content)] - recv, err := c.ReadFrameOrResponse(frame) - if err != nil { - return nil, err + res, err := func() (*Response, error) { + frame := &InterleavedFrame{ + Content: make([]byte, 0, 512*1024), } - if res, ok := recv.(*Response); ok { - if res.StatusCode != StatusOK { - return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + // v4lrtspserver sends frames before the response. + // ignore them and wait for the response. + for { + frame.Content = frame.Content[:cap(frame.Content)] + recv, err := c.ReadFrameOrResponse(frame) + if err != nil { + return nil, err } - return res, nil + if res, ok := recv.(*Response); ok { + if res.StatusCode != StatusOK { + return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) + } + + return res, nil + } } + }() + if err != nil { + return nil, err } + + receiverReportTicker := time.NewTicker(clientReceiverReportPeriod) + go func() { + defer close(c.receiverReportDone) + defer receiverReportTicker.Stop() + + for { + select { + case <-c.receiverReportTerminate: + return + + case <-receiverReportTicker.C: + for trackId := range c.rtcpReceivers { + frame := c.rtcpReceivers[trackId].Report() + + if *c.streamProtocol == StreamProtocolUdp { + c.rtcpListeners[trackId].pc.WriteTo(frame, &net.UDPAddr{ + IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP, + Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone, + Port: c.rtcpListeners[trackId].publisherPort, + }) + + } else { + c.WriteFrame(&InterleavedFrame{ + TrackId: trackId, + StreamType: StreamTypeRtcp, + Content: frame, + }) + } + } + } + } + }() + + return res, nil } -// LoopUDP is called after setupping UDP tracks and calling Play(); it keeps +// LoopUDP must be called after SetupUDP() and Play(); it keeps // the TCP connection open through keepalives, and returns when the TCP // connection closes. -func (c *ConnClient) LoopUDP(u *url.URL) (error) { +func (c *ConnClient) LoopUDP(u *url.URL) error { keepaliveTicker := time.NewTicker(clientTcpKeepalivePeriod) defer keepaliveTicker.Stop() + checkStreamTicker := time.NewTicker(clientUdpCheckStreamPeriod) + defer checkStreamTicker.Stop() + for { - <- keepaliveTicker.C - _, err := c.Options(u) - if err != nil { - return err + select { + case <-keepaliveTicker.C: + _, err := c.Options(u) + if err != nil { + return err + } + + case <-checkStreamTicker.C: + for trackId := range c.rtcpReceivers { + if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.conf.ReadTimeout { + return fmt.Errorf("stream is dead") + } + } } } } diff --git a/examples/client-tcp.go b/examples/client-tcp.go index b00f36c5..53a64634 100644 --- a/examples/client-tcp.go +++ b/examples/client-tcp.go @@ -4,9 +4,7 @@ package main import ( "fmt" - "net" "net/url" - "time" "github.com/aler9/gortsplib" ) @@ -17,42 +15,42 @@ func main() { panic(err) } - conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second) + conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host}) if err != nil { panic(err) } defer conn.Close() - rconn := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn}) - - _, err = rconn.Options(u) + _, err = conn.Options(u) if err != nil { panic(err) } - tracks, _, err := rconn.Describe(u) + tracks, _, err := conn.Describe(u) if err != nil { panic(err) } for _, track := range tracks { - _, err := rconn.SetupTcp(u, track) + _, err := conn.SetupTcp(u, track) if err != nil { panic(err) } } - _, err = rconn.Play(u) + _, err = conn.Play(u) if err != nil { panic(err) } frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 512*1024)} + for { frame.Content = frame.Content[:cap(frame.Content)] - err := rconn.ReadFrame(frame) + + err := conn.ReadFrame(frame) if err != nil { - fmt.Println("connection is closed") + fmt.Println("connection is closed (%s)", err) break } diff --git a/examples/client-udp.go b/examples/client-udp.go index 89ac15ea..9419cbed 100644 --- a/examples/client-udp.go +++ b/examples/client-udp.go @@ -4,10 +4,8 @@ package main import ( "fmt" - "net" "net/url" - "strconv" - "time" + "sync" "github.com/aler9/gortsplib" ) @@ -18,84 +16,87 @@ func main() { panic(err) } - conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second) + conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host}) if err != nil { panic(err) } defer conn.Close() - rconn := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn}) - - _, err = rconn.Options(u) + _, err = conn.Options(u) if err != nil { panic(err) } - tracks, _, err := rconn.Describe(u) + tracks, _, err := conn.Describe(u) if err != nil { panic(err) } - var rtpListeners []net.PacketConn - var rtcpListeners []net.PacketConn + type trackListenerPair struct { + rtpl *gortsplib.ConnClientUdpListener + rtcpl *gortsplib.ConnClientUdpListener + } + var listeners []*trackListenerPair for _, track := range tracks { - rtpPort := 9000 + track.Id*2 - rtpl, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(rtpPort), 10)) + rtpl, rtcpl, _, err := conn.SetupUdp(u, track, 9000+track.Id*2, 9001+track.Id*2) if err != nil { panic(err) } - defer rtpl.Close() - rtpListeners = append(rtpListeners, rtpl) - rtcpPort := rtpPort + 1 - rtcpl, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(rtcpPort), 10)) - if err != nil { - panic(err) - } - defer rtcpl.Close() - rtcpListeners = append(rtcpListeners, rtcpl) - - _, _, _, err = rconn.SetupUdp(u, track, rtpPort, rtcpPort) - if err != nil { - panic(err) - } + listeners = append(listeners, &trackListenerPair{ + rtpl: rtpl, + rtcpl: rtcpl, + }) } - _, err = rconn.Play(u) + _, err = conn.Play(u) if err != nil { panic(err) } - // receive RTP packets - for trackId, l := range rtpListeners { - go func(trackId int, l net.PacketConn) { + var wg sync.WaitGroup + + for trackId, lp := range listeners { + wg.Add(2) + + // receive RTP packets + go func(trackId int, l *gortsplib.ConnClientUdpListener) { + defer wg.Done() + buf := make([]byte, 2048) for { - n, _, err := l.ReadFrom(buf) + n, err := l.Read(buf) if err != nil { break } fmt.Printf("packet from track %d, type RTP: %v\n", trackId, buf[:n]) } - }(trackId, l) - } + }(trackId, lp.rtpl) + + // receive RTCP packets + go func(trackId int, l *gortsplib.ConnClientUdpListener) { + defer wg.Done() - // receive RTCP packets - for trackId, l := range rtcpListeners { - go func(trackId int, l net.PacketConn) { buf := make([]byte, 2048) for { - n, _, err := l.ReadFrom(buf) + n, err := l.Read(buf) if err != nil { break } fmt.Printf("packet from track %d, type RTCP: %v\n", trackId, buf[:n]) } - }(trackId, l) + }(trackId, lp.rtcpl) } - panic(rconn.LoopUDP(u)) + err = conn.LoopUDP(u) + fmt.Println("connection is closed (%s)", err) + + for _, lp := range listeners { + lp.rtpl.Close() + lp.rtcpl.Close() + } + wg.Wait() } diff --git a/rtcp-receiver.go b/rtcp-receiver.go index 45458237..924ea50a 100644 --- a/rtcp-receiver.go +++ b/rtcp-receiver.go @@ -40,7 +40,7 @@ type rtcpReceiverEventTerminate struct{} func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {} -// RtcpReceiver is an object that helps to build RTCP receiver reports from +// RtcpReceiver is an object that helps building RTCP receiver reports, by parsing // incoming frames. type RtcpReceiver struct { events chan rtcpReceiverEvent diff --git a/utils.go b/utils.go index 1239ee53..ffff8d7c 100644 --- a/utils.go +++ b/utils.go @@ -15,11 +15,8 @@ const ( type StreamProtocol int const ( - // StreamProtocolInvalid is an invalid protocol - StreamProtocolInvalid StreamProtocol = iota - - // StreamProtocolUdp means that the stream uses the UDP\ protocol - StreamProtocolUdp + // StreamProtocolUdp means that the stream uses the UDP protocol + StreamProtocolUdp StreamProtocol = iota // StreamProtocolTcp means that the stream uses the TCP protocol StreamProtocolTcp @@ -38,7 +35,7 @@ type StreamType int const ( // StreamTypeRtp means that the stream contains RTP packets - StreamTypeRtp StreamType = iota + 1 + StreamTypeRtp StreamType = iota // StreamTypeRtcp means that the stream contains RTCP packets StreamTypeRtcp