diff --git a/rtcp-receiver.go b/rtcp-receiver.go index 924ea50a..7e946e58 100644 --- a/rtcp-receiver.go +++ b/rtcp-receiver.go @@ -7,53 +7,47 @@ import ( "github.com/pion/rtcp" ) -type rtcpReceiverEvent interface { - isRtpReceiverEvent() -} - -type rtcpReceiverEventFrameRtp struct { +type frameRtpReq struct { sequenceNumber uint16 } -func (rtcpReceiverEventFrameRtp) isRtpReceiverEvent() {} - -type rtcpReceiverEventFrameRtcp struct { +type frameRtcpReq struct { ssrc uint32 ntpTimeMiddle uint32 } -func (rtcpReceiverEventFrameRtcp) isRtpReceiverEvent() {} - -type rtcpReceiverEventLastFrameTime struct { +type lastFrameTimeReq struct { res chan time.Time } -func (rtcpReceiverEventLastFrameTime) isRtpReceiverEvent() {} - -type rtcpReceiverEventReport struct { +type reportReq struct { res chan []byte } -func (rtcpReceiverEventReport) isRtpReceiverEvent() {} - -type rtcpReceiverEventTerminate struct{} - -func (rtcpReceiverEventTerminate) isRtpReceiverEvent() {} - // RtcpReceiver is an object that helps building RTCP receiver reports, by parsing // incoming frames. type RtcpReceiver struct { - events chan rtcpReceiverEvent - done chan struct{} + frameRtp chan frameRtpReq + frameRtcp chan frameRtcpReq + lastFrameTime chan lastFrameTimeReq + report chan reportReq + terminate chan struct{} + done chan struct{} } // NewRtcpReceiver allocates a RtcpReceiver. func NewRtcpReceiver() *RtcpReceiver { rr := &RtcpReceiver{ - events: make(chan rtcpReceiverEvent), - done: make(chan struct{}), + frameRtp: make(chan frameRtpReq), + frameRtcp: make(chan frameRtcpReq), + lastFrameTime: make(chan lastFrameTimeReq), + report: make(chan reportReq), + terminate: make(chan struct{}), + done: make(chan struct{}), } + go rr.run() + return rr } @@ -66,23 +60,23 @@ func (rr *RtcpReceiver) run() { lastSenderReport := uint32(0) outer: - for rawEvt := range rr.events { - switch evt := rawEvt.(type) { - case rtcpReceiverEventFrameRtp: - if evt.sequenceNumber < lastSequenceNumber { + for { + select { + case req := <-rr.frameRtp: + if req.sequenceNumber < lastSequenceNumber { sequenceNumberCycles += 1 } - lastSequenceNumber = evt.sequenceNumber + lastSequenceNumber = req.sequenceNumber lastFrameTime = time.Now() - case rtcpReceiverEventFrameRtcp: - publisherSSRC = evt.ssrc - lastSenderReport = evt.ntpTimeMiddle + case req := <-rr.frameRtcp: + publisherSSRC = req.ssrc + lastSenderReport = req.ntpTimeMiddle - case rtcpReceiverEventLastFrameTime: - evt.res <- lastFrameTime + case req := <-rr.lastFrameTime: + req.res <- lastFrameTime - case rtcpReceiverEventReport: + case req := <-rr.report: rr := &rtcp.ReceiverReport{ SSRC: receiverSSRC, Reports: []rtcp.ReceptionReport{ @@ -94,21 +88,23 @@ outer: }, } frame, _ := rr.Marshal() - evt.res <- frame + req.res <- frame - case rtcpReceiverEventTerminate: + case <-rr.terminate: break outer } } - close(rr.events) - + close(rr.frameRtp) + close(rr.frameRtcp) + close(rr.lastFrameTime) + close(rr.report) close(rr.done) } // Close closes a RtcpReceiver. func (rr *RtcpReceiver) Close() { - rr.events <- rtcpReceiverEventTerminate{} + close(rr.terminate) <-rr.done } @@ -118,7 +114,7 @@ func (rr *RtcpReceiver) OnFrame(streamType StreamType, buf []byte) { // extract sequence number of first frame if len(buf) >= 3 { sequenceNumber := uint16(uint16(buf[2])<<8 | uint16(buf[1])) - rr.events <- rtcpReceiverEventFrameRtp{sequenceNumber} + rr.frameRtp <- frameRtpReq{sequenceNumber} } } else { @@ -126,7 +122,7 @@ func (rr *RtcpReceiver) OnFrame(streamType StreamType, buf []byte) { if err == nil { for _, frame := range frames { if senderReport, ok := (frame).(*rtcp.SenderReport); ok { - rr.events <- rtcpReceiverEventFrameRtcp{ + rr.frameRtcp <- frameRtcpReq{ senderReport.SSRC, uint32(senderReport.NTPTime >> 16), } @@ -139,13 +135,13 @@ func (rr *RtcpReceiver) OnFrame(streamType StreamType, buf []byte) { // LastFrameTime returns the time the last frame was received. func (rr *RtcpReceiver) LastFrameTime() time.Time { res := make(chan time.Time) - rr.events <- rtcpReceiverEventLastFrameTime{res} + rr.lastFrameTime <- lastFrameTimeReq{res} return <-res } // Report generates a RTCP receiver report. func (rr *RtcpReceiver) Report() []byte { res := make(chan []byte) - rr.events <- rtcpReceiverEventReport{res} + rr.report <- reportReq{res} return <-res }