替换自己实现的jitterbuffer

This commit is contained in:
yangjiechina
2024-06-29 19:19:34 +08:00
parent 678336f653
commit e4a283e642
9 changed files with 111 additions and 988 deletions

View File

@@ -2,7 +2,6 @@ package gb28181
import ( import (
"github.com/pion/rtp" "github.com/pion/rtp"
"github.com/yangjiechina/lkm/jitterbuffer"
"github.com/yangjiechina/lkm/stream" "github.com/yangjiechina/lkm/stream"
) )
@@ -10,32 +9,33 @@ import (
type UDPSource struct { type UDPSource struct {
BaseGBSource BaseGBSource
jitterBuffer *jitterbuffer.JitterBuffer jitterBuffer *stream.JitterBuffer
receiveBuffer *stream.ReceiveBuffer receiveBuffer *stream.ReceiveBuffer
} }
func NewUDPSource() *UDPSource { func NewUDPSource() *UDPSource {
return &UDPSource{ u := &UDPSource{
jitterBuffer: jitterbuffer.New(),
receiveBuffer: stream.NewReceiveBuffer(1500, stream.ReceiveBufferUdpBlockCount+50), receiveBuffer: stream.NewReceiveBuffer(1500, stream.ReceiveBufferUdpBlockCount+50),
} }
u.jitterBuffer = stream.NewJitterBuffer(u.OnOrderedRtp)
return u
} }
func (u UDPSource) TransportType() TransportType { func (u *UDPSource) TransportType() TransportType {
return TransportTypeUDP return TransportTypeUDP
} }
func (u *UDPSource) OnOrderedRtp(packet interface{}) {
u.PublishSource.Input(packet.(*rtp.Packet).Payload)
}
// InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source // InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source
func (u UDPSource) InputRtp(pkt *rtp.Packet) error { func (u *UDPSource) InputRtp(pkt *rtp.Packet) error {
block := u.receiveBuffer.GetBlock() block := u.receiveBuffer.GetBlock()
copy(block, pkt.Payload) copy(block, pkt.Payload)
pkt.Payload = block[:len(pkt.Payload)] pkt.Payload = block[:len(pkt.Payload)]
u.jitterBuffer.Push(pkt) u.jitterBuffer.Push(pkt.SequenceNumber, pkt)
for rtp, _ := u.jitterBuffer.Pop(); rtp != nil; rtp, _ = u.jitterBuffer.Pop() {
u.PublishSource.Input(rtp.Payload)
}
return nil return nil
} }

View File

@@ -1,282 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
// Package jitterbuffer implements a buffer for RTP packets designed to help
// counteract non-deterministic sources of latency
package jitterbuffer
import (
"errors"
"math"
"sync"
"github.com/pion/rtp"
)
// State tracks a JitterBuffer as either Buffering or Emitting
type State uint16
// Event represents all events a JitterBuffer can emit
type Event string
var (
// ErrBufferUnderrun is returned when the buffer has no items
ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer")
// ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state
ErrPopWhileBuffering = errors.New("attempt to pop while buffering")
)
const (
// Buffering is the state when the jitter buffer has not started emitting yet, or has hit an underflow and needs to re-buffer packets
Buffering State = iota
// Emitting is the state when the jitter buffer is operating nominally
Emitting
)
const (
// StartBuffering is emitted when the buffer receives its first packet
StartBuffering Event = "startBuffering"
// BeginPlayback is emitted when the buffer has satisfied its buffer length
BeginPlayback = "playing"
// BufferUnderflow is emitted when the buffer does not have enough packets to Pop
BufferUnderflow = "underflow"
// BufferOverflow is emitted when the buffer has exceeded its limit
BufferOverflow = "overflow"
)
func (jbs State) String() string {
switch jbs {
case Buffering:
return "Buffering"
case Emitting:
return "Emitting"
}
return "unknown"
}
type (
// Option will Override JitterBuffer's defaults
Option func(jb *JitterBuffer)
// EventListener will be called when the corresponding Event occurs
EventListener func(event Event, jb *JitterBuffer)
)
// A JitterBuffer will accept Pushed packets, put them in sequence number
// order, and allows removing in either sequence number order or via a
// provided timestamp
type JitterBuffer struct {
packets *PriorityQueue
minStartCount uint16
lastSequence uint16
playoutHead uint16
playoutReady bool
state State
stats Stats
listeners map[Event][]EventListener
mutex sync.Mutex
}
// Stats Track interesting statistics for the life of this JitterBuffer
// outOfOrderCount will provide the number of times a packet was Pushed
//
// without its predecessor being present
//
// underflowCount will provide the count of attempts to Pop an empty buffer
// overflowCount will track the number of times the jitter buffer exceeds its limit
type Stats struct {
outOfOrderCount uint32
underflowCount uint32
overflowCount uint32
}
// New will initialize a jitter buffer and its associated statistics
func New(opts ...Option) *JitterBuffer {
jb := &JitterBuffer{
state: Buffering,
stats: Stats{0, 0, 0},
minStartCount: 50,
packets: NewQueue(),
listeners: make(map[Event][]EventListener),
}
for _, o := range opts {
o(jb)
}
return jb
}
// WithMinimumPacketCount will set the required number of packets to be received before
// any attempt to pop a packet can succeed
func WithMinimumPacketCount(count uint16) Option {
return func(jb *JitterBuffer) {
jb.minStartCount = count
}
}
// Listen will register an event listener
// The jitter buffer may emit events correspnding, interested listerns should
// look at Event for available events
func (jb *JitterBuffer) Listen(event Event, cb EventListener) {
jb.listeners[event] = append(jb.listeners[event], cb)
}
// PlayoutHead returns the SequenceNumber that will be attempted to Pop next
func (jb *JitterBuffer) PlayoutHead() uint16 {
jb.mutex.Lock()
defer jb.mutex.Unlock()
return jb.playoutHead
}
// SetPlayoutHead allows you to manually specify the packet you wish to pop next
// If you have encountered a packet that hasn't resolved you can skip it
func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
jb.playoutHead = playoutHead
}
func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) {
// If we have at least one packet, and the next packet being pushed in is not
// at the expected sequence number increment the out of order count
if jb.packets.Length() > 0 && lastPktSeqNo != ((jb.lastSequence+1)%math.MaxUint16) {
jb.stats.outOfOrderCount++
}
jb.lastSequence = lastPktSeqNo
}
// Push an RTP packet into the jitter buffer, this does not clone
// the data so if the memory is expected to be reused, the caller should
// take this in to account and pass a copy of the packet they wish to buffer
func (jb *JitterBuffer) Push(packet *rtp.Packet) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() == 0 {
jb.emit(StartBuffering)
}
if jb.packets.Length() > 100 {
jb.stats.overflowCount++
jb.emit(BufferOverflow)
}
if !jb.playoutReady && jb.packets.Length() == 0 {
jb.playoutHead = packet.SequenceNumber
}
jb.updateStats(packet.SequenceNumber)
jb.packets.Push(packet, packet.SequenceNumber)
jb.updateState()
}
func (jb *JitterBuffer) emit(event Event) {
for _, l := range jb.listeners[event] {
l(event, jb)
}
}
func (jb *JitterBuffer) updateState() {
// For now, we only look at the number of packets captured in the play buffer
if jb.packets.Length() >= jb.minStartCount && jb.state == Buffering {
jb.state = Emitting
jb.playoutReady = true
jb.emit(BeginPlayback)
}
}
// Peek at the packet which is either:
//
// At the playout head when we are emitting, and the playoutHead flag is true
//
// or else
//
// At the last sequence received
func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.packets.Length() < 1 {
return nil, ErrBufferUnderrun
}
if playoutHead && jb.state == Emitting {
return jb.packets.Find(jb.playoutHead)
}
return jb.packets.Find(jb.lastSequence)
}
// Pop an RTP packet from the jitter buffer at the current playout head
func (jb *JitterBuffer) Pop() (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAt(jb.playoutHead)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
}
jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16
jb.updateState()
return packet, nil
}
// PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence
func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAt(sq)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
}
jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16
jb.updateState()
return packet, nil
}
// PeekAtSequence will return an RTP packet from the jitter buffer at the specified Sequence
// without removing it from the buffer
func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
packet, err := jb.packets.Find(sq)
if err != nil {
return nil, err
}
return packet, nil
}
// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp
// Call this method repeatedly to drain the buffer at the timestamp
func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
if jb.state != Emitting {
return nil, ErrPopWhileBuffering
}
packet, err := jb.packets.PopAtTimestamp(ts)
if err != nil {
jb.stats.underflowCount++
jb.emit(BufferUnderflow)
return nil, err
}
jb.updateState()
return packet, nil
}
// Clear will empty the buffer and optionally reset the state
func (jb *JitterBuffer) Clear(resetState bool) {
jb.mutex.Lock()
defer jb.mutex.Unlock()
jb.packets.Clear()
if resetState {
jb.lastSequence = 0
jb.state = Buffering
jb.stats = Stats{0, 0, 0}
jb.minStartCount = 50
}
}

View File

@@ -1,238 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package jitterbuffer
import (
"math"
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)
func TestJitterBuffer(t *testing.T) {
assert := assert.New(t)
t.Run("Appends packets in order", func(*testing.T) {
jb := New()
assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})
assert.Equal(jb.lastSequence, uint16(5002))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}})
assert.Equal(jb.stats.outOfOrderCount, uint32(1))
assert.Equal(jb.packets.Length(), uint16(4))
assert.Equal(jb.lastSequence, uint16(5012))
})
t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(5012))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})
t.Run("Appends packets and begins playout", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))
events := make([]Event, 0)
jb.Listen(BeginPlayback, func(event Event, _ *JitterBuffer) {
events = append(events, event)
})
for i := 0; i < 2; i++ {
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(2))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(5012))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
assert.Equal(1, len(events))
assert.Equal(Event(BeginPlayback), events[0])
})
t.Run("Wraps playout correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
assert.Equal(jb.playoutHead, uint16(math.MaxUint16-32))
head, err := jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
head, err := jb.Pop()
if i < 99 {
assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16))
assert.Equal(err, nil)
} else {
assert.Equal(head, (*rtp.Packet)(nil))
}
}
})
t.Run("Pops at timestamp correctly", func(*testing.T) {
jb := New()
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
assert.Equal(jb.packets.Length(), uint16(100))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(513))
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(513))
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)
head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
t.Run("Can peek at a packet", func(*testing.T) {
jb := New()
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})
pkt, err := jb.Peek(false)
assert.Equal(pkt.SequenceNumber, uint16(5002))
assert.Equal(err, nil)
for i := 0; i < 100; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
pkt, err = jb.Peek(true)
assert.Equal(pkt.SequenceNumber, uint16(5000))
assert.Equal(err, nil)
})
t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
assert.Equal(jb.packets.Length(), uint16(52))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtSequence(uint16(9000))
assert.Equal(head, (*rtp.Packet)(nil))
assert.NotEqual(err, nil)
})
t.Run("Pops at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
assert.Equal(jb.packets.Length(), uint16(52))
assert.Equal(jb.state, Emitting)
head, err := jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1019))
assert.Equal(err, nil)
head, err = jb.PopAtTimestamp(uint32(9000))
assert.Equal(head.SequenceNumber, uint16(1020))
assert.Equal(err, nil)
head, err = jb.Pop()
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
t.Run("Peeks at timestamp with multiple packets", func(*testing.T) {
jb := New()
for i := 0; i < 50; i++ {
sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16)
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}})
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}})
assert.Equal(jb.packets.Length(), uint16(52))
assert.Equal(jb.state, Emitting)
head, err := jb.PeekAtSequence(uint16(1019))
assert.Equal(head.SequenceNumber, uint16(1019))
assert.Equal(err, nil)
head, err = jb.PeekAtSequence(uint16(1020))
assert.Equal(head.SequenceNumber, uint16(1020))
assert.Equal(err, nil)
head, err = jb.PopAtSequence(uint16(math.MaxUint16 - 32))
assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32))
assert.Equal(err, nil)
})
t.Run("SetPlayoutHead", func(*testing.T) {
jb := New(WithMinimumPacketCount(1))
// Push packets 0-9, but no packet 4
for i := uint16(0); i < 10; i++ {
if i == 4 {
continue
}
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: i, Timestamp: uint32(512 + i)}, Payload: []byte{0x00}})
}
// The first 3 packets will be able to popped
for i := 0; i < 4; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}
// The next pop will fail because of gap
pkt, err := jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))
// Assert that PlayoutHead isn't modified with pushing/popping again
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 10, Timestamp: uint32(522)}, Payload: []byte{0x00}})
pkt, err = jb.Pop()
assert.ErrorIs(err, ErrNotFound)
assert.Nil(pkt)
assert.Equal(jb.PlayoutHead(), uint16(4))
// Increment the PlayoutHead and popping will work again
jb.SetPlayoutHead(jb.PlayoutHead() + 1)
for i := 0; i < 6; i++ {
pkt, err := jb.Pop()
assert.NoError(err)
assert.NotNil(pkt)
}
})
t.Run("Allows clearing the buffer", func(*testing.T) {
jb := New()
jb.Clear(false)
assert.Equal(jb.lastSequence, uint16(0))
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}})
jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}})
assert.Equal(jb.lastSequence, uint16(5002))
jb.Clear(true)
assert.Equal(jb.lastSequence, uint16(0))
assert.Equal(jb.stats.outOfOrderCount, uint32(0))
assert.Equal(jb.packets.Length(), uint16(0))
})
}

View File

@@ -1,19 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package jitterbuffer
import (
"github.com/pion/logging"
)
// ReceiverInterceptorOption can be used to configure ReceiverInterceptor
type ReceiverInterceptorOption func(d *ReceiverInterceptor) error
// Log sets a logger for the interceptor
func Log(log logging.LeveledLogger) ReceiverInterceptorOption {
return func(d *ReceiverInterceptor) error {
d.log = log
return nil
}
}

View File

@@ -1,189 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package jitterbuffer
import (
"errors"
"github.com/pion/rtp"
)
// PriorityQueue provides a linked list sorting of RTP packets by SequenceNumber
type PriorityQueue struct {
next *node
length uint16
}
type node struct {
val *rtp.Packet
next *node
prev *node
priority uint16
}
var (
// ErrInvalidOperation may be returned if a Pop or Find operation is performed on an empty queue
ErrInvalidOperation = errors.New("attempt to find or pop on an empty list")
// ErrNotFound will be returned if the packet cannot be found in the queue
ErrNotFound = errors.New("priority not found")
)
// NewQueue will create a new PriorityQueue whose order relies on monotonically
// increasing Sequence Number, wrapping at MaxUint16, so
// a packet with sequence number MaxUint16 - 1 will be after 0
func NewQueue() *PriorityQueue {
return &PriorityQueue{
next: nil,
length: 0,
}
}
func newNode(val *rtp.Packet, priority uint16) *node {
return &node{
val: val,
prev: nil,
next: nil,
priority: priority,
}
}
// Find a packet in the queue with the provided sequence number,
// regardless of position (the packet is retained in the queue)
func (q *PriorityQueue) Find(sqNum uint16) (*rtp.Packet, error) {
next := q.next
for next != nil {
if next.priority == sqNum {
return next.val, nil
}
next = next.next
}
return nil, ErrNotFound
}
// Push will insert a packet in to the queue in order of sequence number
func (q *PriorityQueue) Push(val *rtp.Packet, priority uint16) {
newPq := newNode(val, priority)
if q.next == nil {
q.next = newPq
q.length++
return
}
if priority < q.next.priority {
newPq.next = q.next
q.next.prev = newPq
q.next = newPq
q.length++
return
}
head := q.next
prev := q.next
for head != nil {
if priority <= head.priority {
break
}
prev = head
head = head.next
}
if head == nil {
if prev != nil {
prev.next = newPq
}
newPq.prev = prev
} else {
newPq.next = head
newPq.prev = prev
if prev != nil {
prev.next = newPq
}
head.prev = newPq
}
q.length++
}
// Length will get the total length of the queue
func (q *PriorityQueue) Length() uint16 {
return q.length
}
// Pop removes the first element from the queue, regardless
// sequence number
func (q *PriorityQueue) Pop() (*rtp.Packet, error) {
if q.next == nil {
return nil, ErrInvalidOperation
}
val := q.next.val
q.length--
q.next = q.next.next
return val, nil
}
// PopAt removes an element at the specified sequence number (priority)
func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) {
if q.next == nil {
return nil, ErrInvalidOperation
}
if q.next.priority == sqNum {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
prev := q.next.prev
for pos != nil {
if pos.priority == sqNum {
val := pos.val
prev.next = pos.next
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
pos = pos.next
}
return nil, ErrNotFound
}
// PopAtTimestamp removes and returns a packet at the given RTP Timestamp, regardless
// sequence number order
func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) {
if q.next == nil {
return nil, ErrInvalidOperation
}
if q.next.val.Timestamp == timestamp {
val := q.next.val
q.next = q.next.next
q.length--
return val, nil
}
pos := q.next
prev := q.next.prev
for pos != nil {
if pos.val.Timestamp == timestamp {
val := pos.val
prev.next = pos.next
if prev.next != nil {
prev.next.prev = prev
}
q.length--
return val, nil
}
prev = pos
pos = pos.next
}
return nil, ErrNotFound
}
// Clear will empty a PriorityQueue
func (q *PriorityQueue) Clear() {
next := q.next
q.length = 0
for next != nil {
next.prev = nil
next = next.next
}
}

View File

@@ -1,138 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package jitterbuffer
import (
"testing"
"github.com/pion/rtp"
"github.com/stretchr/testify/assert"
)
func TestPriorityQueue(t *testing.T) {
assert := assert.New(t)
t.Run("Appends packets in order", func(*testing.T) {
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q := NewQueue()
q.Push(pkt, pkt.SequenceNumber)
pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt2, pkt2.SequenceNumber)
assert.Equal(q.next.next.val, pkt2)
assert.Equal(q.next.priority, uint16(5000))
assert.Equal(q.next.next.priority, uint16(5004))
})
t.Run("Appends many in order", func(*testing.T) {
q := NewQueue()
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
assert.Equal(uint16(100), q.Length())
last := (*node)(nil)
cur := q.next
for cur != nil {
last = cur
cur = cur.next
if cur != nil {
assert.Equal(cur.priority, last.priority+1)
}
}
assert.Equal(q.next.priority, uint16(5012))
assert.Equal(last.priority, uint16(5012+99))
})
t.Run("Can remove an element", func(*testing.T) {
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q := NewQueue()
q.Push(pkt, pkt.SequenceNumber)
pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt2, pkt2.SequenceNumber)
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
popped, _ := q.Pop()
assert.Equal(popped.SequenceNumber, uint16(5000))
_, _ = q.Pop()
nextPop, _ := q.Pop()
assert.Equal(nextPop.SequenceNumber, uint16(5012))
})
t.Run("Appends in order", func(*testing.T) {
q := NewQueue()
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
assert.Equal(uint16(100), q.Length())
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt, pkt.SequenceNumber)
assert.Equal(pkt, q.next.val)
assert.Equal(uint16(101), q.Length())
assert.Equal(q.next.priority, uint16(5000))
})
t.Run("Can find", func(*testing.T) {
q := NewQueue()
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
pkt, err := q.Find(5012)
assert.Equal(pkt.SequenceNumber, uint16(5012))
assert.Equal(err, nil)
})
t.Run("Updates the length when PopAt* are called", func(*testing.T) {
pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}
q := NewQueue()
q.Push(pkt, pkt.SequenceNumber)
pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}}
q.Push(pkt2, pkt2.SequenceNumber)
for i := 0; i < 100; i++ {
q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i))
}
assert.Equal(uint16(102), q.Length())
popped, _ := q.PopAt(uint16(5012))
assert.Equal(popped.SequenceNumber, uint16(5012))
assert.Equal(uint16(101), q.Length())
popped, err := q.PopAtTimestamp(uint32(500))
assert.Equal(popped.SequenceNumber, uint16(5000))
assert.Equal(uint16(100), q.Length())
assert.Equal(err, nil)
})
}
func TestPriorityQueue_Find(t *testing.T) {
packets := NewQueue()
packets.Push(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1000,
Timestamp: 5,
SSRC: 5,
},
Payload: []uint8{0xA},
}, 1000)
_, err := packets.PopAt(1000)
assert.NoError(t, err)
_, err = packets.Find(1001)
assert.Error(t, err)
}
func TestPriorityQueue_Clean(t *testing.T) {
packets := NewQueue()
packets.Clear()
packets.Push(&rtp.Packet{
Header: rtp.Header{
SequenceNumber: 1000,
Timestamp: 5,
SSRC: 5,
},
Payload: []uint8{0xA},
}, 1000)
assert.EqualValues(t, 1, packets.Length())
packets.Clear()
}

View File

@@ -1,110 +0,0 @@
// SPDX-FileCopyrightText: 2023 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT
package jitterbuffer
import (
"sync"
"github.com/pion/interceptor"
"github.com/pion/logging"
"github.com/pion/rtp"
)
// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor
type InterceptorFactory struct {
opts []ReceiverInterceptorOption
}
// NewInterceptor constructs a new ReceiverInterceptor
func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) {
i := &ReceiverInterceptor{
close: make(chan struct{}),
log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"),
buffer: New(),
}
for _, opt := range g.opts {
if err := opt(i); err != nil {
return nil, err
}
}
return i, nil
}
// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival
// and allow for network jitter
//
// The Interceptor is designed to fit in a RemoteStream
// pipeline and buffer incoming packets for a short period (currently
// defaulting to 50 packets) before emitting packets to be consumed by the
// next step in the pipeline.
//
// The caller must ensure they are prepared to handle an
// ErrPopWhileBuffering in the case that insufficient packets have been
// received by the jitter buffer. The caller should retry the operation
// at some point later as the buffer may have been filled in the interim.
//
// The caller should also be aware that an ErrBufferUnderrun may be
// returned in the case that the initial buffering was sufficient and
// playback began but the caller is consuming packets (or they are not
// arriving) quickly enough.
type ReceiverInterceptor struct {
interceptor.NoOp
buffer *JitterBuffer
m sync.Mutex
wg sync.WaitGroup
close chan struct{}
log logging.LeveledLogger
}
// NewInterceptor returns a new InterceptorFactory
func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) {
return &InterceptorFactory{opts}, nil
}
// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method
// will be called once per rtp packet.
func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader {
return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) {
buf := make([]byte, len(b))
n, attr, err := reader.Read(buf, a)
if err != nil {
return n, attr, err
}
packet := &rtp.Packet{}
if err := packet.Unmarshal(buf); err != nil {
return 0, nil, err
}
i.m.Lock()
defer i.m.Unlock()
i.buffer.Push(packet)
if i.buffer.state == Emitting {
newPkt, err := i.buffer.Pop()
if err != nil {
return 0, nil, err
}
nlen, err := newPkt.MarshalTo(b)
return nlen, attr, err
}
return n, attr, ErrPopWhileBuffering
})
}
// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track.
func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
}
// Close closes the interceptor
func (i *ReceiverInterceptor) Close() error {
defer i.wg.Wait()
i.m.Lock()
defer i.m.Unlock()
i.buffer.Clear(true)
return nil
}

74
stream/jitter_buffer.go Normal file
View File

@@ -0,0 +1,74 @@
package stream
import "math"
type JitterBuffer struct {
maxSeqNum uint16
minSeqNum uint16
nextSeqNum uint16
count int
minStartCount int
queue []interface{}
onPacket func(packet interface{})
first bool
}
func (j *JitterBuffer) emit() {
if j.first {
j.nextSeqNum = j.minSeqNum
j.first = false
}
if j.nextSeqNum > j.maxSeqNum {
j.nextSeqNum = j.minSeqNum
}
for j.queue[j.nextSeqNum] == nil {
j.nextSeqNum++
}
j.onPacket(j.queue[j.nextSeqNum])
j.queue[j.nextSeqNum] = nil
j.nextSeqNum++
j.minSeqNum = uint16(math.Min(float64(j.nextSeqNum), float64(j.maxSeqNum)))
j.count--
}
func (j *JitterBuffer) Push(seq uint16, packet interface{}) {
if j.count == 0 {
j.minSeqNum = seq
j.maxSeqNum = seq
}
if j.queue[seq] == nil {
j.queue[seq] = packet
j.count++
}
j.minSeqNum = uint16(math.Min(float64(j.minSeqNum), float64(seq)))
j.maxSeqNum = uint16(math.Max(float64(j.maxSeqNum), float64(seq)))
if j.count > j.minStartCount {
j.emit()
}
}
func (j *JitterBuffer) Flush() {
for j.count > 0 {
j.emit()
}
}
func (j *JitterBuffer) Close() {
j.onPacket = nil
}
func NewJitterBuffer(handler func(packet interface{})) *JitterBuffer {
return &JitterBuffer{
queue: make([]interface{}, 0xFFFF+1),
onPacket: handler,
minStartCount: 50,
first: true,
}
}

View File

@@ -0,0 +1,25 @@
package stream
import (
"math/rand"
"strconv"
"testing"
"time"
)
func TestName(t *testing.T) {
// 设置随机数种子,确保每次运行程序时都能得到不同的随机序列
rand.Seed(time.Now().UnixNano())
buffer := NewJitterBuffer(func(packet interface{}) {
println(packet.(string))
})
for i := 0; i < 65535; i++ {
// 生成1到65535之间的随机数
randomNumber := rand.Intn(65535) + 1
buffer.Push(uint16(randomNumber), strconv.Itoa(randomNumber))
}
buffer.Flush()
buffer.Close()
}