This commit is contained in:
aler9
2022-03-20 12:34:03 +01:00
parent 0e8c93c5c2
commit 94be09cc20
6 changed files with 181 additions and 42 deletions

View File

@@ -30,8 +30,6 @@ import (
"github.com/aler9/gortsplib/pkg/liberrors"
"github.com/aler9/gortsplib/pkg/multibuffer"
"github.com/aler9/gortsplib/pkg/ringbuffer"
"github.com/aler9/gortsplib/pkg/rtcpreceiver"
"github.com/aler9/gortsplib/pkg/rtcpsender"
)
const (
@@ -58,8 +56,6 @@ type clientTrack struct {
udpRTPListener *clientUDPListener
udpRTCPListener *clientUDPListener
tcpChannel int
rtcpReceiver *rtcpreceiver.RTCPReceiver
rtcpSender *rtcpsender.RTCPSender
}
func (s clientState) String() string {
@@ -225,6 +221,8 @@ type Client struct {
closeError error
writerRunning bool
writeBuffer *ringbuffer.RingBuffer
rtcpReceiver *rtcpReceiver
rtcpSender *rtcpSender
// connCloser channels
connCloserTerminate chan struct{}
@@ -670,14 +668,7 @@ func (c *Client) playRecordStart() {
switch *c.effectiveTransport {
case TransportUDP:
for trackID, cct := range c.tracks {
ctrackID := trackID
cct.rtcpReceiver = rtcpreceiver.New(c.udpReceiverReportPeriod, nil,
cct.track.ClockRate(), func(pkt rtcp.Packet) {
c.WritePacketRTCP(ctrackID, pkt)
})
}
c.rtcpReceiver = newRTCPReceiver(c.udpReceiverReportPeriod, len(c.tracks))
c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout)
c.checkStreamInitial = true
@@ -688,14 +679,7 @@ func (c *Client) playRecordStart() {
}
case TransportUDPMulticast:
for trackID, cct := range c.tracks {
ctrackID := trackID
cct.rtcpReceiver = rtcpreceiver.New(c.udpReceiverReportPeriod, nil,
cct.track.ClockRate(), func(pkt rtcp.Packet) {
c.WritePacketRTCP(ctrackID, pkt)
})
}
c.rtcpReceiver = newRTCPReceiver(c.udpReceiverReportPeriod, len(c.tracks))
c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod)
@@ -710,14 +694,7 @@ func (c *Client) playRecordStart() {
c.tcpLastFrameTime = &v
}
} else if *c.effectiveTransport == TransportUDP {
for trackID, cct := range c.tracks {
ctrackID := trackID
cct.rtcpSender = rtcpsender.New(c.udpSenderReportPeriod,
cct.track.ClockRate(), func(pkt rtcp.Packet) {
c.WritePacketRTCP(ctrackID, pkt)
})
}
c.rtcpSender = newRTCPSender(c.udpSenderReportPeriod, len(c.tracks))
for _, cct := range c.tracks {
cct.udpRTPListener.start(false)
@@ -850,15 +827,11 @@ func (c *Client) playRecordStop(isClosing bool) {
}
if c.state == clientStatePlay {
for _, cct := range c.tracks {
cct.rtcpReceiver.Close()
cct.rtcpReceiver = nil
}
c.rtcpReceiver.close()
c.rtcpReceiver = nil
} else {
for _, cct := range c.tracks {
cct.rtcpSender.Close()
cct.rtcpSender = nil
}
c.rtcpSender.close()
c.rtcpSender = nil
}
}
@@ -1854,8 +1827,8 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet) error {
return err
}
if c.tracks[trackID].rtcpSender != nil {
c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt)
if c.rtcpSender != nil {
c.rtcpSender.processPacketRTP(time.Now(), trackID, pkt)
}
c.writeBuffer.Push(trackTypePayload{

View File

@@ -186,7 +186,7 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) {
pkt.Header.Padding = false
pkt.PaddingSize = 0
u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTP(now, pkt)
u.c.rtcpReceiver.processPacketRTP(now, u.trackID, pkt)
u.c.OnPacketRTP(u.trackID, pkt)
}
@@ -197,7 +197,7 @@ func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) {
}
for _, pkt := range packets {
u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt)
u.c.rtcpReceiver.processPacketRTCP(now, u.trackID, pkt)
u.c.OnPacketRTCP(u.trackID, pkt)
}
}

5
go.mod
View File

@@ -5,9 +5,12 @@ go 1.15
require (
github.com/asticode/go-astits v1.10.0
github.com/icza/bitio v1.0.0
github.com/pion/interceptor v0.0.0-00010101000000-000000000000
github.com/pion/rtcp v1.2.9
github.com/pion/rtp/v2 v2.0.0-20220302185659-b3d10fc096b0
github.com/pion/sdp/v3 v3.0.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b
)
replace github.com/pion/interceptor => github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0

7
go.sum
View File

@@ -1,3 +1,5 @@
github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0 h1:yF7/w7wwPhwGaGiu2d+6uerZpOxXUvGf77fdJxMtq3U=
github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0/go.mod h1:SydRAOHaGHULneiwoBnCXOMvVklY+ceUnym3jMKg5KQ=
github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8=
github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/asticode/go-astits v1.10.0 h1:ixKsRl84nWtjgHWcWKTDkUHNQ4kxbf9nKmjuSCninCU=
@@ -8,6 +10,8 @@ github.com/icza/bitio v1.0.0 h1:squ/m1SHyFeCA6+6Gyol1AxV9nmPPlJFT8c2vKdj3U8=
github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A=
github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lToqnXgA8Mz1DP11X4zSJ159C3k=
github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U=
@@ -22,8 +26,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A=
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

126
rtcpreceiver.go Normal file
View File

@@ -0,0 +1,126 @@
package gortsplib
import (
"sync"
"time"
"fmt"
"github.com/pion/interceptor"
"github.com/pion/interceptor/pkg/report"
"github.com/pion/rtcp"
"github.com/pion/rtp/v2"
)
type rtpReceiverTrack struct {
chain *interceptor.Chain
packetRTP *rtp.Packet
readerRTP interceptor.RTPReader
packetRTCP rtcp.Packet
readerRTCP interceptor.RTCPReader
}
func newRTPReceiverTrack(ssrc uint32) *rtpReceiverTrack {
track := &rtpReceiverTrack{}
factory, _ := report.NewReceiverInterceptor(
report.ReceiverInterval(1 * time.Second), // period
)
istance, _ := factory.NewInterceptor("")
track.chain = interceptor.NewChain([]interceptor.Interceptor{
istance,
})
track.readerRTP = track.chain.BindRemoteStream(
&interceptor.StreamInfo{
SSRC: ssrc,
},
interceptor.RTPReaderFunc(func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error) {
attrs := interceptor.Attributes{}
attrs.Set(0, track.packetRTP.Header)
return 0, attrs, nil
}))
track.readerRTCP = track.chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) {
byts, _ := track.packetRTCP.Marshal()
n := copy(b, byts)
//attrs := interceptor.Attributes{}
//attrs.Set(1, []rtcp.Packet{track.packetRTCP})
fmt.Println("INCOMING RTCP", len(b), n, len(byts))
return n, nil, nil
}))
track.chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) {
fmt.Println("TODO", pkts)
return 0, nil
}))
return track
}
func (track *rtpReceiverTrack) close() {
track.chain.Close()
}
type rtcpReceiver struct {
period time.Duration
tracks []*rtpReceiverTrack
mutex sync.Mutex
}
func newRTCPReceiver(period time.Duration, tracksLen int) *rtcpReceiver {
rr := &rtcpReceiver{
period: period,
tracks: make([]*rtpReceiverTrack, tracksLen),
}
return rr
}
func (rr *rtcpReceiver) close() {
for _, track := range rr.tracks {
if track != nil {
track.close()
}
}
}
func (rr *rtcpReceiver) processPacketRTP(now time.Time, trackID int, pkt *rtp.Packet) {
track := func() *rtpReceiverTrack {
rr.mutex.Lock()
defer rr.mutex.Unlock()
track := rr.tracks[trackID]
if track == nil {
track = newRTPReceiverTrack(pkt.SSRC)
rr.tracks[trackID] = track
}
return track
}()
track.packetRTP = pkt
track.readerRTP.Read(nil, nil)
}
func (rr *rtcpReceiver) processPacketRTCP(now time.Time, trackID int, pkt rtcp.Packet) {
if sr, ok := (pkt).(*rtcp.SenderReport); ok {
track := func() *rtpReceiverTrack {
rr.mutex.Lock()
defer rr.mutex.Unlock()
track := rr.tracks[trackID]
if track == nil {
track = newRTPReceiverTrack(sr.SSRC)
rr.tracks[trackID] = track
}
return track
}()
track.packetRTCP = pkt
track.readerRTCP.Read(nil, nil)
}
}

32
rtcpsender.go Normal file
View File

@@ -0,0 +1,32 @@
package gortsplib
import (
"time"
"github.com/pion/interceptor"
"github.com/pion/rtp/v2"
)
type rtcpSender struct {
period time.Duration
tracksLen int
chain *interceptor.Chain
}
func newRTCPSender(period time.Duration, tracksLen int) *rtcpSender {
rs := &rtcpSender{
period: period,
tracksLen: tracksLen,
}
rs.chain = interceptor.NewChain(nil)
return rs
}
func (rs *rtcpSender) close() {
}
func (rs *rtcpSender) processPacketRTP(now time.Time, trackID int, pkt *rtp.Packet) {
}