server: allow a TCP session to be used by another connection after PAUSE

This commit is contained in:
aler9
2021-10-27 19:25:08 +02:00
parent eb7bf2614b
commit 57dbac2f3a
2 changed files with 217 additions and 210 deletions

View File

@@ -413,6 +413,7 @@ func (sc *ServerConn) handleRequest(req *base.Request) (*base.Response, error) {
_, res, err := sc.handleRequestInSession(sxID, req, false)
if _, ok := err.(liberrors.ErrServerTCPFramesDisable); ok {
sc.tcpSession = nil
sc.tcpFrameSetEnabled = false
return res, nil
}

View File

@@ -505,25 +505,26 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Tracks: tracks,
})
if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStatePrePublish
ss.setuppedPath = &path
ss.setuppedQuery = &query
ss.setuppedBaseURL = req.URL
ss.announcedTracks = make([]ServerSessionAnnouncedTrack, len(tracks))
for trackID, track := range tracks {
clockRate, _ := track.ClockRate()
ss.announcedTracks[trackID] = ServerSessionAnnouncedTrack{
track: track,
rtcpReceiver: rtcpreceiver.New(nil, clockRate),
}
}
v := time.Now().Unix()
ss.udpLastFrameTime = &v
if res.StatusCode != base.StatusOK {
return res, err
}
ss.state = ServerSessionStatePrePublish
ss.setuppedPath = &path
ss.setuppedQuery = &query
ss.setuppedBaseURL = req.URL
ss.announcedTracks = make([]ServerSessionAnnouncedTrack, len(tracks))
for trackID, track := range tracks {
clockRate, _ := track.ClockRate()
ss.announcedTracks[trackID] = ServerSessionAnnouncedTrack{
track: track,
rtcpReceiver: rtcpreceiver.New(nil, clockRate),
}
}
v := time.Now().Unix()
ss.udpLastFrameTime = &v
return res, err
case base.Setup:
@@ -648,89 +649,6 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Transport: transport,
})
if res.StatusCode == base.StatusOK {
if ss.state == ServerSessionStateInitial {
err := stream.readerAdd(ss,
transport,
inTH.ClientPorts,
)
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, err
}
ss.state = ServerSessionStatePreRead
ss.setuppedPath = &path
ss.setuppedQuery = &query
ss.setuppedStream = stream
}
th := headers.Transport{}
if ss.state == ServerSessionStatePreRead {
ssrc := stream.ssrc(trackID)
if ssrc != 0 {
th.SSRC = &ssrc
}
}
ss.setuppedTransport = &transport
if res.Header == nil {
res.Header = make(base.Header)
}
sst := ServerSessionSetuppedTrack{}
switch transport {
case TransportUDP:
sst.udpRTPPort = inTH.ClientPorts[0]
sst.udpRTCPPort = inTH.ClientPorts[1]
th.Protocol = headers.TransportProtocolUDP
de := headers.TransportDeliveryUnicast
th.Delivery = &de
th.ClientPorts = inTH.ClientPorts
th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()}
case TransportUDPMulticast:
th.Protocol = headers.TransportProtocolUDP
de := headers.TransportDeliveryMulticast
th.Delivery = &de
v := uint(127)
th.TTL = &v
d := stream.multicastListeners[trackID].rtpListener.ip()
th.Destination = &d
th.Ports = &[2]int{
stream.multicastListeners[trackID].rtpListener.port(),
stream.multicastListeners[trackID].rtcpListener.port(),
}
default: // TCP
sst.tcpChannel = inTH.InterleavedIDs[0]
if ss.setuppedTracksByChannel == nil {
ss.setuppedTracksByChannel = make(map[int]int)
}
ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]] = trackID
th.Protocol = headers.TransportProtocolTCP
de := headers.TransportDeliveryUnicast
th.Delivery = &de
th.InterleavedIDs = inTH.InterleavedIDs
}
if ss.setuppedTracks == nil {
ss.setuppedTracks = make(map[int]ServerSessionSetuppedTrack)
}
ss.setuppedTracks[trackID] = sst
res.Header["Transport"] = th.Write()
}
// workaround to prevent a bug in rtspclientsink
// that makes impossible for the client to receive the response
// and send frames.
@@ -743,6 +661,91 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}
}
if res.StatusCode != base.StatusOK {
return res, err
}
if ss.state == ServerSessionStateInitial {
err := stream.readerAdd(ss,
transport,
inTH.ClientPorts,
)
if err != nil {
return &base.Response{
StatusCode: base.StatusBadRequest,
}, err
}
ss.state = ServerSessionStatePreRead
ss.setuppedPath = &path
ss.setuppedQuery = &query
ss.setuppedStream = stream
}
th := headers.Transport{}
if ss.state == ServerSessionStatePreRead {
ssrc := stream.ssrc(trackID)
if ssrc != 0 {
th.SSRC = &ssrc
}
}
ss.setuppedTransport = &transport
if res.Header == nil {
res.Header = make(base.Header)
}
sst := ServerSessionSetuppedTrack{}
switch transport {
case TransportUDP:
sst.udpRTPPort = inTH.ClientPorts[0]
sst.udpRTCPPort = inTH.ClientPorts[1]
th.Protocol = headers.TransportProtocolUDP
de := headers.TransportDeliveryUnicast
th.Delivery = &de
th.ClientPorts = inTH.ClientPorts
th.ServerPorts = &[2]int{sc.s.udpRTPListener.port(), sc.s.udpRTCPListener.port()}
case TransportUDPMulticast:
th.Protocol = headers.TransportProtocolUDP
de := headers.TransportDeliveryMulticast
th.Delivery = &de
v := uint(127)
th.TTL = &v
d := stream.multicastListeners[trackID].rtpListener.ip()
th.Destination = &d
th.Ports = &[2]int{
stream.multicastListeners[trackID].rtpListener.port(),
stream.multicastListeners[trackID].rtcpListener.port(),
}
default: // TCP
sst.tcpChannel = inTH.InterleavedIDs[0]
if ss.setuppedTracksByChannel == nil {
ss.setuppedTracksByChannel = make(map[int]int)
}
ss.setuppedTracksByChannel[inTH.InterleavedIDs[0]] = trackID
th.Protocol = headers.TransportProtocolTCP
de := headers.TransportDeliveryUnicast
th.Delivery = &de
th.InterleavedIDs = inTH.InterleavedIDs
}
if ss.setuppedTracks == nil {
ss.setuppedTracks = make(map[int]ServerSessionSetuppedTrack)
}
ss.setuppedTracks[trackID] = sst
res.Header["Transport"] = th.Write()
return res, err
case base.Play:
@@ -790,74 +793,76 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Query: query,
})
if ss.state != ServerSessionStateRead {
if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStateRead
if res.StatusCode != base.StatusOK {
return res, err
}
if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc
}
if ss.state == ServerSessionStateRead {
return res, err
}
// add RTP-Info
var trackIDs []int
for trackID := range ss.setuppedTracks {
trackIDs = append(trackIDs, trackID)
}
sort.Slice(trackIDs, func(a, b int) bool {
return trackIDs[a] < trackIDs[b]
})
var ri headers.RTPInfo
for _, trackID := range trackIDs {
ts := ss.setuppedStream.timestamp(trackID)
if ts == 0 {
continue
}
ss.state = ServerSessionStateRead
u := &base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10),
}
if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc
}
lsn := ss.setuppedStream.lastSequenceNumber(trackID)
ri = append(ri, &headers.RTPInfoEntry{
URL: u.String(),
SequenceNumber: &lsn,
Timestamp: &ts,
})
}
if len(ri) > 0 {
if res.Header == nil {
res.Header = make(base.Header)
}
res.Header["RTP-Info"] = ri.Write()
}
ss.setuppedStream.readerSetActive(ss)
switch *ss.setuppedTransport {
case TransportUDP:
for trackID, track := range ss.setuppedTracks {
// readers can send RTCP packets
sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false)
// open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTCP,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
return res, err
case TransportUDPMulticast:
default: // TCP
err = liberrors.ErrServerTCPFramesEnable{}
}
return res, err
// add RTP-Info
var trackIDs []int
for trackID := range ss.setuppedTracks {
trackIDs = append(trackIDs, trackID)
}
sort.Slice(trackIDs, func(a, b int) bool {
return trackIDs[a] < trackIDs[b]
})
var ri headers.RTPInfo
for _, trackID := range trackIDs {
ts := ss.setuppedStream.timestamp(trackID)
if ts == 0 {
continue
}
u := &base.URL{
Scheme: req.URL.Scheme,
User: req.URL.User,
Host: req.URL.Host,
Path: "/" + *ss.setuppedPath + "/trackID=" + strconv.FormatInt(int64(trackID), 10),
}
lsn := ss.setuppedStream.lastSequenceNumber(trackID)
ri = append(ri, &headers.RTPInfoEntry{
URL: u.String(),
SequenceNumber: &lsn,
Timestamp: &ts,
})
}
if len(ri) > 0 {
if res.Header == nil {
res.Header = make(base.Header)
}
res.Header["RTP-Info"] = ri.Write()
}
ss.setuppedStream.readerSetActive(ss)
switch *ss.setuppedTransport {
case TransportUDP:
for trackID, track := range ss.setuppedTracks {
// readers can send RTCP packets
sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false)
// open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTCP,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
return res, err
case TransportUDPMulticast:
default: // TCP
err = liberrors.ErrServerTCPFramesEnable{}
}
return res, err
@@ -909,32 +914,31 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Query: query,
})
if res.StatusCode == base.StatusOK {
ss.state = ServerSessionStatePublish
switch *ss.setuppedTransport {
case TransportUDP:
for trackID, track := range ss.setuppedTracks {
ss.s.udpRTPListener.addClient(ss.author.ip(), track.udpRTPPort, ss, trackID, true)
ss.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, true)
// open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTP,
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
ss.WriteFrame(trackID, StreamTypeRTCP,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
case TransportUDPMulticast:
default: // TCP
err = liberrors.ErrServerTCPFramesEnable{}
}
if res.StatusCode != base.StatusOK {
ss.tcpConn = nil
return res, err
}
ss.tcpConn = nil
ss.state = ServerSessionStatePublish
switch *ss.setuppedTransport {
case TransportUDP:
for trackID, track := range ss.setuppedTracks {
ss.s.udpRTPListener.addClient(ss.author.ip(), track.udpRTPPort, ss, trackID, true)
ss.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, true)
// open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTP,
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
ss.WriteFrame(trackID, StreamTypeRTCP,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
}
case TransportUDPMulticast:
default: // TCP
err = liberrors.ErrServerTCPFramesEnable{}
}
return res, err
@@ -971,38 +975,40 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
Query: query,
})
if res.StatusCode == base.StatusOK {
switch ss.state {
case ServerSessionStateRead:
ss.setuppedStream.readerSetInactive(ss)
if res.StatusCode != base.StatusOK {
return res, err
}
ss.state = ServerSessionStatePreRead
ss.tcpConn = nil
switch ss.state {
case ServerSessionStateRead:
ss.setuppedStream.readerSetInactive(ss)
switch *ss.setuppedTransport {
case TransportUDP:
ss.s.udpRTCPListener.removeClient(ss)
ss.state = ServerSessionStatePreRead
ss.tcpConn = nil
case TransportUDPMulticast:
switch *ss.setuppedTransport {
case TransportUDP:
ss.s.udpRTCPListener.removeClient(ss)
default: // TCP
err = liberrors.ErrServerTCPFramesDisable{}
}
case TransportUDPMulticast:
case ServerSessionStatePublish:
ss.state = ServerSessionStatePrePublish
ss.tcpConn = nil
default: // TCP
err = liberrors.ErrServerTCPFramesDisable{}
}
switch *ss.setuppedTransport {
case TransportUDP:
ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss)
case ServerSessionStatePublish:
ss.state = ServerSessionStatePrePublish
ss.tcpConn = nil
case TransportUDPMulticast:
switch *ss.setuppedTransport {
case TransportUDP:
ss.s.udpRTPListener.removeClient(ss)
ss.s.udpRTCPListener.removeClient(ss)
default: // TCP
err = liberrors.ErrServerTCPFramesDisable{}
}
case TransportUDPMulticast:
default: // TCP
err = liberrors.ErrServerTCPFramesDisable{}
}
}