diff --git a/client.go b/client.go index 119b41b5..81dc8393 100644 --- a/client.go +++ b/client.go @@ -30,8 +30,6 @@ import ( "github.com/aler9/gortsplib/pkg/liberrors" "github.com/aler9/gortsplib/pkg/multibuffer" "github.com/aler9/gortsplib/pkg/ringbuffer" - "github.com/aler9/gortsplib/pkg/rtcpreceiver" - "github.com/aler9/gortsplib/pkg/rtcpsender" ) const ( @@ -58,8 +56,6 @@ type clientTrack struct { udpRTPListener *clientUDPListener udpRTCPListener *clientUDPListener tcpChannel int - rtcpReceiver *rtcpreceiver.RTCPReceiver - rtcpSender *rtcpsender.RTCPSender } func (s clientState) String() string { @@ -225,6 +221,8 @@ type Client struct { closeError error writerRunning bool writeBuffer *ringbuffer.RingBuffer + rtcpReceiver *rtcpReceiver + rtcpSender *rtcpSender // connCloser channels connCloserTerminate chan struct{} @@ -670,14 +668,7 @@ func (c *Client) playRecordStart() { switch *c.effectiveTransport { case TransportUDP: - for trackID, cct := range c.tracks { - ctrackID := trackID - - cct.rtcpReceiver = rtcpreceiver.New(c.udpReceiverReportPeriod, nil, - cct.track.ClockRate(), func(pkt rtcp.Packet) { - c.WritePacketRTCP(ctrackID, pkt) - }) - } + c.rtcpReceiver = newRTCPReceiver(c.udpReceiverReportPeriod, len(c.tracks)) c.checkStreamTimer = time.NewTimer(c.InitialUDPReadTimeout) c.checkStreamInitial = true @@ -688,14 +679,7 @@ func (c *Client) playRecordStart() { } case TransportUDPMulticast: - for trackID, cct := range c.tracks { - ctrackID := trackID - - cct.rtcpReceiver = rtcpreceiver.New(c.udpReceiverReportPeriod, nil, - cct.track.ClockRate(), func(pkt rtcp.Packet) { - c.WritePacketRTCP(ctrackID, pkt) - }) - } + c.rtcpReceiver = newRTCPReceiver(c.udpReceiverReportPeriod, len(c.tracks)) c.checkStreamTimer = time.NewTimer(c.checkStreamPeriod) @@ -710,14 +694,7 @@ func (c *Client) playRecordStart() { c.tcpLastFrameTime = &v } } else if *c.effectiveTransport == TransportUDP { - for trackID, cct := range c.tracks { - ctrackID := trackID - - cct.rtcpSender = rtcpsender.New(c.udpSenderReportPeriod, - cct.track.ClockRate(), func(pkt rtcp.Packet) { - c.WritePacketRTCP(ctrackID, pkt) - }) - } + c.rtcpSender = newRTCPSender(c.udpSenderReportPeriod, len(c.tracks)) for _, cct := range c.tracks { cct.udpRTPListener.start(false) @@ -850,15 +827,11 @@ func (c *Client) playRecordStop(isClosing bool) { } if c.state == clientStatePlay { - for _, cct := range c.tracks { - cct.rtcpReceiver.Close() - cct.rtcpReceiver = nil - } + c.rtcpReceiver.close() + c.rtcpReceiver = nil } else { - for _, cct := range c.tracks { - cct.rtcpSender.Close() - cct.rtcpSender = nil - } + c.rtcpSender.close() + c.rtcpSender = nil } } @@ -1854,8 +1827,8 @@ func (c *Client) WritePacketRTP(trackID int, pkt *rtp.Packet) error { return err } - if c.tracks[trackID].rtcpSender != nil { - c.tracks[trackID].rtcpSender.ProcessPacketRTP(time.Now(), pkt) + if c.rtcpSender != nil { + c.rtcpSender.processPacketRTP(time.Now(), trackID, pkt) } c.writeBuffer.Push(trackTypePayload{ diff --git a/clientudpl.go b/clientudpl.go index aca9bf35..f7fda2f1 100644 --- a/clientudpl.go +++ b/clientudpl.go @@ -186,7 +186,7 @@ func (u *clientUDPListener) processPlayRTP(now time.Time, payload []byte) { pkt.Header.Padding = false pkt.PaddingSize = 0 - u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTP(now, pkt) + u.c.rtcpReceiver.processPacketRTP(now, u.trackID, pkt) u.c.OnPacketRTP(u.trackID, pkt) } @@ -197,7 +197,7 @@ func (u *clientUDPListener) processPlayRTCP(now time.Time, payload []byte) { } for _, pkt := range packets { - u.c.tracks[u.trackID].rtcpReceiver.ProcessPacketRTCP(now, pkt) + u.c.rtcpReceiver.processPacketRTCP(now, u.trackID, pkt) u.c.OnPacketRTCP(u.trackID, pkt) } } diff --git a/go.mod b/go.mod index 5c07b695..61520331 100644 --- a/go.mod +++ b/go.mod @@ -5,9 +5,12 @@ go 1.15 require ( github.com/asticode/go-astits v1.10.0 github.com/icza/bitio v1.0.0 + github.com/pion/interceptor v0.0.0-00010101000000-000000000000 github.com/pion/rtcp v1.2.9 github.com/pion/rtp/v2 v2.0.0-20220302185659-b3d10fc096b0 github.com/pion/sdp/v3 v3.0.2 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 golang.org/x/net v0.0.0-20210610132358-84b48f89b13b ) + +replace github.com/pion/interceptor => github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0 diff --git a/go.sum b/go.sum index 6a0790d8..959d4fa3 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0 h1:yF7/w7wwPhwGaGiu2d+6uerZpOxXUvGf77fdJxMtq3U= +github.com/aler9/interceptor v0.0.0-20220320105816-284c26e3dab0/go.mod h1:SydRAOHaGHULneiwoBnCXOMvVklY+ceUnym3jMKg5KQ= github.com/asticode/go-astikit v0.20.0 h1:+7N+J4E4lWx2QOkRdOf6DafWJMv6O4RRfgClwQokrH8= github.com/asticode/go-astikit v0.20.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= github.com/asticode/go-astits v1.10.0 h1:ixKsRl84nWtjgHWcWKTDkUHNQ4kxbf9nKmjuSCninCU= @@ -8,6 +10,8 @@ github.com/icza/bitio v1.0.0 h1:squ/m1SHyFeCA6+6Gyol1AxV9nmPPlJFT8c2vKdj3U8= github.com/icza/bitio v1.0.0/go.mod h1:0jGnlLAx8MKMr9VGnn/4YrvZiprkvBelsVIbA9Jjr9A= github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6 h1:8UsGZ2rr2ksmEru6lToqnXgA8Mz1DP11X4zSJ159C3k= github.com/icza/mighty v0.0.0-20180919140131-cfd07d671de6/go.mod h1:xQig96I1VNBDIWGCdTt54nHt6EeI639SmHycLYL7FkA= +github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY= +github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= github.com/pion/rtcp v1.2.9 h1:1ujStwg++IOLIEoOiIQ2s+qBuJ1VN81KW+9pMPsif+U= @@ -22,8 +26,9 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b h1:k+E048sYJHyVnsr1GDrRZWQ32D2C7lWs9JRc0bel53A= golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/rtcpreceiver.go b/rtcpreceiver.go new file mode 100644 index 00000000..9887f22c --- /dev/null +++ b/rtcpreceiver.go @@ -0,0 +1,126 @@ +package gortsplib + +import ( + "sync" + "time" + "fmt" + + "github.com/pion/interceptor" + "github.com/pion/interceptor/pkg/report" + "github.com/pion/rtcp" + "github.com/pion/rtp/v2" +) + +type rtpReceiverTrack struct { + chain *interceptor.Chain + packetRTP *rtp.Packet + readerRTP interceptor.RTPReader + packetRTCP rtcp.Packet + readerRTCP interceptor.RTCPReader +} + +func newRTPReceiverTrack(ssrc uint32) *rtpReceiverTrack { + track := &rtpReceiverTrack{} + + factory, _ := report.NewReceiverInterceptor( + report.ReceiverInterval(1 * time.Second), // period + ) + istance, _ := factory.NewInterceptor("") + + track.chain = interceptor.NewChain([]interceptor.Interceptor{ + istance, + }) + + track.readerRTP = track.chain.BindRemoteStream( + &interceptor.StreamInfo{ + SSRC: ssrc, + }, + interceptor.RTPReaderFunc(func([]byte, interceptor.Attributes) (int, interceptor.Attributes, error) { + attrs := interceptor.Attributes{} + attrs.Set(0, track.packetRTP.Header) + return 0, attrs, nil + })) + + track.readerRTCP = track.chain.BindRTCPReader(interceptor.RTCPReaderFunc(func(b []byte, attrs interceptor.Attributes) (int, interceptor.Attributes, error) { + byts, _ := track.packetRTCP.Marshal() + n := copy(b, byts) + + //attrs := interceptor.Attributes{} + //attrs.Set(1, []rtcp.Packet{track.packetRTCP}) + fmt.Println("INCOMING RTCP", len(b), n, len(byts)) + return n, nil, nil + })) + + track.chain.BindRTCPWriter(interceptor.RTCPWriterFunc(func(pkts []rtcp.Packet, attributes interceptor.Attributes) (int, error) { + fmt.Println("TODO", pkts) + return 0, nil + })) + + return track +} + +func (track *rtpReceiverTrack) close() { + track.chain.Close() +} + +type rtcpReceiver struct { + period time.Duration + + tracks []*rtpReceiverTrack + mutex sync.Mutex +} + +func newRTCPReceiver(period time.Duration, tracksLen int) *rtcpReceiver { + rr := &rtcpReceiver{ + period: period, + tracks: make([]*rtpReceiverTrack, tracksLen), + } + + return rr +} + +func (rr *rtcpReceiver) close() { + for _, track := range rr.tracks { + if track != nil { + track.close() + } + } +} + +func (rr *rtcpReceiver) processPacketRTP(now time.Time, trackID int, pkt *rtp.Packet) { + track := func() *rtpReceiverTrack { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + track := rr.tracks[trackID] + if track == nil { + track = newRTPReceiverTrack(pkt.SSRC) + rr.tracks[trackID] = track + } + + return track + }() + + track.packetRTP = pkt + track.readerRTP.Read(nil, nil) +} + +func (rr *rtcpReceiver) processPacketRTCP(now time.Time, trackID int, pkt rtcp.Packet) { + if sr, ok := (pkt).(*rtcp.SenderReport); ok { + track := func() *rtpReceiverTrack { + rr.mutex.Lock() + defer rr.mutex.Unlock() + + track := rr.tracks[trackID] + if track == nil { + track = newRTPReceiverTrack(sr.SSRC) + rr.tracks[trackID] = track + } + + return track + }() + + track.packetRTCP = pkt + track.readerRTCP.Read(nil, nil) + } +} diff --git a/rtcpsender.go b/rtcpsender.go new file mode 100644 index 00000000..b25aeeff --- /dev/null +++ b/rtcpsender.go @@ -0,0 +1,32 @@ +package gortsplib + +import ( + "time" + + "github.com/pion/interceptor" + "github.com/pion/rtp/v2" +) + +type rtcpSender struct { + period time.Duration + tracksLen int + + chain *interceptor.Chain +} + +func newRTCPSender(period time.Duration, tracksLen int) *rtcpSender { + rs := &rtcpSender{ + period: period, + tracksLen: tracksLen, + } + + rs.chain = interceptor.NewChain(nil) + + return rs +} + +func (rs *rtcpSender) close() { +} + +func (rs *rtcpSender) processPacketRTP(now time.Time, trackID int, pkt *rtp.Packet) { +}