client: split WriteFrame into WritePacketRTP and WritePacketRTCP

This commit is contained in:
aler9
2021-10-30 16:43:46 +02:00
committed by Alessandro Ros
parent e8e2059b0d
commit 7ebbdbf093
8 changed files with 58 additions and 34 deletions

View File

@@ -188,7 +188,7 @@ func TestClientPublishSerial(t *testing.T) {
}) })
}() }()
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err) require.NoError(t, err)
@@ -196,7 +196,7 @@ func TestClientPublishSerial(t *testing.T) {
conn.Close() conn.Close()
<-done <-done
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.Error(t, err) require.Error(t, err)
}) })
@@ -328,7 +328,7 @@ func TestClientPublishParallel(t *testing.T) {
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
err := conn.WriteFrame(0, StreamTypeRTP, err := conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
if err != nil { if err != nil {
return return
@@ -475,21 +475,21 @@ func TestClientPublishPauseSerial(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err) require.NoError(t, err)
_, err = conn.Pause() _, err = conn.Pause()
require.NoError(t, err) require.NoError(t, err)
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.Error(t, err) require.Error(t, err)
_, err = conn.Record() _, err = conn.Record()
require.NoError(t, err) require.NoError(t, err)
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err) require.NoError(t, err)
}) })
@@ -619,7 +619,7 @@ func TestClientPublishPauseParallel(t *testing.T) {
defer t.Stop() defer t.Stop()
for range t.C { for range t.C {
err := conn.WriteFrame(0, StreamTypeRTP, err := conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
if err != nil { if err != nil {
return return
@@ -750,7 +750,7 @@ func TestClientPublishAutomaticProtocol(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
defer conn.Close() defer conn.Close()
err = conn.WriteFrame(0, StreamTypeRTP, err = conn.WritePacketRTP(0,
[]byte{0x01, 0x02, 0x03, 0x04}) []byte{0x01, 0x02, 0x03, 0x04})
require.NoError(t, err) require.NoError(t, err)
} }
@@ -902,11 +902,11 @@ func TestClientPublishRTCPReport(t *testing.T) {
}, },
Payload: []byte{0x01, 0x02, 0x03, 0x04}, Payload: []byte{0x01, 0x02, 0x03, 0x04},
}).Marshal() }).Marshal()
err = conn.WriteFrame(0, StreamTypeRTP, byts) err = conn.WritePacketRTP(0, byts)
require.NoError(t, err) require.NoError(t, err)
time.Sleep(1300 * time.Millisecond) time.Sleep(1300 * time.Millisecond)
err = conn.WriteFrame(0, StreamTypeRTP, byts) err = conn.WritePacketRTP(0, byts)
require.NoError(t, err) require.NoError(t, err)
} }

View File

@@ -430,7 +430,7 @@ func TestClientRead(t *testing.T) {
require.Equal(t, StreamTypeRTP, streamType) require.Equal(t, StreamTypeRTP, streamType)
require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload) require.Equal(t, []byte{0x01, 0x02, 0x03, 0x04}, payload)
err = conn.WriteFrame(0, StreamTypeRTCP, []byte{0x05, 0x06, 0x07, 0x08}) err = conn.WritePacketRTCP(0, []byte{0x05, 0x06, 0x07, 0x08})
require.NoError(t, err) require.NoError(t, err)
}) })
}() }()

View File

@@ -509,7 +509,7 @@ func (cc *ClientConn) runBackgroundPlayUDP() error {
now := time.Now() now := time.Now()
for trackID, cct := range cc.tracks { for trackID, cct := range cc.tracks {
rr := cct.rtcpReceiver.Report(now) rr := cct.rtcpReceiver.Report(now)
cc.WriteFrame(trackID, StreamTypeRTCP, rr) cc.WritePacketRTCP(trackID, rr)
} }
case <-keepaliveTicker.C: case <-keepaliveTicker.C:
@@ -645,7 +645,7 @@ func (cc *ClientConn) runBackgroundPlayTCP() error {
now := time.Now() now := time.Now()
for trackID, cct := range cc.tracks { for trackID, cct := range cc.tracks {
rr := cct.rtcpReceiver.Report(now) rr := cct.rtcpReceiver.Report(now)
cc.WriteFrame(trackID, StreamTypeRTCP, rr) cc.WritePacketRTCP(trackID, rr)
} }
case <-checkStreamTicker.C: case <-checkStreamTicker.C:
@@ -709,7 +709,7 @@ func (cc *ClientConn) runBackgroundRecordUDP() error {
for trackID, cct := range cc.tracks { for trackID, cct := range cc.tracks {
sr := cct.rtcpSender.Report(now) sr := cct.rtcpSender.Report(now)
if sr != nil { if sr != nil {
cc.WriteFrame(trackID, StreamTypeRTCP, sr) cc.WritePacketRTCP(trackID, sr)
} }
} }
@@ -766,7 +766,7 @@ func (cc *ClientConn) runBackgroundRecordTCP() error {
for trackID, cct := range cc.tracks { for trackID, cct := range cc.tracks {
sr := cct.rtcpSender.Report(now) sr := cct.rtcpSender.Report(now)
if sr != nil { if sr != nil {
cc.WriteFrame(trackID, StreamTypeRTCP, sr) cc.WritePacketRTCP(trackID, sr)
} }
} }
@@ -1677,8 +1677,8 @@ func (cc *ClientConn) ReadFrames(onFrame func(int, StreamType, []byte)) error {
return cc.backgroundErr return cc.backgroundErr
} }
// WriteFrame writes a frame. // WritePacketRTP writes a RTP packet.
func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []byte) error { func (cc *ClientConn) WritePacketRTP(trackID int, payload []byte) error {
now := time.Now() now := time.Now()
cc.writeMutex.RLock() cc.writeMutex.RLock()
@@ -1689,25 +1689,49 @@ func (cc *ClientConn) WriteFrame(trackID int, streamType StreamType, payload []b
} }
if cc.tracks[trackID].rtcpSender != nil { if cc.tracks[trackID].rtcpSender != nil {
if streamType == StreamTypeRTP { cc.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload)
cc.tracks[trackID].rtcpSender.ProcessPacketRTP(now, payload)
} else {
cc.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload)
}
} }
switch *cc.protocol { switch *cc.protocol {
case TransportUDP, TransportUDPMulticast: case TransportUDP, TransportUDPMulticast:
if streamType == StreamTypeRTP { return cc.tracks[trackID].udpRTPListener.write(payload)
return cc.tracks[trackID].udpRTPListener.write(payload)
}
return cc.tracks[trackID].udpRTCPListener.write(payload)
default: // TCP default: // TCP
channel := cc.tracks[trackID].tcpChannel channel := cc.tracks[trackID].tcpChannel
if streamType == StreamTypeRTCP {
channel++ cc.tcpWriteMutex.Lock()
} defer cc.tcpWriteMutex.Unlock()
cc.nconn.SetWriteDeadline(now.Add(cc.c.WriteTimeout))
return base.InterleavedFrame{
Channel: channel,
Payload: payload,
}.Write(cc.bw)
}
}
// WritePacketRTCP writes a RTCP packet.
func (cc *ClientConn) WritePacketRTCP(trackID int, payload []byte) error {
now := time.Now()
cc.writeMutex.RLock()
defer cc.writeMutex.RUnlock()
if !cc.writeFrameAllowed {
return cc.backgroundErr
}
if cc.tracks[trackID].rtcpSender != nil {
cc.tracks[trackID].rtcpSender.ProcessPacketRTCP(now, payload)
}
switch *cc.protocol {
case TransportUDP, TransportUDPMulticast:
return cc.tracks[trackID].udpRTCPListener.write(payload)
default: // TCP
channel := cc.tracks[trackID].tcpChannel
channel++
cc.tcpWriteMutex.Lock() cc.tcpWriteMutex.Lock()
defer cc.tcpWriteMutex.Unlock() defer cc.tcpWriteMutex.Unlock()

View File

@@ -56,7 +56,7 @@ func main() {
} }
// route RTP packets to the server // route RTP packets to the server
err = conn.WriteFrame(0, gortsplib.StreamTypeRTP, buf[:n]) err = conn.WritePacketRTP(0, buf[:n])
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -57,7 +57,7 @@ func main() {
} }
// route RTP packets to the server // route RTP packets to the server
err = conn.WriteFrame(0, gortsplib.StreamTypeRTP, buf[:n]) err = conn.WritePacketRTP(0, buf[:n])
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -68,7 +68,7 @@ func main() {
} }
// route RTP packets to the server // route RTP packets to the server
err = conn.WriteFrame(0, gortsplib.StreamTypeRTP, buf[:n]) err = conn.WritePacketRTP(0, buf[:n])
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -56,7 +56,7 @@ func main() {
} }
// route RTP packets to the server // route RTP packets to the server
err = conn.WriteFrame(0, gortsplib.StreamTypeRTP, buf[:n]) err = conn.WritePacketRTP(0, buf[:n])
if err != nil { if err != nil {
panic(err) panic(err)
} }

View File

@@ -64,7 +64,7 @@ func main() {
} }
// route RTP packets to the server // route RTP packets to the server
err = conn.WriteFrame(0, gortsplib.StreamTypeRTP, buf[:n]) err = conn.WritePacketRTP(0, buf[:n])
if err != nil { if err != nil {
break break
} }