client: cleanup

This commit is contained in:
aler9
2021-03-25 22:10:14 +01:00
parent 03137958c6
commit b864028ea8
2 changed files with 65 additions and 81 deletions

View File

@@ -54,46 +54,7 @@ func (c *ClientConn) Announce(u *base.URL, tracks Tracks) (*base.Response, error
return res, nil
}
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ClientConn) Record() (*base.Response, error) {
err := c.checkState(map[clientConnState]struct{}{
clientConnStatePreRecord: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.Record,
URL: c.streamURL,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientWrongStatusCode{
Code: res.StatusCode, Message: res.StatusMessage}
}
c.state = clientConnStateRecord
c.publishOpen = true
c.backgroundTerminate = make(chan struct{})
c.backgroundDone = make(chan struct{})
if *c.streamProtocol == StreamProtocolUDP {
go c.backgroundRecordUDP()
} else {
go c.backgroundRecordTCP()
}
return nil, nil
}
func (c *ClientConn) backgroundRecordUDP() {
defer close(c.backgroundDone)
defer func() {
c.publishWriteMutex.Lock()
defer c.publishWriteMutex.Unlock()
@@ -145,8 +106,6 @@ func (c *ClientConn) backgroundRecordUDP() {
}
func (c *ClientConn) backgroundRecordTCP() {
defer close(c.backgroundDone)
defer func() {
c.publishWriteMutex.Lock()
defer c.publishWriteMutex.Unlock()
@@ -181,6 +140,47 @@ func (c *ClientConn) backgroundRecordTCP() {
}
}
// Record writes a RECORD request and reads a Response.
// This can be called only after Announce() and Setup().
func (c *ClientConn) Record() (*base.Response, error) {
err := c.checkState(map[clientConnState]struct{}{
clientConnStatePreRecord: {},
})
if err != nil {
return nil, err
}
res, err := c.Do(&base.Request{
Method: base.Record,
URL: c.streamURL,
})
if err != nil {
return nil, err
}
if res.StatusCode != base.StatusOK {
return nil, liberrors.ErrClientWrongStatusCode{
Code: res.StatusCode, Message: res.StatusMessage}
}
c.state = clientConnStateRecord
c.publishOpen = true
c.backgroundTerminate = make(chan struct{})
c.backgroundDone = make(chan struct{})
go func() {
defer close(c.backgroundDone)
if *c.streamProtocol == StreamProtocolUDP {
c.backgroundRecordUDP()
} else {
c.backgroundRecordTCP()
}
}()
return nil, nil
}
// WriteFrame writes a frame.
// This can be called only after Record().
func (c *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []byte) error {

View File

@@ -50,20 +50,7 @@ func (c *ClientConn) RTPInfo() *headers.RTPInfo {
return c.rtpInfo
}
func (c *ClientConn) backgroundPlayUDP(done chan error) {
defer close(c.backgroundDone)
var returnError error
defer func() {
for trackID := range c.udpRTPListeners {
c.udpRTPListeners[trackID].stop()
c.udpRTCPListeners[trackID].stop()
}
done <- returnError
}()
func (c *ClientConn) backgroundPlayUDP() error {
// open the firewall by sending packets to the counterpart
for trackID := range c.udpRTPListeners {
c.udpRTPListeners[trackID].write(
@@ -78,6 +65,13 @@ func (c *ClientConn) backgroundPlayUDP(done chan error) {
c.udpRTCPListeners[trackID].start()
}
defer func() {
for trackID := range c.udpRTPListeners {
c.udpRTPListeners[trackID].stop()
c.udpRTCPListeners[trackID].stop()
}
}()
// disable deadline
c.nconn.SetReadDeadline(time.Time{})
@@ -107,8 +101,7 @@ func (c *ClientConn) backgroundPlayUDP(done chan error) {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = fmt.Errorf("terminated")
return
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
@@ -133,8 +126,7 @@ func (c *ClientConn) backgroundPlayUDP(done chan error) {
if err != nil {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = err
return
return err
}
case <-checkStreamTicker.C:
@@ -146,27 +138,17 @@ func (c *ClientConn) backgroundPlayUDP(done chan error) {
if now.Sub(last) >= c.conf.ReadTimeout {
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = liberrors.ErrClientNoUDPPacketsRecently{}
return
return liberrors.ErrClientNoUDPPacketsRecently{}
}
}
case err := <-readerDone:
returnError = err
return
return err
}
}
}
func (c *ClientConn) backgroundPlayTCP(done chan error) {
defer close(c.backgroundDone)
var returnError error
defer func() {
done <- returnError
}()
func (c *ClientConn) backgroundPlayTCP() error {
readerDone := make(chan error)
go func() {
for {
@@ -201,8 +183,7 @@ func (c *ClientConn) backgroundPlayTCP(done chan error) {
case <-c.backgroundTerminate:
c.nconn.SetReadDeadline(time.Now())
<-readerDone
returnError = fmt.Errorf("terminated")
return
return fmt.Errorf("terminated")
case <-reportTicker.C:
now := time.Now()
@@ -218,8 +199,7 @@ func (c *ClientConn) backgroundPlayTCP(done chan error) {
}
case err := <-readerDone:
returnError = err
return
return err
}
}
}
@@ -244,11 +224,15 @@ func (c *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) chan erro
c.backgroundTerminate = make(chan struct{})
c.backgroundDone = make(chan struct{})
if *c.streamProtocol == StreamProtocolUDP {
go c.backgroundPlayUDP(done)
} else {
go c.backgroundPlayTCP(done)
}
go func() {
defer close(c.backgroundDone)
if *c.streamProtocol == StreamProtocolUDP {
done <- c.backgroundPlayUDP()
} else {
done <- c.backgroundPlayTCP()
}
}()
return done
}