server: split WriteFrame into WritePacketRTP and WritePacketRTCP

This commit is contained in:
aler9
2021-10-30 16:15:04 +02:00
committed by Alessandro Ros
parent 472430f900
commit 2882bacdf2
10 changed files with 105 additions and 77 deletions

View File

@@ -131,7 +131,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx)
// if we are the publisher, route packet to readers // if we are the publisher, route packet to readers
if ctx.Session == sh.publisher { if ctx.Session == sh.publisher {
sh.stream.WriteFrame(ctx.TrackID, gortsplib.StreamTypeRTP, ctx.Payload) sh.stream.WritePacketRTP(ctx.TrackID, ctx.Payload)
} }
} }
@@ -142,7 +142,7 @@ func (sh *serverHandler) OnPacketRTCP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx
// if we are the publisher, route packet to readers // if we are the publisher, route packet to readers
if ctx.Session == sh.publisher { if ctx.Session == sh.publisher {
sh.stream.WriteFrame(ctx.TrackID, gortsplib.StreamTypeRTCP, ctx.Payload) sh.stream.WritePacketRTCP(ctx.TrackID, ctx.Payload)
} }
} }

View File

@@ -130,7 +130,7 @@ func (sh *serverHandler) OnPacketRTP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx)
// if we are the publisher, route packet to readers // if we are the publisher, route packet to readers
if ctx.Session == sh.publisher { if ctx.Session == sh.publisher {
sh.stream.WriteFrame(ctx.TrackID, gortsplib.StreamTypeRTP, ctx.Payload) sh.stream.WritePacketRTP(ctx.TrackID, ctx.Payload)
} }
} }
@@ -141,7 +141,7 @@ func (sh *serverHandler) OnPacketRTCP(ctx *gortsplib.ServerHandlerOnPacketRTPCtx
// if we are the publisher, route packet to readers // if we are the publisher, route packet to readers
if ctx.Session == sh.publisher { if ctx.Session == sh.publisher {
sh.stream.WriteFrame(ctx.TrackID, gortsplib.StreamTypeRTCP, ctx.Payload) sh.stream.WritePacketRTCP(ctx.TrackID, ctx.Payload)
} }
} }

View File

@@ -651,7 +651,7 @@ func TestServerPublish(t *testing.T) {
onPacketRTCP: func(ctx *ServerHandlerOnPacketRTCPCtx) { onPacketRTCP: func(ctx *ServerHandlerOnPacketRTCPCtx) {
require.Equal(t, 0, ctx.TrackID) require.Equal(t, 0, ctx.TrackID)
require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, ctx.Payload) require.Equal(t, []byte{0x05, 0x06, 0x07, 0x08}, ctx.Payload)
ctx.Session.WriteFrame(0, StreamTypeRTCP, []byte{0x09, 0x0A, 0x0B, 0x0C}) ctx.Session.WritePacketRTCP(0, []byte{0x09, 0x0A, 0x0B, 0x0C})
}, },
}, },
} }

View File

@@ -295,8 +295,8 @@ func TestServerRead(t *testing.T) {
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
stream.WriteFrame(0, StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) stream.WritePacketRTP(0, []byte{0x01, 0x02, 0x03, 0x04})
stream.WriteFrame(0, StreamTypeRTCP, []byte{0x05, 0x06, 0x07, 0x08}) stream.WritePacketRTCP(0, []byte{0x05, 0x06, 0x07, 0x08})
}() }()
return &base.Response{ return &base.Response{
@@ -673,7 +673,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
go func() { go func() {
defer close(writerDone) defer close(writerDone)
stream.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) stream.WritePacketRTP(0, []byte("\x00\x00\x00\x00"))
t := time.NewTicker(50 * time.Millisecond) t := time.NewTicker(50 * time.Millisecond)
defer t.Stop() defer t.Stop()
@@ -681,7 +681,7 @@ func TestServerReadTCPResponseBeforeFrames(t *testing.T) {
for { for {
select { select {
case <-t.C: case <-t.C:
stream.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) stream.WritePacketRTP(0, []byte("\x00\x00\x00\x00"))
case <-writerTerminate: case <-writerTerminate:
return return
} }
@@ -857,7 +857,7 @@ func TestServerReadPlayPausePlay(t *testing.T) {
for { for {
select { select {
case <-t.C: case <-t.C:
stream.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) stream.WritePacketRTP(0, []byte("\x00\x00\x00\x00"))
case <-writerTerminate: case <-writerTerminate:
return return
} }
@@ -973,7 +973,7 @@ func TestServerReadPlayPausePause(t *testing.T) {
for { for {
select { select {
case <-t.C: case <-t.C:
stream.WriteFrame(0, StreamTypeRTP, []byte("\x00\x00\x00\x00")) stream.WritePacketRTP(0, []byte("\x00\x00\x00\x00"))
case <-writerTerminate: case <-writerTerminate:
return return
} }
@@ -1477,8 +1477,8 @@ func TestServerReadNonSetuppedPath(t *testing.T) {
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
stream.WriteFrame(1, base.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) stream.WritePacketRTP(1, []byte{0x01, 0x02, 0x03, 0x04})
stream.WriteFrame(0, base.StreamTypeRTP, []byte{0x05, 0x06, 0x07, 0x08}) stream.WritePacketRTP(0, []byte{0x05, 0x06, 0x07, 0x08})
}() }()
return &base.Response{ return &base.Response{
@@ -1642,8 +1642,8 @@ func TestServerReadAdditionalInfos(t *testing.T) {
onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) { onPlay: func(ctx *ServerHandlerOnPlayCtx) (*base.Response, error) {
go func() { go func() {
time.Sleep(1 * time.Second) time.Sleep(1 * time.Second)
stream.WriteFrame(1, base.StreamTypeRTP, []byte{0x01, 0x02, 0x03, 0x04}) stream.WritePacketRTP(1, []byte{0x01, 0x02, 0x03, 0x04})
stream.WriteFrame(0, base.StreamTypeRTP, []byte{0x05, 0x06, 0x07, 0x08}) stream.WritePacketRTP(0, []byte{0x05, 0x06, 0x07, 0x08})
}() }()
return &base.Response{ return &base.Response{
@@ -1669,7 +1669,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal() }).Marshal()
require.NoError(t, err) require.NoError(t, err)
stream.WriteFrame(0, StreamTypeRTP, buf) stream.WritePacketRTP(0, buf)
rtpInfo, ssrcs := getInfos() rtpInfo, ssrcs := getInfos()
require.Equal(t, &headers.RTPInfo{ require.Equal(t, &headers.RTPInfo{
@@ -1705,7 +1705,7 @@ func TestServerReadAdditionalInfos(t *testing.T) {
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal() }).Marshal()
require.NoError(t, err) require.NoError(t, err)
stream.WriteFrame(1, StreamTypeRTP, buf) stream.WritePacketRTP(1, buf)
rtpInfo, ssrcs = getInfos() rtpInfo, ssrcs = getInfos()
require.Equal(t, &headers.RTPInfo{ require.Equal(t, &headers.RTPInfo{

View File

@@ -48,8 +48,8 @@ type testServerHandler struct {
onPlay func(*ServerHandlerOnPlayCtx) (*base.Response, error) onPlay func(*ServerHandlerOnPlayCtx) (*base.Response, error)
onRecord func(*ServerHandlerOnRecordCtx) (*base.Response, error) onRecord func(*ServerHandlerOnRecordCtx) (*base.Response, error)
onPause func(*ServerHandlerOnPauseCtx) (*base.Response, error) onPause func(*ServerHandlerOnPauseCtx) (*base.Response, error)
onPacketRTP func(*ServerHandlerOnPacketRTPCtx) onPacketRTP func(*ServerHandlerOnPacketRTPCtx)
onPacketRTCP func(*ServerHandlerOnPacketRTCPCtx) onPacketRTCP func(*ServerHandlerOnPacketRTCPCtx)
onSetParameter func(*ServerHandlerOnSetParameterCtx) (*base.Response, error) onSetParameter func(*ServerHandlerOnSetParameterCtx) (*base.Response, error)
onGetParameter func(*ServerHandlerOnGetParameterCtx) (*base.Response, error) onGetParameter func(*ServerHandlerOnGetParameterCtx) (*base.Response, error)
} }
@@ -404,7 +404,7 @@ func TestServerHighLevelPublishRead(t *testing.T) {
defer mutex.Unlock() defer mutex.Unlock()
if ctx.Session == publisher { if ctx.Session == publisher {
stream.WriteFrame(ctx.TrackID, StreamTypeRTP, ctx.Payload) stream.WritePacketRTP(ctx.TrackID, ctx.Payload)
} }
}, },
onPacketRTCP: func(ctx *ServerHandlerOnPacketRTCPCtx) { onPacketRTCP: func(ctx *ServerHandlerOnPacketRTCPCtx) {
@@ -412,7 +412,7 @@ func TestServerHighLevelPublishRead(t *testing.T) {
defer mutex.Unlock() defer mutex.Unlock()
if ctx.Session == publisher { if ctx.Session == publisher {
stream.WriteFrame(ctx.TrackID, StreamTypeRTCP, ctx.Payload) stream.WritePacketRTCP(ctx.TrackID, ctx.Payload)
} }
}, },
}, },

View File

@@ -162,17 +162,17 @@ func (sc *ServerConn) run() {
if streamType == StreamTypeRTP { if streamType == StreamTypeRTP {
if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok { if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTP); ok {
h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{
Session: sc.tcpSession, Session: sc.tcpSession,
TrackID: trackID, TrackID: trackID,
Payload: frame.Payload, Payload: frame.Payload,
}) })
} }
} else { } else {
if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTCP); ok { if h, ok := sc.s.Handler.(ServerHandlerOnPacketRTCP); ok {
h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{
Session: sc.tcpSession, Session: sc.tcpSession,
TrackID: trackID, TrackID: trackID,
Payload: frame.Payload, Payload: frame.Payload,
}) })
} }
} }

View File

@@ -181,9 +181,9 @@ type ServerHandlerOnSetParameter interface {
// ServerHandlerOnPacketRTPCtx is the context of a RTP packet. // ServerHandlerOnPacketRTPCtx is the context of a RTP packet.
type ServerHandlerOnPacketRTPCtx struct { type ServerHandlerOnPacketRTPCtx struct {
Session *ServerSession Session *ServerSession
TrackID int TrackID int
Payload []byte Payload []byte
} }
// ServerHandlerOnPacketRTP can be implemented by a ServerHandler. // ServerHandlerOnPacketRTP can be implemented by a ServerHandler.
@@ -193,9 +193,9 @@ type ServerHandlerOnPacketRTP interface {
// ServerHandlerOnPacketRTCPCtx is the context of a RTCP packet. // ServerHandlerOnPacketRTCPCtx is the context of a RTCP packet.
type ServerHandlerOnPacketRTCPCtx struct { type ServerHandlerOnPacketRTCPCtx struct {
Session *ServerSession Session *ServerSession
TrackID int TrackID int
Payload []byte Payload []byte
} }
// ServerHandlerOnPacketRTCP can be implemented by a ServerHandler. // ServerHandlerOnPacketRTCP can be implemented by a ServerHandler.

View File

@@ -339,7 +339,7 @@ func (ss *ServerSession) run() {
now := time.Now() now := time.Now()
for trackID, track := range ss.announcedTracks { for trackID, track := range ss.announcedTracks {
r := track.rtcpReceiver.Report(now) r := track.rtcpReceiver.Report(now)
ss.WriteFrame(trackID, StreamTypeRTCP, r) ss.WritePacketRTCP(trackID, r)
} }
case <-ss.ctx.Done(): case <-ss.ctx.Done():
@@ -863,7 +863,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false) sc.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, false)
// open the firewall by sending packets to the counterpart // open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTCP, ss.WritePacketRTCP(trackID,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
} }
@@ -905,7 +905,7 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
path, query := base.PathSplitQuery(pathAndQuery) path, query := base.PathSplitQuery(pathAndQuery)
// allow to use WriteFrame() before response // allow to use WritePacket*() before response
if *ss.setuppedTransport == TransportTCP { if *ss.setuppedTransport == TransportTCP {
ss.tcpConn = sc ss.tcpConn = sc
} }
@@ -938,9 +938,9 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
ss.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, true) ss.s.udpRTCPListener.addClient(ss.author.ip(), track.udpRTCPPort, ss, trackID, true)
// open the firewall by sending packets to the counterpart // open the firewall by sending packets to the counterpart
ss.WriteFrame(trackID, StreamTypeRTP, ss.WritePacketRTP(trackID,
[]byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}) []byte{0x80, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00})
ss.WriteFrame(trackID, StreamTypeRTCP, ss.WritePacketRTCP(trackID,
[]byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00}) []byte{0x80, 0xc9, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00})
} }
@@ -1065,8 +1065,8 @@ func (ss *ServerSession) handleRequest(sc *ServerConn, req *base.Request) (*base
}, liberrors.ErrServerUnhandledRequest{Req: req} }, liberrors.ErrServerUnhandledRequest{Req: req}
} }
// WriteFrame writes a frame to the session. // WritePacketRTP writes a RTP packet to the session.
func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload []byte) { func (ss *ServerSession) WritePacketRTP(trackID int, payload []byte) {
if _, ok := ss.setuppedTracks[trackID]; !ok { if _, ok := ss.setuppedTracks[trackID]; !ok {
return return
} }
@@ -1075,25 +1075,41 @@ func (ss *ServerSession) WriteFrame(trackID int, streamType StreamType, payload
case TransportUDP: case TransportUDP:
track := ss.setuppedTracks[trackID] track := ss.setuppedTracks[trackID]
if streamType == StreamTypeRTP { ss.s.udpRTPListener.write(payload, &net.UDPAddr{
ss.s.udpRTPListener.write(payload, &net.UDPAddr{ IP: ss.author.ip(),
IP: ss.author.ip(), Zone: ss.author.zone(),
Zone: ss.author.zone(), Port: track.udpRTPPort,
Port: track.udpRTPPort, })
})
} else {
ss.s.udpRTCPListener.write(payload, &net.UDPAddr{
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: track.udpRTCPPort,
})
}
case TransportTCP: case TransportTCP:
channel := ss.setuppedTracks[trackID].tcpChannel channel := ss.setuppedTracks[trackID].tcpChannel
if streamType == base.StreamTypeRTCP {
channel++ ss.tcpConn.tcpFrameWriteBuffer.Push(&base.InterleavedFrame{
} Channel: channel,
Payload: payload,
})
}
}
// WritePacketRTCP writes a RTCP packet to the session.
func (ss *ServerSession) WritePacketRTCP(trackID int, payload []byte) {
if _, ok := ss.setuppedTracks[trackID]; !ok {
return
}
switch *ss.setuppedTransport {
case TransportUDP:
track := ss.setuppedTracks[trackID]
ss.s.udpRTCPListener.write(payload, &net.UDPAddr{
IP: ss.author.ip(),
Zone: ss.author.zone(),
Port: track.udpRTCPPort,
})
case TransportTCP:
channel := ss.setuppedTracks[trackID].tcpChannel
channel++
ss.tcpConn.tcpFrameWriteBuffer.Push(&base.InterleavedFrame{ ss.tcpConn.tcpFrameWriteBuffer.Push(&base.InterleavedFrame{
Channel: channel, Channel: channel,

View File

@@ -221,9 +221,9 @@ func (st *ServerStream) readerSetInactive(ss *ServerSession) {
} }
} }
// WriteFrame writes a frame to all the readers of the stream. // WritePacketRTP writes a RTP packet to all the readers of the stream.
func (st *ServerStream) WriteFrame(trackID int, streamType StreamType, payload []byte) { func (st *ServerStream) WritePacketRTP(trackID int, payload []byte) {
if streamType == StreamTypeRTP && len(payload) >= 8 { if len(payload) >= 8 {
track := st.trackInfos[trackID] track := st.trackInfos[trackID]
sequenceNumber := binary.BigEndian.Uint16(payload[2:4]) sequenceNumber := binary.BigEndian.Uint16(payload[2:4])
@@ -242,21 +242,33 @@ func (st *ServerStream) WriteFrame(trackID int, streamType StreamType, payload [
// send unicast // send unicast
for r := range st.readersUnicast { for r := range st.readersUnicast {
r.WriteFrame(trackID, streamType, payload) r.WritePacketRTP(trackID, payload)
} }
// send multicast // send multicast
if st.multicastListeners != nil { if st.multicastListeners != nil {
if streamType == StreamTypeRTP { st.multicastListeners[trackID].rtpListener.write(payload, &net.UDPAddr{
st.multicastListeners[trackID].rtpListener.write(payload, &net.UDPAddr{ IP: st.multicastListeners[trackID].rtpListener.ip(),
IP: st.multicastListeners[trackID].rtpListener.ip(), Port: st.multicastListeners[trackID].rtpListener.port(),
Port: st.multicastListeners[trackID].rtpListener.port(), })
}) }
} else { }
st.multicastListeners[trackID].rtcpListener.write(payload, &net.UDPAddr{
IP: st.multicastListeners[trackID].rtpListener.ip(), // WritePacketRTCP writes a RTCP packet to all the readers of the stream.
Port: st.multicastListeners[trackID].rtcpListener.port(), func (st *ServerStream) WritePacketRTCP(trackID int, payload []byte) {
}) st.mutex.RLock()
} defer st.mutex.RUnlock()
// send unicast
for r := range st.readersUnicast {
r.WritePacketRTCP(trackID, payload)
}
// send multicast
if st.multicastListeners != nil {
st.multicastListeners[trackID].rtcpListener.write(payload, &net.UDPAddr{
IP: st.multicastListeners[trackID].rtcpListener.ip(),
Port: st.multicastListeners[trackID].rtcpListener.port(),
})
} }
} }

View File

@@ -211,17 +211,17 @@ func (u *serverUDPListener) run() {
if u.streamType == StreamTypeRTP { if u.streamType == StreamTypeRTP {
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok { if h, ok := u.s.Handler.(ServerHandlerOnPacketRTP); ok {
h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{ h.OnPacketRTP(&ServerHandlerOnPacketRTPCtx{
Session: clientData.ss, Session: clientData.ss,
TrackID: clientData.trackID, TrackID: clientData.trackID,
Payload: buf[:n], Payload: buf[:n],
}) })
} }
} else { } else {
if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok { if h, ok := u.s.Handler.(ServerHandlerOnPacketRTCP); ok {
h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{ h.OnPacketRTCP(&ServerHandlerOnPacketRTCPCtx{
Session: clientData.ss, Session: clientData.ss,
TrackID: clientData.trackID, TrackID: clientData.trackID,
Payload: buf[:n], Payload: buf[:n],
}) })
} }
} }