Files
gortsplib/client_format.go

160 lines
4.0 KiB
Go

package gortsplib
import (
"sync/atomic"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/bluenviron/gortsplib/v4/internal/rtcpreceiver"
"github.com/bluenviron/gortsplib/v4/internal/rtcpsender"
"github.com/bluenviron/gortsplib/v4/internal/rtplossdetector"
"github.com/bluenviron/gortsplib/v4/internal/rtpreorderer"
"github.com/bluenviron/gortsplib/v4/pkg/format"
)
type clientFormat struct {
cm *clientMedia
format format.Format
onPacketRTP OnPacketRTPFunc
udpReorderer *rtpreorderer.Reorderer // play
tcpLossDetector *rtplossdetector.LossDetector // play
rtcpReceiver *rtcpreceiver.RTCPReceiver // play
rtcpSender *rtcpsender.RTCPSender // record or back channel
writePacketRTPInQueue func([]byte) error
rtpPacketsReceived *uint64
rtpPacketsSent *uint64
rtpPacketsLost *uint64
}
func (cf *clientFormat) initialize() {
cf.rtpPacketsReceived = new(uint64)
cf.rtpPacketsSent = new(uint64)
cf.rtpPacketsLost = new(uint64)
}
func (cf *clientFormat) start() {
if cf.cm.udpRTPListener != nil {
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueUDP
} else {
cf.writePacketRTPInQueue = cf.writePacketRTPInQueueTCP
}
if cf.cm.c.state == clientStateRecord || cf.cm.media.IsBackChannel {
cf.rtcpSender = &rtcpsender.RTCPSender{
ClockRate: cf.format.ClockRate(),
Period: cf.cm.c.senderReportPeriod,
TimeNow: cf.cm.c.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if !cf.cm.c.DisableRTCPSenderReports {
cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck
}
},
}
cf.rtcpSender.Initialize()
} else {
if cf.cm.udpRTPListener != nil {
cf.udpReorderer = &rtpreorderer.Reorderer{}
cf.udpReorderer.Initialize()
} else {
cf.tcpLossDetector = &rtplossdetector.LossDetector{}
}
cf.rtcpReceiver = &rtcpreceiver.RTCPReceiver{
ClockRate: cf.format.ClockRate(),
Period: cf.cm.c.receiverReportPeriod,
TimeNow: cf.cm.c.timeNow,
WritePacketRTCP: func(pkt rtcp.Packet) {
if cf.cm.udpRTPListener != nil {
cf.cm.c.WritePacketRTCP(cf.cm.media, pkt) //nolint:errcheck
}
},
}
err := cf.rtcpReceiver.Initialize()
if err != nil {
panic(err)
}
}
}
func (cf *clientFormat) stop() {
if cf.rtcpReceiver != nil {
cf.rtcpReceiver.Close()
cf.rtcpReceiver = nil
}
if cf.rtcpSender != nil {
cf.rtcpSender.Close()
}
}
func (cf *clientFormat) readPacketRTPUDP(pkt *rtp.Packet) {
packets, lost := cf.udpReorderer.Process(pkt)
if lost != 0 {
cf.onPacketRTPLost(lost)
// do not return
}
now := cf.cm.c.timeNow()
for _, pkt := range packets {
cf.handlePacketRTP(pkt, now)
}
}
func (cf *clientFormat) readPacketRTPTCP(pkt *rtp.Packet) {
lost := cf.tcpLossDetector.Process(pkt)
if lost != 0 {
cf.onPacketRTPLost(lost)
// do not return
}
now := cf.cm.c.timeNow()
cf.handlePacketRTP(pkt, now)
}
func (cf *clientFormat) handlePacketRTP(pkt *rtp.Packet, now time.Time) {
err := cf.rtcpReceiver.ProcessPacketRTP(pkt, now, cf.format.PTSEqualsDTS(pkt))
if err != nil {
cf.cm.onPacketRTPDecodeError(err)
return
}
atomic.AddUint64(cf.rtpPacketsReceived, 1)
cf.onPacketRTP(pkt)
}
func (cf *clientFormat) onPacketRTPLost(lost uint64) {
atomic.AddUint64(cf.rtpPacketsLost, lost)
cf.cm.c.OnPacketLost2(lost)
}
func (cf *clientFormat) writePacketRTPInQueueUDP(payload []byte) error {
err := cf.cm.udpRTPListener.write(payload)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
}
func (cf *clientFormat) writePacketRTPInQueueTCP(payload []byte) error {
cf.cm.c.tcpFrame.Channel = cf.cm.tcpChannel
cf.cm.c.tcpFrame.Payload = payload
cf.cm.c.nconn.SetWriteDeadline(time.Now().Add(cf.cm.c.WriteTimeout))
err := cf.cm.c.conn.WriteInterleavedFrame(cf.cm.c.tcpFrame, cf.cm.c.tcpBuffer)
if err != nil {
return err
}
atomic.AddUint64(cf.cm.bytesSent, uint64(len(payload)))
atomic.AddUint64(cf.rtpPacketsSent, 1)
return nil
}