add rtcp receiver reports to ClientConn

This commit is contained in:
aler9
2020-07-19 21:38:41 +02:00
parent a119764b96
commit 11f82842ac
6 changed files with 296 additions and 114 deletions

51
conn-client-udpl.go Normal file
View File

@@ -0,0 +1,51 @@
package gortsplib
import (
"net"
"strconv"
)
type ConnClientUdpListener struct {
c *ConnClient
pc net.PacketConn
trackId int
streamType StreamType
publisherIp net.IP
publisherPort int
}
func newConnClientUdpListener(c *ConnClient, port int, trackId int, streamType StreamType) (*ConnClientUdpListener, error) {
pc, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(port), 10))
if err != nil {
return nil, err
}
return &ConnClientUdpListener{
c: c,
pc: pc,
trackId: trackId,
streamType: streamType,
}, nil
}
func (l *ConnClientUdpListener) Close() {
l.pc.Close()
}
func (l *ConnClientUdpListener) Read(buf []byte) (int, error) {
for {
n, addr, err := l.pc.ReadFrom(buf)
if err != nil {
return 0, err
}
uaddr := addr.(*net.UDPAddr)
if !l.publisherIp.Equal(uaddr.IP) || l.publisherPort != uaddr.Port {
continue
}
l.c.rtcpReceivers[l.trackId].OnFrame(l.streamType, buf[:n])
return n, nil
}
}

View File

@@ -20,9 +20,11 @@ import (
) )
const ( const (
clientReadBufferSize = 4096 clientReadBufferSize = 4096
clientWriteBufferSize = 4096 clientWriteBufferSize = 4096
clientTcpKeepalivePeriod = 30 * time.Second clientReceiverReportPeriod = 10 * time.Second
clientUdpCheckStreamPeriod = 5 * time.Second
clientTcpKeepalivePeriod = 30 * time.Second
) )
// Track is a track available in a certain URL. // Track is a track available in a certain URL.
@@ -36,8 +38,8 @@ type Track struct {
// ConnClientConf allows to configure a ConnClient. // ConnClientConf allows to configure a ConnClient.
type ConnClientConf struct { type ConnClientConf struct {
// pre-existing TCP connection that will be wrapped // target address in format hostname:port
Conn net.Conn Host string
// (optional) timeout for read requests. // (optional) timeout for read requests.
// It defaults to 5 seconds // It defaults to 5 seconds
@@ -50,16 +52,24 @@ type ConnClientConf struct {
// ConnClient is a client-side RTSP connection. // ConnClient is a client-side RTSP connection.
type ConnClient struct { type ConnClient struct {
conf ConnClientConf conf ConnClientConf
br *bufio.Reader nconn net.Conn
bw *bufio.Writer br *bufio.Reader
session string bw *bufio.Writer
curCSeq int session string
auth *authClient cseq int
auth *authClient
streamProtocol *StreamProtocol
rtcpReceivers map[int]*RtcpReceiver
rtpListeners map[int]*ConnClientUdpListener
rtcpListeners map[int]*ConnClientUdpListener
receiverReportTerminate chan struct{}
receiverReportDone chan struct{}
} }
// NewConnClient allocates a ConnClient. See ConnClientConf for the options. // NewConnClient allocates a ConnClient. See ConnClientConf for the options.
func NewConnClient(conf ConnClientConf) *ConnClient { func NewConnClient(conf ConnClientConf) (*ConnClient, error) {
if conf.ReadTimeout == time.Duration(0) { if conf.ReadTimeout == time.Duration(0) {
conf.ReadTimeout = 5 * time.Second conf.ReadTimeout = 5 * time.Second
} }
@@ -67,27 +77,59 @@ func NewConnClient(conf ConnClientConf) *ConnClient {
conf.WriteTimeout = 5 * time.Second conf.WriteTimeout = 5 * time.Second
} }
return &ConnClient{ nconn, err := net.DialTimeout("tcp", conf.Host, conf.ReadTimeout)
conf: conf, if err != nil {
br: bufio.NewReaderSize(conf.Conn, clientReadBufferSize), return nil, err
bw: bufio.NewWriterSize(conf.Conn, clientWriteBufferSize),
} }
return &ConnClient{
conf: conf,
nconn: nconn,
br: bufio.NewReaderSize(nconn, clientReadBufferSize),
bw: bufio.NewWriterSize(nconn, clientWriteBufferSize),
rtcpReceivers: make(map[int]*RtcpReceiver),
rtpListeners: make(map[int]*ConnClientUdpListener),
rtcpListeners: make(map[int]*ConnClientUdpListener),
receiverReportTerminate: make(chan struct{}),
receiverReportDone: make(chan struct{}),
}, nil
} }
// NetConn returns the underlying net.Conn. // Close closes all the ConnClient resources
func (c *ConnClient) NetConn() net.Conn { func (c *ConnClient) Close() error {
return c.conf.Conn close(c.receiverReportTerminate)
<-c.receiverReportDone
for _, rr := range c.rtcpReceivers {
rr.Close()
}
for _, l := range c.rtpListeners {
l.Close()
}
for _, l := range c.rtcpListeners {
l.Close()
}
return c.Close()
} }
// ReadFrame reads an InterleavedFrame. // ReadFrame reads an InterleavedFrame.
func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error { func (c *ConnClient) ReadFrame(frame *InterleavedFrame) error {
c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
return frame.Read(c.br) err := frame.Read(c.br)
if err != nil {
return err
}
c.rtcpReceivers[frame.TrackId].OnFrame(frame.StreamType, frame.Content)
return nil
} }
// ReadFrameOrResponse reads an InterleavedFrame or a Response. // ReadFrameOrResponse reads an InterleavedFrame or a Response.
func (c *ConnClient) ReadFrameOrResponse(frame *InterleavedFrame) (interface{}, error) { func (c *ConnClient) ReadFrameOrResponse(frame *InterleavedFrame) (interface{}, error) {
c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
b, err := c.br.ReadByte() b, err := c.br.ReadByte()
if err != nil { if err != nil {
return nil, err return nil, err
@@ -129,10 +171,10 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
} }
// insert cseq // insert cseq
c.curCSeq += 1 c.cseq += 1
req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.curCSeq), 10)} req.Header["CSeq"] = HeaderValue{strconv.FormatInt(int64(c.cseq), 10)}
c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
err := req.Write(c.bw) err := req.Write(c.bw)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -142,7 +184,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
return nil, nil return nil, nil
} }
c.conf.Conn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout)) c.nconn.SetReadDeadline(time.Now().Add(c.conf.ReadTimeout))
res, err := ReadResponse(c.br) res, err := ReadResponse(c.br)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -175,7 +217,7 @@ func (c *ConnClient) Do(req *Request) (*Response, error) {
// WriteFrame writes an InterleavedFrame. // WriteFrame writes an InterleavedFrame.
func (c *ConnClient) WriteFrame(frame *InterleavedFrame) error { func (c *ConnClient) WriteFrame(frame *InterleavedFrame) error {
c.conf.Conn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout)) c.nconn.SetWriteDeadline(time.Now().Add(c.conf.WriteTimeout))
return frame.Write(c.bw) return frame.Write(c.bw)
} }
@@ -319,7 +361,24 @@ func (c *ConnClient) setup(u *url.URL, media *sdp.MediaDescription, transport []
// SetupUdp writes a SETUP request, that means that we want to read // SetupUdp writes a SETUP request, that means that we want to read
// a given track with the UDP transport. It then reads a Response. // a given track with the UDP transport. It then reads a Response.
func (c *ConnClient) SetupUdp(u *url.URL, track *Track, rtpPort int, func (c *ConnClient) SetupUdp(u *url.URL, track *Track, rtpPort int,
rtcpPort int) (int, int, *Response, error) { rtcpPort int) (*ConnClientUdpListener, *ConnClientUdpListener, *Response, error) {
if c.streamProtocol != nil && *c.streamProtocol != StreamProtocolUdp {
return nil, nil, nil, fmt.Errorf("cannot setup tracks with different protocols")
}
if _, ok := c.rtcpReceivers[track.Id]; ok {
return nil, nil, nil, fmt.Errorf("track has already been setup")
}
rtpListener, err := newConnClientUdpListener(c, rtpPort, track.Id, StreamTypeRtp)
if err != nil {
return nil, nil, nil, err
}
rtcpListener, err := newConnClientUdpListener(c, rtcpPort, track.Id, StreamTypeRtcp)
if err != nil {
return nil, nil, nil, err
}
res, err := c.setup(u, track.Media, []string{ res, err := c.setup(u, track.Media, []string{
"RTP/AVP/UDP", "RTP/AVP/UDP",
@@ -327,27 +386,46 @@ func (c *ConnClient) SetupUdp(u *url.URL, track *Track, rtpPort int,
fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort), fmt.Sprintf("client_port=%d-%d", rtpPort, rtcpPort),
}) })
if err != nil { if err != nil {
return 0, 0, nil, err return nil, nil, nil, err
} }
th, err := ReadHeaderTransport(res.Header["Transport"]) th, err := ReadHeaderTransport(res.Header["Transport"])
if err != nil { if err != nil {
return 0, 0, nil, fmt.Errorf("SETUP: transport header: %s", err) return nil, nil, nil, fmt.Errorf("SETUP: transport header: %s", err)
} }
rtpServerPort, rtcpServerPort := th.Ports("server_port") rtpServerPort, rtcpServerPort := th.Ports("server_port")
if rtpServerPort == 0 { if rtpServerPort == 0 {
return 0, 0, nil, fmt.Errorf("SETUP: server ports not provided") return nil, nil, nil, fmt.Errorf("SETUP: server ports not provided")
} }
return rtpServerPort, rtcpServerPort, res, nil c.rtcpReceivers[track.Id] = NewRtcpReceiver()
streamProtocol := StreamProtocolUdp
c.streamProtocol = &streamProtocol
rtpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.publisherPort = rtpServerPort
c.rtpListeners[track.Id] = rtpListener
rtcpListener.publisherIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtcpListener.publisherPort = rtcpServerPort
c.rtcpListeners[track.Id] = rtcpListener
return rtpListener, rtcpListener, res, nil
} }
// SetupTcp writes a SETUP request, that means that we want to read // SetupTcp writes a SETUP request, that means that we want to read
// a given track with the TCP transport. It then reads a Response. // a given track with the TCP transport. It then reads a Response.
func (c *ConnClient) SetupTcp(u *url.URL, track *Track) (*Response, error) { func (c *ConnClient) SetupTcp(u *url.URL, track *Track) (*Response, error) {
interleaved := fmt.Sprintf("interleaved=%d-%d", (track.Id * 2), (track.Id*2)+1) if c.streamProtocol != nil && *c.streamProtocol != StreamProtocolTcp {
return nil, fmt.Errorf("cannot setup tracks with different protocols")
}
if _, ok := c.rtcpReceivers[track.Id]; ok {
return nil, fmt.Errorf("track has already been setup")
}
interleaved := fmt.Sprintf("interleaved=%d-%d", (track.Id * 2), (track.Id*2)+1)
res, err := c.setup(u, track.Media, []string{ res, err := c.setup(u, track.Media, []string{
"RTP/AVP/TCP", "RTP/AVP/TCP",
"unicast", "unicast",
@@ -364,14 +442,19 @@ func (c *ConnClient) SetupTcp(u *url.URL, track *Track) (*Response, error) {
_, ok := th[interleaved] _, ok := th[interleaved]
if !ok { if !ok {
return nil, fmt.Errorf("SETUP: transport header does not have %s (%s)", interleaved, res.Header["Transport"]) return nil, fmt.Errorf("SETUP: transport header does not contain '%s' (%s)",
interleaved, res.Header["Transport"])
} }
c.rtcpReceivers[track.Id] = NewRtcpReceiver()
streamProtocol := StreamProtocolTcp
c.streamProtocol = &streamProtocol
return res, nil return res, nil
} }
// Play writes a PLAY request, that means that we want to start the // Play must be called after SetupUDP() or SetupTCP(), and writes a PLAY request,
// stream. It then reads a Response. // that means that we want to start the stream. It then reads a Response.
func (c *ConnClient) Play(u *url.URL) (*Response, error) { func (c *ConnClient) Play(u *url.URL) (*Response, error) {
_, err := c.Do(&Request{ _, err := c.Do(&Request{
Method: PLAY, Method: PLAY,
@@ -382,41 +465,93 @@ func (c *ConnClient) Play(u *url.URL) (*Response, error) {
return nil, err return nil, err
} }
frame := &InterleavedFrame{ res, err := func() (*Response, error) {
Content: make([]byte, 0, 512*1024), frame := &InterleavedFrame{
} Content: make([]byte, 0, 512*1024),
// v4lrtspserver sends frames before the response.
// ignore them and wait for the response.
for {
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.ReadFrameOrResponse(frame)
if err != nil {
return nil, err
} }
if res, ok := recv.(*Response); ok { // v4lrtspserver sends frames before the response.
if res.StatusCode != StatusOK { // ignore them and wait for the response.
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage) for {
frame.Content = frame.Content[:cap(frame.Content)]
recv, err := c.ReadFrameOrResponse(frame)
if err != nil {
return nil, err
} }
return res, nil if res, ok := recv.(*Response); ok {
if res.StatusCode != StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
return res, nil
}
} }
}()
if err != nil {
return nil, err
} }
receiverReportTicker := time.NewTicker(clientReceiverReportPeriod)
go func() {
defer close(c.receiverReportDone)
defer receiverReportTicker.Stop()
for {
select {
case <-c.receiverReportTerminate:
return
case <-receiverReportTicker.C:
for trackId := range c.rtcpReceivers {
frame := c.rtcpReceivers[trackId].Report()
if *c.streamProtocol == StreamProtocolUdp {
c.rtcpListeners[trackId].pc.WriteTo(frame, &net.UDPAddr{
IP: c.nconn.RemoteAddr().(*net.TCPAddr).IP,
Zone: c.nconn.RemoteAddr().(*net.TCPAddr).Zone,
Port: c.rtcpListeners[trackId].publisherPort,
})
} else {
c.WriteFrame(&InterleavedFrame{
TrackId: trackId,
StreamType: StreamTypeRtcp,
Content: frame,
})
}
}
}
}
}()
return res, nil
} }
// LoopUDP is called after setupping UDP tracks and calling Play(); it keeps // LoopUDP must be called after SetupUDP() and Play(); it keeps
// the TCP connection open through keepalives, and returns when the TCP // the TCP connection open through keepalives, and returns when the TCP
// connection closes. // connection closes.
func (c *ConnClient) LoopUDP(u *url.URL) (error) { func (c *ConnClient) LoopUDP(u *url.URL) error {
keepaliveTicker := time.NewTicker(clientTcpKeepalivePeriod) keepaliveTicker := time.NewTicker(clientTcpKeepalivePeriod)
defer keepaliveTicker.Stop() defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientUdpCheckStreamPeriod)
defer checkStreamTicker.Stop()
for { for {
<- keepaliveTicker.C select {
_, err := c.Options(u) case <-keepaliveTicker.C:
if err != nil { _, err := c.Options(u)
return err if err != nil {
return err
}
case <-checkStreamTicker.C:
for trackId := range c.rtcpReceivers {
if time.Since(c.rtcpReceivers[trackId].LastFrameTime()) >= c.conf.ReadTimeout {
return fmt.Errorf("stream is dead")
}
}
} }
} }
} }

View File

@@ -4,9 +4,7 @@ package main
import ( import (
"fmt" "fmt"
"net"
"net/url" "net/url"
"time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
@@ -17,42 +15,42 @@ func main() {
panic(err) panic(err)
} }
conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second) conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close() defer conn.Close()
rconn := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn}) _, err = conn.Options(u)
_, err = rconn.Options(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
tracks, _, err := rconn.Describe(u) tracks, _, err := conn.Describe(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
for _, track := range tracks { for _, track := range tracks {
_, err := rconn.SetupTcp(u, track) _, err := conn.SetupTcp(u, track)
if err != nil { if err != nil {
panic(err) panic(err)
} }
} }
_, err = rconn.Play(u) _, err = conn.Play(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 512*1024)} frame := &gortsplib.InterleavedFrame{Content: make([]byte, 0, 512*1024)}
for { for {
frame.Content = frame.Content[:cap(frame.Content)] frame.Content = frame.Content[:cap(frame.Content)]
err := rconn.ReadFrame(frame)
err := conn.ReadFrame(frame)
if err != nil { if err != nil {
fmt.Println("connection is closed") fmt.Println("connection is closed (%s)", err)
break break
} }

View File

@@ -4,10 +4,8 @@ package main
import ( import (
"fmt" "fmt"
"net"
"net/url" "net/url"
"strconv" "sync"
"time"
"github.com/aler9/gortsplib" "github.com/aler9/gortsplib"
) )
@@ -18,84 +16,87 @@ func main() {
panic(err) panic(err)
} }
conn, err := net.DialTimeout("tcp", u.Host, 5*time.Second) conn, err := gortsplib.NewConnClient(gortsplib.ConnClientConf{Host: u.Host})
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer conn.Close() defer conn.Close()
rconn := gortsplib.NewConnClient(gortsplib.ConnClientConf{Conn: conn}) _, err = conn.Options(u)
_, err = rconn.Options(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
tracks, _, err := rconn.Describe(u) tracks, _, err := conn.Describe(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
var rtpListeners []net.PacketConn type trackListenerPair struct {
var rtcpListeners []net.PacketConn rtpl *gortsplib.ConnClientUdpListener
rtcpl *gortsplib.ConnClientUdpListener
}
var listeners []*trackListenerPair
for _, track := range tracks { for _, track := range tracks {
rtpPort := 9000 + track.Id*2 rtpl, rtcpl, _, err := conn.SetupUdp(u, track, 9000+track.Id*2, 9001+track.Id*2)
rtpl, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(rtpPort), 10))
if err != nil { if err != nil {
panic(err) panic(err)
} }
defer rtpl.Close()
rtpListeners = append(rtpListeners, rtpl)
rtcpPort := rtpPort + 1 listeners = append(listeners, &trackListenerPair{
rtcpl, err := net.ListenPacket("udp", ":"+strconv.FormatInt(int64(rtcpPort), 10)) rtpl: rtpl,
if err != nil { rtcpl: rtcpl,
panic(err) })
}
defer rtcpl.Close()
rtcpListeners = append(rtcpListeners, rtcpl)
_, _, _, err = rconn.SetupUdp(u, track, rtpPort, rtcpPort)
if err != nil {
panic(err)
}
} }
_, err = rconn.Play(u) _, err = conn.Play(u)
if err != nil { if err != nil {
panic(err) panic(err)
} }
// receive RTP packets var wg sync.WaitGroup
for trackId, l := range rtpListeners {
go func(trackId int, l net.PacketConn) { for trackId, lp := range listeners {
wg.Add(2)
// receive RTP packets
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
defer wg.Done()
buf := make([]byte, 2048) buf := make([]byte, 2048)
for { for {
n, _, err := l.ReadFrom(buf) n, err := l.Read(buf)
if err != nil { if err != nil {
break break
} }
fmt.Printf("packet from track %d, type RTP: %v\n", trackId, buf[:n]) fmt.Printf("packet from track %d, type RTP: %v\n", trackId, buf[:n])
} }
}(trackId, l) }(trackId, lp.rtpl)
}
// receive RTCP packets
go func(trackId int, l *gortsplib.ConnClientUdpListener) {
defer wg.Done()
// receive RTCP packets
for trackId, l := range rtcpListeners {
go func(trackId int, l net.PacketConn) {
buf := make([]byte, 2048) buf := make([]byte, 2048)
for { for {
n, _, err := l.ReadFrom(buf) n, err := l.Read(buf)
if err != nil { if err != nil {
break break
} }
fmt.Printf("packet from track %d, type RTCP: %v\n", trackId, buf[:n]) fmt.Printf("packet from track %d, type RTCP: %v\n", trackId, buf[:n])
} }
}(trackId, l) }(trackId, lp.rtcpl)
} }
panic(rconn.LoopUDP(u)) err = conn.LoopUDP(u)
fmt.Println("connection is closed (%s)", err)
for _, lp := range listeners {
lp.rtpl.Close()
lp.rtcpl.Close()
}
wg.Wait()
} }

View File

@@ -40,7 +40,7 @@ type rtcpReceiverEventTerminate struct{}
func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {} func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {}
// RtcpReceiver is an object that helps to build RTCP receiver reports from // RtcpReceiver is an object that helps building RTCP receiver reports, by parsing
// incoming frames. // incoming frames.
type RtcpReceiver struct { type RtcpReceiver struct {
events chan rtcpReceiverEvent events chan rtcpReceiverEvent

View File

@@ -15,11 +15,8 @@ const (
type StreamProtocol int type StreamProtocol int
const ( const (
// StreamProtocolInvalid is an invalid protocol // StreamProtocolUdp means that the stream uses the UDP protocol
StreamProtocolInvalid StreamProtocol = iota StreamProtocolUdp StreamProtocol = iota
// StreamProtocolUdp means that the stream uses the UDP\ protocol
StreamProtocolUdp
// StreamProtocolTcp means that the stream uses the TCP protocol // StreamProtocolTcp means that the stream uses the TCP protocol
StreamProtocolTcp StreamProtocolTcp
@@ -38,7 +35,7 @@ type StreamType int
const ( const (
// StreamTypeRtp means that the stream contains RTP packets // StreamTypeRtp means that the stream contains RTP packets
StreamTypeRtp StreamType = iota + 1 StreamTypeRtp StreamType = iota
// StreamTypeRtcp means that the stream contains RTCP packets // StreamTypeRtcp means that the stream contains RTCP packets
StreamTypeRtcp StreamTypeRtcp