server: slightly improve write performance

This commit is contained in:
aler9
2021-12-07 22:20:18 +01:00
parent 289c272469
commit bda1f3539c
2 changed files with 49 additions and 29 deletions

View File

@@ -138,9 +138,13 @@ func (s ServerSessionState) String() string {
// ServerSessionSetuppedTrack is a setupped track of a ServerSession. // ServerSessionSetuppedTrack is a setupped track of a ServerSession.
type ServerSessionSetuppedTrack struct { type ServerSessionSetuppedTrack struct {
tcpChannel int tcpChannel int
udpRTPPort int udpRTPPort int
udpRTCPPort int udpRTCPPort int
udpRTPAddr *net.UDPAddr
udpRTCPAddr *net.UDPAddr
tcpRTPFrame *base.InterleavedFrame
tcpRTCPFrame *base.InterleavedFrame
} }
// ServerSessionAnnouncedTrack is an announced track of a ServerSession. // ServerSessionAnnouncedTrack is an announced track of a ServerSession.
@@ -715,6 +719,18 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
sst.udpRTPPort = inTH.ClientPorts[0] sst.udpRTPPort = inTH.ClientPorts[0]
sst.udpRTCPPort = inTH.ClientPorts[1] sst.udpRTCPPort = inTH.ClientPorts[1]
sst.udpRTPAddr = &net.UDPAddr{
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: sst.udpRTPPort,
}
sst.udpRTCPAddr = &net.UDPAddr{
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: sst.udpRTCPPort,
}
th.Protocol = headers.TransportProtocolUDP th.Protocol = headers.TransportProtocolUDP
de := headers.TransportDeliveryUnicast de := headers.TransportDeliveryUnicast
th.Delivery = &de th.Delivery = &de
@@ -734,6 +750,14 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
default: // TCP default: // TCP
sst.tcpChannel = inTH.InterleavedIDs[0] sst.tcpChannel = inTH.InterleavedIDs[0]
sst.tcpRTPFrame = &base.InterleavedFrame{
Channel: sst.tcpChannel,
}
sst.tcpRTCPFrame = &base.InterleavedFrame{
Channel: sst.tcpChannel + 1,
}
if ss.tcpTracksByChannel == nil { if ss.tcpTracksByChannel == nil {
ss.tcpTracksByChannel = make(map[int]int) ss.tcpTracksByChannel = make(map[int]int)
} }
@@ -1125,36 +1149,28 @@ func (ss *ServerSession) runWriter() {
if *ss.setuppedTransport == TransportUDP { if *ss.setuppedTransport == TransportUDP {
writeFunc = func(trackID int, isRTP bool, payload []byte) { writeFunc = func(trackID int, isRTP bool, payload []byte) {
if isRTP { if isRTP {
ss.s.udpRTPListener.write(payload, &net.UDPAddr{ ss.s.udpRTPListener.write(payload, ss.setuppedTracks[trackID].udpRTPAddr)
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: ss.setuppedTracks[trackID].udpRTPPort,
})
} else { } else {
ss.s.udpRTCPListener.write(payload, &net.UDPAddr{ ss.s.udpRTCPListener.write(payload, ss.setuppedTracks[trackID].udpRTCPAddr)
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: ss.setuppedTracks[trackID].udpRTCPPort,
})
} }
} }
} else { } else {
writeFunc = func(trackID int, isRTP bool, payload []byte) { writeFunc = func(trackID int, isRTP bool, payload []byte) {
if isRTP { if isRTP {
f := ss.setuppedTracks[trackID].tcpRTPFrame
f.Payload = payload
ss.tcpConn.tcpWriteMutex.Lock() ss.tcpConn.tcpWriteMutex.Lock()
ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout)) ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout))
(&base.InterleavedFrame{ f.Write(ss.tcpConn.bw)
Channel: ss.setuppedTracks[trackID].tcpChannel,
Payload: payload,
}).Write(ss.tcpConn.bw)
ss.tcpConn.tcpWriteMutex.Unlock() ss.tcpConn.tcpWriteMutex.Unlock()
} else { } else {
f := ss.setuppedTracks[trackID].tcpRTCPFrame
f.Payload = payload
ss.tcpConn.tcpWriteMutex.Lock() ss.tcpConn.tcpWriteMutex.Lock()
ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout)) ss.tcpConn.nconn.SetWriteDeadline(time.Now().Add(ss.s.WriteTimeout))
(&base.InterleavedFrame{ f.Write(ss.tcpConn.bw)
Channel: ss.setuppedTracks[trackID].tcpChannel + 1,
Payload: payload,
}).Write(ss.tcpConn.bw)
ss.tcpConn.tcpWriteMutex.Unlock() ss.tcpConn.tcpWriteMutex.Unlock()
} }
} }

View File

@@ -57,6 +57,16 @@ func (h *multicastHandler) ip() net.IP {
func (h *multicastHandler) runWriter() { func (h *multicastHandler) runWriter() {
defer close(h.writerDone) defer close(h.writerDone)
rtpAddr := &net.UDPAddr{
IP: h.rtpl.ip(),
Port: h.rtpl.port(),
}
rtcpAddr := &net.UDPAddr{
IP: h.rtcpl.ip(),
Port: h.rtcpl.port(),
}
for { for {
tmp, ok := h.writeBuffer.Pull() tmp, ok := h.writeBuffer.Pull()
if !ok { if !ok {
@@ -65,15 +75,9 @@ func (h *multicastHandler) runWriter() {
data := tmp.(trackTypePayload) data := tmp.(trackTypePayload)
if data.isRTP { if data.isRTP {
h.rtpl.write(data.payload, &net.UDPAddr{ h.rtpl.write(data.payload, rtpAddr)
IP: h.rtpl.ip(),
Port: h.rtpl.port(),
})
} else { } else {
h.rtcpl.write(data.payload, &net.UDPAddr{ h.rtcpl.write(data.payload, rtcpAddr)
IP: h.rtcpl.ip(),
Port: h.rtcpl.port(),
})
} }
} }
} }