new client api

This commit is contained in:
aler9
2020-11-15 17:21:39 +01:00
parent b86fdca8b4
commit 65f6afcd9f
13 changed files with 927 additions and 611 deletions

View File

@@ -34,7 +34,7 @@ const (
clientUDPFrameReadBufferSize = 2048
)
type connClientState int
type connClientState int32
const (
connClientStateInitial connClientState = iota
@@ -42,8 +42,34 @@ const (
connClientStatePlay
connClientStatePreRecord
connClientStateRecord
connClientStateUDPError
)
func (s connClientState) String() string {
switch s {
case connClientStateInitial:
return "initial"
case connClientStatePrePlay:
return "prePlay"
case connClientStatePlay:
return "play"
case connClientStatePreRecord:
return "preRecord"
case connClientStateRecord:
return "record"
case connClientStateUDPError:
return "udpError"
}
return "uknown"
}
func (s *connClientState) load() connClientState {
return connClientState(atomic.LoadInt32((*int32)(s)))
}
func (s *connClientState) store(v connClientState) {
atomic.StoreInt32((*int32)(s), int32(v))
}
// ConnClient is a client-side RTSP connection.
type ConnClient struct {
d Dialer
@@ -53,7 +79,7 @@ type ConnClient struct {
session string
cseq int
auth *auth.Client
state connClientState
state *connClientState
streamUrl *base.URL
streamProtocol *StreamProtocol
tracks Tracks
@@ -64,18 +90,23 @@ type ConnClient struct {
response *base.Response
frame *base.InterleavedFrame
tcpFrameBuffer *multibuffer.MultiBuffer
readFrameFunc func() (int, StreamType, []byte, error)
writeFrameFunc func(trackId int, streamType StreamType, content []byte) error
getParameterSupported bool
backgroundUDPError error
reportWriterTerminate chan struct{}
reportWriterDone chan struct{}
backgroundTerminate chan struct{}
backgroundDone chan struct{}
udpFrame chan base.InterleavedFrame
}
// Close closes all the ConnClient resources.
func (c *ConnClient) Close() error {
if c.state == connClientStatePlay {
close(c.reportWriterTerminate)
<-c.reportWriterDone
s := c.state.load()
if s == connClientStatePlay {
close(c.backgroundTerminate)
<-c.backgroundDone
c.Do(&base.Request{
Method: base.TEARDOWN,
@@ -84,7 +115,14 @@ func (c *ConnClient) Close() error {
})
}
err := c.nconn.Close()
if s == connClientStatePlay {
if *c.streamProtocol == StreamProtocolUDP {
go func() {
for range c.udpFrame {
}
}()
}
}
for _, l := range c.udpRtpListeners {
l.close()
@@ -94,16 +132,28 @@ func (c *ConnClient) Close() error {
l.close()
}
if s == connClientStatePlay {
if *c.streamProtocol == StreamProtocolUDP {
close(c.udpFrame)
}
}
err := c.nconn.Close()
return err
}
func (c *ConnClient) checkState(allowed map[connClientState]struct{}) error {
if _, ok := allowed[c.state]; ok {
return nil
func (c *ConnClient) checkState(allowed map[connClientState]struct{}) (connClientState, error) {
s := c.state.load()
if _, ok := allowed[s]; ok {
return s, nil
}
return fmt.Errorf("client must be in state %v, while is in state %v",
allowed, c.state)
var allowedList []connClientState
for s := range allowed {
allowedList = append(allowedList, s)
}
return 0, fmt.Errorf("client must be in state %v, while is in state %v",
allowedList, s)
}
// NetConn returns the underlying net.Conn.
@@ -123,68 +173,8 @@ func (c *ConnClient) readFrameTCPOrResponse() (interface{}, error) {
return base.ReadInterleavedFrameOrResponse(c.frame, c.response, c.br)
}
// ReadFrameUDP reads an UDP frame.
func (c *ConnClient) ReadFrameUDP(trackId int, streamType StreamType) ([]byte, error) {
var buf []byte
var err error
if streamType == StreamTypeRtp {
buf, err = c.udpRtpListeners[trackId].read()
} else {
buf, err = c.udpRtcpListeners[trackId].read()
}
if err != nil {
return nil, err
}
atomic.StoreInt64(c.udpLastFrameTimes[trackId], time.Now().Unix())
c.rtcpReceivers[trackId].OnFrame(streamType, buf)
return buf, nil
}
// ReadFrameTCP reads an InterleavedFrame.
// This can't be used when publishing.
func (c *ConnClient) ReadFrameTCP() (int, StreamType, []byte, error) {
c.frame.Content = c.tcpFrameBuffer.Next()
c.nconn.SetReadDeadline(time.Now().Add(c.d.ReadTimeout))
err := c.frame.Read(c.br)
if err != nil {
return 0, 0, nil, err
}
c.rtcpReceivers[c.frame.TrackId].OnFrame(c.frame.StreamType, c.frame.Content)
return c.frame.TrackId, c.frame.StreamType, c.frame.Content, nil
}
func (c *ConnClient) writeFrameUDP(trackId int, streamType StreamType, content []byte) error {
if streamType == StreamTypeRtp {
return c.udpRtpListeners[trackId].write(content)
}
return c.udpRtcpListeners[trackId].write(content)
}
func (c *ConnClient) writeFrameTCP(trackId int, streamType StreamType, content []byte) error {
frame := base.InterleavedFrame{
TrackId: trackId,
StreamType: streamType,
Content: content,
}
c.nconn.SetWriteDeadline(time.Now().Add(c.d.WriteTimeout))
return frame.Write(c.bw)
}
// WriteFrame writes a frame.
// This can be used only after Record().
func (c *ConnClient) WriteFrame(trackId int, streamType StreamType, content []byte) error {
return c.writeFrameFunc(trackId, streamType, content)
}
// Do writes a Request and reads a Response.
// Interleaved frames sent before the response are ignored.
// Interleaved frames received before the response are ignored.
func (c *ConnClient) Do(req *base.Request) (*base.Response, error) {
if req.Header == nil {
req.Header = make(base.Header)
@@ -262,7 +252,7 @@ func (c *ConnClient) Do(req *base.Request) (*base.Response, error) {
// Since this method is not implemented by every RTSP server, the function
// does not fail if the returned code is StatusNotFound.
func (c *ConnClient) Options(u *base.URL) (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
_, err := c.checkState(map[connClientState]struct{}{
connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
@@ -302,7 +292,7 @@ func (c *ConnClient) Options(u *base.URL) (*base.Response, error) {
// Describe writes a DESCRIBE request and reads a Response.
func (c *ConnClient) Describe(u *base.URL) (Tracks, *base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
_, err := c.checkState(map[connClientState]struct{}{
connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
@@ -400,7 +390,7 @@ func (c *ConnClient) urlForTrack(baseUrl *base.URL, mode headers.TransportMode,
// if rtpPort and rtcpPort are zero, they are chosen automatically.
func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.StreamProtocol,
track *Track, rtpPort int, rtcpPort int) (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
s, err := c.checkState(map[connClientState]struct{}{
connClientStateInitial: {},
connClientStatePrePlay: {},
connClientStatePreRecord: {},
@@ -409,12 +399,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
return nil, err
}
if mode == headers.TransportModeRecord && c.state != connClientStatePreRecord {
if mode == headers.TransportModeRecord && s != connClientStatePreRecord {
return nil, fmt.Errorf("cannot read and publish at the same time")
}
if mode == headers.TransportModePlay && c.state != connClientStatePrePlay &&
c.state != connClientStateInitial {
if mode == headers.TransportModePlay && s != connClientStatePrePlay &&
s != connClientStateInitial {
return nil, fmt.Errorf("cannot read and publish at the same time")
}
@@ -451,12 +441,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
var err error
rtpListener, rtcpListener, err = func() (*connClientUDPListener, *connClientUDPListener, error) {
if rtpPort != 0 {
rtpListener, err := newConnClientUDPListener(c.d, rtpPort)
rtpListener, err := newConnClientUDPListener(c, rtpPort)
if err != nil {
return nil, nil, err
}
rtcpListener, err := newConnClientUDPListener(c.d, rtcpPort)
rtcpListener, err := newConnClientUDPListener(c, rtcpPort)
if err != nil {
rtpListener.close()
return nil, nil, err
@@ -471,12 +461,12 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
rtpPort = (rand.Intn((65535-10000)/2) * 2) + 10000
rtcpPort = rtpPort + 1
rtpListener, err := newConnClientUDPListener(c.d, rtpPort)
rtpListener, err := newConnClientUDPListener(c, rtpPort)
if err != nil {
continue
}
rtcpListener, err := newConnClientUDPListener(c.d, rtcpPort)
rtcpListener, err := newConnClientUDPListener(c, rtcpPort)
if err != nil {
rtpListener.close()
continue
@@ -545,8 +535,7 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
}
c.streamUrl = u
streamProtocol := proto
c.streamProtocol = &streamProtocol
c.streamProtocol = &proto
c.tracks = append(c.tracks, track)
@@ -563,241 +552,31 @@ func (c *ConnClient) Setup(u *base.URL, mode headers.TransportMode, proto base.S
rtpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone
rtpListener.remotePort = (*th.ServerPorts)[0]
rtpListener.trackId = track.Id
rtpListener.streamType = StreamTypeRtp
c.udpRtpListeners[track.Id] = rtpListener
rtcpListener.remoteIp = c.nconn.RemoteAddr().(*net.TCPAddr).IP
rtcpListener.remoteZone = c.nconn.RemoteAddr().(*net.TCPAddr).Zone
rtcpListener.remotePort = (*th.ServerPorts)[1]
rtcpListener.trackId = track.Id
rtcpListener.streamType = StreamTypeRtcp
c.udpRtcpListeners[track.Id] = rtcpListener
}
if mode == headers.TransportModePlay {
c.state = connClientStatePrePlay
*c.state = connClientStatePrePlay
} else {
c.state = connClientStatePreRecord
*c.state = connClientStatePreRecord
}
return res, nil
}
// Play writes a PLAY request and reads a Response.
// This can be called only after Setup().
func (c *ConnClient) Play() (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
connClientStatePrePlay: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.PLAY,
URL: c.streamUrl,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
if *c.streamProtocol == StreamProtocolUDP {
c.writeFrameFunc = c.writeFrameUDP
} else {
c.writeFrameFunc = c.writeFrameTCP
}
c.state = connClientStatePlay
// open the firewall by sending packets to the counterpart
if *c.streamProtocol == StreamProtocolUDP {
for trackId := range c.udpRtpListeners {
c.WriteFrame(trackId, StreamTypeRtp,
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
c.WriteFrame(trackId, StreamTypeRtcp,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
}
c.reportWriterTerminate = make(chan struct{})
c.reportWriterDone = make(chan struct{})
go func() {
defer close(c.reportWriterDone)
reportWriterTicker := time.NewTicker(clientReceiverReportPeriod)
defer reportWriterTicker.Stop()
for {
select {
case <-c.reportWriterTerminate:
return
case <-reportWriterTicker.C:
for trackId := range c.rtcpReceivers {
frame := c.rtcpReceivers[trackId].Report()
c.WriteFrame(trackId, StreamTypeRtcp, frame)
}
}
}
}()
return res, nil
}
// Announce writes an ANNOUNCE request and reads a Response.
func (c *ConnClient) Announce(u *base.URL, tracks Tracks) (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
connClientStateInitial: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.ANNOUNCE,
URL: u,
Header: base.Header{
"Content-Type": base.HeaderValue{"application/sdp"},
},
Content: tracks.Write(),
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
c.streamUrl = u
c.state = connClientStatePreRecord
return res, nil
}
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ConnClient) Record() (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
connClientStatePreRecord: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.RECORD,
URL: c.streamUrl,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
if *c.streamProtocol == StreamProtocolUDP {
c.writeFrameFunc = c.writeFrameUDP
} else {
c.writeFrameFunc = c.writeFrameTCP
}
c.state = connClientStateRecord
return nil, nil
}
// LoopUDP must be called after Play() or Record(); it keeps
// the TCP connection open with keepalives, and returns when the TCP
// connection closes.
func (c *ConnClient) LoopUDP() error {
err := c.checkState(map[connClientState]struct{}{
connClientStatePlay: {},
connClientStateRecord: {},
})
if err != nil {
return err
}
if *c.streamProtocol != StreamProtocolUDP {
return fmt.Errorf("stream protocol is not UDP")
}
if c.state == connClientStatePlay {
readDone := make(chan error)
go func() {
for {
c.nconn.SetReadDeadline(time.Now().Add(clientUDPKeepalivePeriod + c.d.ReadTimeout))
var res base.Response
err := res.Read(c.br)
if err != nil {
readDone <- err
return
}
}
}()
keepaliveTicker := time.NewTicker(clientUDPKeepalivePeriod)
defer keepaliveTicker.Stop()
checkStreamTicker := time.NewTicker(clientUDPCheckStreamPeriod)
defer checkStreamTicker.Stop()
for {
select {
case err := <-readDone:
c.nconn.Close()
return err
case <-keepaliveTicker.C:
_, err := c.Do(&base.Request{
Method: func() base.Method {
// the vlc integrated rtsp server requires GET_PARAMETER
if c.getParameterSupported {
return base.GET_PARAMETER
}
return base.OPTIONS
}(),
// use the stream path, otherwise some cameras do not reply
URL: c.streamUrl,
SkipResponse: true,
})
if err != nil {
c.nconn.Close()
<-readDone
return err
}
case <-checkStreamTicker.C:
now := time.Now()
for _, lastUnix := range c.udpLastFrameTimes {
last := time.Unix(atomic.LoadInt64(lastUnix), 0)
if now.Sub(last) >= c.d.ReadTimeout {
c.nconn.Close()
<-readDone
return fmt.Errorf("no packets received recently (maybe there's a firewall/NAT in between)")
}
}
}
}
}
// connClientStateRecord
c.nconn.SetReadDeadline(time.Time{}) // disable deadline
var res base.Response
return res.Read(c.br)
}
// Pause writes a PAUSE request and reads a Response.
// This can be called only after Play() or Record().
func (c *ConnClient) Pause() (*base.Response, error) {
err := c.checkState(map[connClientState]struct{}{
s, err := c.checkState(map[connClientState]struct{}{
connClientStatePlay: {},
connClientStateRecord: {},
})
@@ -805,9 +584,24 @@ func (c *ConnClient) Pause() (*base.Response, error) {
return nil, err
}
if c.state == connClientStatePlay {
close(c.reportWriterTerminate)
<-c.reportWriterDone
close(c.backgroundTerminate)
<-c.backgroundDone
if s == connClientStatePlay {
if *c.streamProtocol == StreamProtocolUDP {
ch := c.udpFrame
go func() {
for range ch {
}
}()
for trackId := range c.udpRtpListeners {
c.udpRtpListeners[trackId].stop()
c.udpRtcpListeners[trackId].stop()
}
close(ch)
}
}
res, err := c.Do(&base.Request{
@@ -822,10 +616,11 @@ func (c *ConnClient) Pause() (*base.Response, error) {
return nil, fmt.Errorf("bad status code: %d (%s)", res.StatusCode, res.StatusMessage)
}
if c.state == connClientStatePlay {
c.state = connClientStatePrePlay
} else {
c.state = connClientStatePreRecord
switch s {
case connClientStatePlay:
c.state.store(connClientStatePrePlay)
case connClientStateRecord:
c.state.store(connClientStatePreRecord)
}
return res, nil