server: allow reading RTCP packets from readers (#27)

This commit is contained in:
aler9
2021-03-13 16:54:52 +01:00
parent 73efb1437c
commit 9be2e5f4ed
2 changed files with 61 additions and 41 deletions

View File

@@ -261,6 +261,11 @@ func (sc *ServerConn) frameModeEnable() {
case ServerConnStatePlay:
if *sc.setuppedTracksProtocol == StreamProtocolTCP {
sc.doEnableFrames = true
} else {
// readers can send RTCP frames, they cannot sent RTP frames
for trackID, track := range sc.setuppedTracks {
sc.udpRTCPListener.addClient(sc.ip(), track.rtcpPort, sc, trackID, false)
}
}
case ServerConnStateRecord:
@@ -270,8 +275,8 @@ func (sc *ServerConn) frameModeEnable() {
} else {
for trackID, track := range sc.setuppedTracks {
sc.udpRTPListener.addPublisher(sc.ip(), track.rtpPort, trackID, sc)
sc.udpRTCPListener.addPublisher(sc.ip(), track.rtcpPort, trackID, sc)
sc.udpRTPListener.addClient(sc.ip(), track.rtpPort, sc, trackID, true)
sc.udpRTCPListener.addClient(sc.ip(), track.rtcpPort, sc, trackID, true)
// open the firewall by sending packets to the counterpart
sc.WriteFrame(trackID, StreamTypeRTP,
@@ -294,6 +299,11 @@ func (sc *ServerConn) frameModeDisable() {
sc.framesEnabled = false
sc.frameRingBuffer.Close()
<-sc.backgroundWriteDone
} else {
for _, track := range sc.setuppedTracks {
sc.udpRTCPListener.removeClient(sc.ip(), track.rtcpPort)
}
}
case ServerConnStateRecord:
@@ -310,8 +320,8 @@ func (sc *ServerConn) frameModeDisable() {
} else {
for _, track := range sc.setuppedTracks {
sc.udpRTPListener.removePublisher(sc.ip(), track.rtpPort)
sc.udpRTCPListener.removePublisher(sc.ip(), track.rtcpPort)
sc.udpRTPListener.removeClient(sc.ip(), track.rtpPort)
sc.udpRTCPListener.removeClient(sc.ip(), track.rtcpPort)
}
}
}
@@ -600,6 +610,9 @@ func (sc *ServerConn) handleRequest(req *base.Request) (*base.Response, error) {
rtcpPort: th.ClientPorts[1],
}
if res.Header == nil {
res.Header = make(base.Header)
}
res.Header["Transport"] = headers.Transport{
Protocol: StreamProtocolUDP,
Delivery: func() *base.StreamDelivery {
@@ -613,6 +626,9 @@ func (sc *ServerConn) handleRequest(req *base.Request) (*base.Response, error) {
} else {
sc.setuppedTracks[trackID] = ServerConnSetuppedTrack{}
if res.Header == nil {
res.Header = make(base.Header)
}
res.Header["Transport"] = headers.Transport{
Protocol: StreamProtocolTCP,
InterleavedIds: th.InterleavedIds,

View File

@@ -19,17 +19,18 @@ type bufAddrPair struct {
addr *net.UDPAddr
}
type publisherData struct {
publisher *ServerConn
trackID int
type clientData struct {
sc *ServerConn
trackID int
isPublishing bool
}
type publisherAddr struct {
type clientAddr struct {
ip [net.IPv6len]byte // use a fixed-size array to enable the equality operator
port int
}
func (p *publisherAddr) fill(ip net.IP, port int) {
func (p *clientAddr) fill(ip net.IP, port int) {
p.port = port
if len(ip) == net.IPv4len {
@@ -41,13 +42,13 @@ func (p *publisherAddr) fill(ip net.IP, port int) {
}
type serverUDPListener struct {
pc *net.UDPConn
streamType StreamType
writeTimeout time.Duration
readBuf *multibuffer.MultiBuffer
publishersMutex sync.RWMutex
publishers map[publisherAddr]*publisherData
ringBuffer *ringbuffer.RingBuffer
pc *net.UDPConn
streamType StreamType
writeTimeout time.Duration
readBuf *multibuffer.MultiBuffer
clientsMutex sync.RWMutex
clients map[clientAddr]*clientData
ringBuffer *ringbuffer.RingBuffer
// out
done chan struct{}
@@ -70,9 +71,9 @@ func newServerUDPListener(
}
s := &serverUDPListener{
pc: pc,
publishers: make(map[publisherAddr]*publisherData),
done: make(chan struct{}),
pc: pc,
clients: make(map[clientAddr]*clientData),
done: make(chan struct{}),
}
s.streamType = streamType
@@ -108,21 +109,23 @@ func (s *serverUDPListener) run() {
}
func() {
s.publishersMutex.RLock()
defer s.publishersMutex.RUnlock()
s.clientsMutex.RLock()
defer s.clientsMutex.RUnlock()
// find publisher data
var pubAddr publisherAddr
pubAddr.fill(addr.IP, addr.Port)
pubData, ok := s.publishers[pubAddr]
var clientAddr clientAddr
clientAddr.fill(addr.IP, addr.Port)
clientData, ok := s.clients[clientAddr]
if !ok {
return
}
now := time.Now()
atomic.StoreInt64(pubData.publisher.announcedTracks[pubData.trackID].udpLastFrameTime, now.Unix())
pubData.publisher.announcedTracks[pubData.trackID].rtcpReceiver.ProcessFrame(now, s.streamType, buf[:n])
pubData.publisher.readHandlers.OnFrame(pubData.trackID, s.streamType, buf[:n])
if clientData.isPublishing {
now := time.Now()
atomic.StoreInt64(clientData.sc.announcedTracks[clientData.trackID].udpLastFrameTime, now.Unix())
clientData.sc.announcedTracks[clientData.trackID].rtcpReceiver.ProcessFrame(now, s.streamType, buf[:n])
}
clientData.sc.readHandlers.OnFrame(clientData.trackID, s.streamType, buf[:n])
}()
}
}()
@@ -154,25 +157,26 @@ func (s *serverUDPListener) write(buf []byte, addr *net.UDPAddr) {
s.ringBuffer.Push(bufAddrPair{buf, addr})
}
func (s *serverUDPListener) addPublisher(ip net.IP, port int, trackID int, sc *ServerConn) {
s.publishersMutex.Lock()
defer s.publishersMutex.Unlock()
func (s *serverUDPListener) addClient(ip net.IP, port int, sc *ServerConn, trackID int, isPublishing bool) {
s.clientsMutex.Lock()
defer s.clientsMutex.Unlock()
var addr publisherAddr
var addr clientAddr
addr.fill(ip, port)
s.publishers[addr] = &publisherData{
publisher: sc,
trackID: trackID,
s.clients[addr] = &clientData{
sc: sc,
trackID: trackID,
isPublishing: isPublishing,
}
}
func (s *serverUDPListener) removePublisher(ip net.IP, port int) {
s.publishersMutex.Lock()
defer s.publishersMutex.Unlock()
func (s *serverUDPListener) removeClient(ip net.IP, port int) {
s.clientsMutex.Lock()
defer s.clientsMutex.Unlock()
var addr publisherAddr
var addr clientAddr
addr.fill(ip, port)
delete(s.publishers, addr)
delete(s.clients, addr)
}