// SPDX-FileCopyrightText: 2023 The Pion community // SPDX-License-Identifier: MIT // Package samplebuilder provides functionality to reconstruct media frames from RTP packets. package samplebuilder import ( "math" "time" "github.com/pion/rtp" "github.com/pion/webrtc/v3/pkg/media" ) // SampleBuilder buffers packets until media frames are complete. type SampleBuilder struct { maxLate uint16 // how many packets to wait until we get a valid Sample maxLateTimestamp uint32 // max timestamp between old and new timestamps before dropping packets buffer [math.MaxUint16 + 1]*rtp.Packet preparedSamples [math.MaxUint16 + 1]*media.Sample // Interface that allows us to take RTP packets to samples depacketizer rtp.Depacketizer // sampleRate allows us to compute duration of media.SamplecA sampleRate uint32 // the handler to be called when the builder is about to remove the // reference to some packet. packetReleaseHandler func(*rtp.Packet) // filled contains the head/tail of the packets inserted into the buffer filled sampleSequenceLocation // active contains the active head/tail of the timestamp being actively processed active sampleSequenceLocation // prepared contains the samples that have been processed to date prepared sampleSequenceLocation // number of packets forced to be dropped droppedPackets uint16 // allows inspecting head packets of each sample and then returns a custom metadata packetHeadHandler func(headPacket interface{}) interface{} } // New constructs a new SampleBuilder. // maxLate is how long to wait until we can construct a completed media.Sample. // maxLate is measured in RTP packet sequence numbers. // A large maxLate will result in less packet loss but higher latency. // The depacketizer extracts media samples from RTP packets. // Several depacketizers are available in package github.com/pion/rtp/codecs. func New(maxLate uint16, depacketizer rtp.Depacketizer, sampleRate uint32, opts ...Option) *SampleBuilder { s := &SampleBuilder{maxLate: maxLate, depacketizer: depacketizer, sampleRate: sampleRate} for _, o := range opts { o(s) } return s } func (s *SampleBuilder) tooOld(location sampleSequenceLocation) bool { if s.maxLateTimestamp == 0 { return false } var foundHead *rtp.Packet var foundTail *rtp.Packet for i := location.head; i != location.tail; i++ { if packet := s.buffer[i]; packet != nil { foundHead = packet break } } if foundHead == nil { return false } for i := location.tail - 1; i != location.head; i-- { if packet := s.buffer[i]; packet != nil { foundTail = packet break } } if foundTail == nil { return false } return timestampDistance(foundHead.Timestamp, foundTail.Timestamp) > s.maxLateTimestamp } // fetchTimestamp returns the timestamp associated with a given sample location func (s *SampleBuilder) fetchTimestamp(location sampleSequenceLocation) (timestamp uint32, hasData bool) { if location.empty() { return 0, false } packet := s.buffer[location.head] if packet == nil { return 0, false } return packet.Timestamp, true } func (s *SampleBuilder) releasePacket(i uint16) { var p *rtp.Packet p, s.buffer[i] = s.buffer[i], nil if p != nil && s.packetReleaseHandler != nil { s.packetReleaseHandler(p) } } // purgeConsumedBuffers clears all buffers that have already been consumed by // popping. func (s *SampleBuilder) purgeConsumedBuffers() { s.purgeConsumedLocation(s.active, false) } // purgeConsumedLocation clears all buffers that have already been consumed // during a sample building method. func (s *SampleBuilder) purgeConsumedLocation(consume sampleSequenceLocation, forceConsume bool) { if !s.filled.hasData() { return } switch consume.compare(s.filled.head) { case slCompareInside: if !forceConsume { break } fallthrough case slCompareBefore: s.releasePacket(s.filled.head) s.filled.head++ } } // purgeBuffers flushes all buffers that are already consumed or those buffers // that are too late to consume. func (s *SampleBuilder) purgeBuffers() { s.purgeConsumedBuffers() for (s.tooOld(s.filled) || (s.filled.count() > s.maxLate)) && s.filled.hasData() { if s.active.empty() { // refill the active based on the filled packets s.active = s.filled } if s.active.hasData() && (s.active.head == s.filled.head) { // attempt to force the active packet to be consumed even though // outstanding data may be pending arrival if s.buildSample(true) != nil { continue } // could not build the sample so drop it s.active.head++ s.droppedPackets++ } s.releasePacket(s.filled.head) s.filled.head++ } } // Push adds an RTP Packet to s's buffer. // // Push does not copy the input. If you wish to reuse // this memory make sure to copy before calling Push func (s *SampleBuilder) Push(p *rtp.Packet) { s.buffer[p.SequenceNumber] = p switch s.filled.compare(p.SequenceNumber) { case slCompareVoid: s.filled.head = p.SequenceNumber s.filled.tail = p.SequenceNumber + 1 case slCompareBefore: s.filled.head = p.SequenceNumber case slCompareAfter: s.filled.tail = p.SequenceNumber + 1 case slCompareInside: break } s.purgeBuffers() } const secondToNanoseconds = 1000000000 // buildSample creates a sample from a valid collection of RTP Packets by // walking forwards building a sample if everything looks good clear and // update buffer+values func (s *SampleBuilder) buildSample(purgingBuffers bool) *media.Sample { if s.active.empty() { s.active = s.filled } if s.active.empty() { return nil } if s.filled.compare(s.active.tail) == slCompareInside { s.active.tail = s.filled.tail } var consume sampleSequenceLocation for i := s.active.head; s.buffer[i] != nil && s.active.compare(i) != slCompareAfter; i++ { if s.depacketizer.IsPartitionTail(s.buffer[i].Marker, s.buffer[i].Payload) { consume.head = s.active.head consume.tail = i + 1 break } headTimestamp, hasData := s.fetchTimestamp(s.active) if hasData && s.buffer[i].Timestamp != headTimestamp { consume.head = s.active.head consume.tail = i break } } if consume.empty() { return nil } if !purgingBuffers && s.buffer[consume.tail] == nil { // wait for the next packet after this set of packets to arrive // to ensure at least one post sample timestamp is known // (unless we have to release right now) return nil } sampleTimestamp, _ := s.fetchTimestamp(s.active) afterTimestamp := sampleTimestamp // scan for any packet after the current and use that time stamp as the diff point for i := consume.tail; i < s.active.tail; i++ { if s.buffer[i] != nil { afterTimestamp = s.buffer[i].Timestamp break } } // the head set of packets is now fully consumed s.active.head = consume.tail // prior to decoding all the packets, check if this packet // would end being disposed anyway if !s.depacketizer.IsPartitionHead(s.buffer[consume.head].Payload) { s.droppedPackets += consume.count() s.purgeConsumedLocation(consume, true) s.purgeConsumedBuffers() return nil } // merge all the buffers into a sample data := []byte{} var metadata interface{} for i := consume.head; i != consume.tail; i++ { p, err := s.depacketizer.Unmarshal(s.buffer[i].Payload) if err != nil { return nil } if i == consume.head && s.packetHeadHandler != nil { metadata = s.packetHeadHandler(s.depacketizer) } data = append(data, p...) } samples := afterTimestamp - sampleTimestamp sample := &media.Sample{ Data: data, Duration: time.Duration((float64(samples)/float64(s.sampleRate))*secondToNanoseconds) * time.Nanosecond, PacketTimestamp: sampleTimestamp, PrevDroppedPackets: s.droppedPackets, Metadata: metadata, } s.droppedPackets = 0 s.preparedSamples[s.prepared.tail] = sample s.prepared.tail++ s.purgeConsumedLocation(consume, true) s.purgeConsumedBuffers() return sample } // Pop compiles pushed RTP packets into media samples and then // returns the next valid sample (or nil if no sample is compiled). func (s *SampleBuilder) Pop() *media.Sample { _ = s.buildSample(false) if s.prepared.empty() { return nil } var result *media.Sample result, s.preparedSamples[s.prepared.head] = s.preparedSamples[s.prepared.head], nil s.prepared.head++ return result } // PopWithTimestamp compiles pushed RTP packets into media samples and then // returns the next valid sample with its associated RTP timestamp (or nil, 0 if // no sample is compiled). // // Deprecated: PopWithTimestamp will be removed in v4. Use Sample.PacketTimestamp field instead. func (s *SampleBuilder) PopWithTimestamp() (*media.Sample, uint32) { sample := s.Pop() if sample == nil { return nil, 0 } return sample, sample.PacketTimestamp } // seqnumDistance computes the distance between two sequence numbers func seqnumDistance(x, y uint16) uint16 { diff := int16(x - y) if diff < 0 { return uint16(-diff) } return uint16(diff) } // timestampDistance computes the distance between two timestamps func timestampDistance(x, y uint32) uint32 { diff := int32(x - y) if diff < 0 { return uint32(-diff) } return uint32(diff) } // An Option configures a SampleBuilder. type Option func(o *SampleBuilder) // WithPartitionHeadChecker is obsolete, it does nothing. func WithPartitionHeadChecker(interface{}) Option { return func(o *SampleBuilder) { } } // WithPacketReleaseHandler set a callback when the builder is about to release // some packet. func WithPacketReleaseHandler(h func(*rtp.Packet)) Option { return func(o *SampleBuilder) { o.packetReleaseHandler = h } } // WithPacketHeadHandler set a head packet handler to allow inspecting // the packet to extract certain information and return as custom metadata func WithPacketHeadHandler(h func(headPacket interface{}) interface{}) Option { return func(o *SampleBuilder) { o.packetHeadHandler = h } } // WithMaxTimeDelay ensures that packets that are too old in the buffer get // purged based on time rather than building up an extraordinarily long delay. func WithMaxTimeDelay(maxLateDuration time.Duration) Option { return func(o *SampleBuilder) { totalMillis := maxLateDuration.Milliseconds() o.maxLateTimestamp = uint32(int64(o.sampleRate) * totalMillis / 1000) } }