diff --git a/gb28181/source_udp.go b/gb28181/source_udp.go index e4c6d3e..5e49baf 100644 --- a/gb28181/source_udp.go +++ b/gb28181/source_udp.go @@ -2,7 +2,6 @@ package gb28181 import ( "github.com/pion/rtp" - "github.com/yangjiechina/lkm/jitterbuffer" "github.com/yangjiechina/lkm/stream" ) @@ -10,32 +9,33 @@ import ( type UDPSource struct { BaseGBSource - jitterBuffer *jitterbuffer.JitterBuffer + jitterBuffer *stream.JitterBuffer receiveBuffer *stream.ReceiveBuffer } func NewUDPSource() *UDPSource { - return &UDPSource{ - jitterBuffer: jitterbuffer.New(), + u := &UDPSource{ receiveBuffer: stream.NewReceiveBuffer(1500, stream.ReceiveBufferUdpBlockCount+50), } + + u.jitterBuffer = stream.NewJitterBuffer(u.OnOrderedRtp) + return u } -func (u UDPSource) TransportType() TransportType { +func (u *UDPSource) TransportType() TransportType { return TransportTypeUDP } +func (u *UDPSource) OnOrderedRtp(packet interface{}) { + u.PublishSource.Input(packet.(*rtp.Packet).Payload) +} + // InputRtp udp收流会先拷贝rtp包,交给jitter buffer处理后再发给source -func (u UDPSource) InputRtp(pkt *rtp.Packet) error { +func (u *UDPSource) InputRtp(pkt *rtp.Packet) error { block := u.receiveBuffer.GetBlock() copy(block, pkt.Payload) pkt.Payload = block[:len(pkt.Payload)] - u.jitterBuffer.Push(pkt) - - for rtp, _ := u.jitterBuffer.Pop(); rtp != nil; rtp, _ = u.jitterBuffer.Pop() { - u.PublishSource.Input(rtp.Payload) - } - + u.jitterBuffer.Push(pkt.SequenceNumber, pkt) return nil } diff --git a/jitterbuffer/jitter_buffer.go b/jitterbuffer/jitter_buffer.go deleted file mode 100644 index 976ea76..0000000 --- a/jitterbuffer/jitter_buffer.go +++ /dev/null @@ -1,282 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -// Package jitterbuffer implements a buffer for RTP packets designed to help -// counteract non-deterministic sources of latency -package jitterbuffer - -import ( - "errors" - "math" - "sync" - - "github.com/pion/rtp" -) - -// State tracks a JitterBuffer as either Buffering or Emitting -type State uint16 - -// Event represents all events a JitterBuffer can emit -type Event string - -var ( - // ErrBufferUnderrun is returned when the buffer has no items - ErrBufferUnderrun = errors.New("invalid Peek: Empty jitter buffer") - // ErrPopWhileBuffering is returned if a jitter buffer is not in a playback state - ErrPopWhileBuffering = errors.New("attempt to pop while buffering") -) - -const ( - // Buffering is the state when the jitter buffer has not started emitting yet, or has hit an underflow and needs to re-buffer packets - Buffering State = iota - // Emitting is the state when the jitter buffer is operating nominally - Emitting -) - -const ( - // StartBuffering is emitted when the buffer receives its first packet - StartBuffering Event = "startBuffering" - // BeginPlayback is emitted when the buffer has satisfied its buffer length - BeginPlayback = "playing" - // BufferUnderflow is emitted when the buffer does not have enough packets to Pop - BufferUnderflow = "underflow" - // BufferOverflow is emitted when the buffer has exceeded its limit - BufferOverflow = "overflow" -) - -func (jbs State) String() string { - switch jbs { - case Buffering: - return "Buffering" - case Emitting: - return "Emitting" - } - return "unknown" -} - -type ( - // Option will Override JitterBuffer's defaults - Option func(jb *JitterBuffer) - // EventListener will be called when the corresponding Event occurs - EventListener func(event Event, jb *JitterBuffer) -) - -// A JitterBuffer will accept Pushed packets, put them in sequence number -// order, and allows removing in either sequence number order or via a -// provided timestamp -type JitterBuffer struct { - packets *PriorityQueue - minStartCount uint16 - lastSequence uint16 - playoutHead uint16 - playoutReady bool - state State - stats Stats - listeners map[Event][]EventListener - mutex sync.Mutex -} - -// Stats Track interesting statistics for the life of this JitterBuffer -// outOfOrderCount will provide the number of times a packet was Pushed -// -// without its predecessor being present -// -// underflowCount will provide the count of attempts to Pop an empty buffer -// overflowCount will track the number of times the jitter buffer exceeds its limit -type Stats struct { - outOfOrderCount uint32 - underflowCount uint32 - overflowCount uint32 -} - -// New will initialize a jitter buffer and its associated statistics -func New(opts ...Option) *JitterBuffer { - jb := &JitterBuffer{ - state: Buffering, - stats: Stats{0, 0, 0}, - minStartCount: 50, - packets: NewQueue(), - listeners: make(map[Event][]EventListener), - } - - for _, o := range opts { - o(jb) - } - - return jb -} - -// WithMinimumPacketCount will set the required number of packets to be received before -// any attempt to pop a packet can succeed -func WithMinimumPacketCount(count uint16) Option { - return func(jb *JitterBuffer) { - jb.minStartCount = count - } -} - -// Listen will register an event listener -// The jitter buffer may emit events correspnding, interested listerns should -// look at Event for available events -func (jb *JitterBuffer) Listen(event Event, cb EventListener) { - jb.listeners[event] = append(jb.listeners[event], cb) -} - -// PlayoutHead returns the SequenceNumber that will be attempted to Pop next -func (jb *JitterBuffer) PlayoutHead() uint16 { - jb.mutex.Lock() - defer jb.mutex.Unlock() - - return jb.playoutHead -} - -// SetPlayoutHead allows you to manually specify the packet you wish to pop next -// If you have encountered a packet that hasn't resolved you can skip it -func (jb *JitterBuffer) SetPlayoutHead(playoutHead uint16) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - - jb.playoutHead = playoutHead -} - -func (jb *JitterBuffer) updateStats(lastPktSeqNo uint16) { - // If we have at least one packet, and the next packet being pushed in is not - // at the expected sequence number increment the out of order count - if jb.packets.Length() > 0 && lastPktSeqNo != ((jb.lastSequence+1)%math.MaxUint16) { - jb.stats.outOfOrderCount++ - } - jb.lastSequence = lastPktSeqNo -} - -// Push an RTP packet into the jitter buffer, this does not clone -// the data so if the memory is expected to be reused, the caller should -// take this in to account and pass a copy of the packet they wish to buffer -func (jb *JitterBuffer) Push(packet *rtp.Packet) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - if jb.packets.Length() == 0 { - jb.emit(StartBuffering) - } - if jb.packets.Length() > 100 { - jb.stats.overflowCount++ - jb.emit(BufferOverflow) - } - if !jb.playoutReady && jb.packets.Length() == 0 { - jb.playoutHead = packet.SequenceNumber - } - jb.updateStats(packet.SequenceNumber) - jb.packets.Push(packet, packet.SequenceNumber) - jb.updateState() -} - -func (jb *JitterBuffer) emit(event Event) { - for _, l := range jb.listeners[event] { - l(event, jb) - } -} - -func (jb *JitterBuffer) updateState() { - // For now, we only look at the number of packets captured in the play buffer - if jb.packets.Length() >= jb.minStartCount && jb.state == Buffering { - jb.state = Emitting - jb.playoutReady = true - jb.emit(BeginPlayback) - } -} - -// Peek at the packet which is either: -// -// At the playout head when we are emitting, and the playoutHead flag is true -// -// or else -// -// At the last sequence received -func (jb *JitterBuffer) Peek(playoutHead bool) (*rtp.Packet, error) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - if jb.packets.Length() < 1 { - return nil, ErrBufferUnderrun - } - if playoutHead && jb.state == Emitting { - return jb.packets.Find(jb.playoutHead) - } - return jb.packets.Find(jb.lastSequence) -} - -// Pop an RTP packet from the jitter buffer at the current playout head -func (jb *JitterBuffer) Pop() (*rtp.Packet, error) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - if jb.state != Emitting { - return nil, ErrPopWhileBuffering - } - packet, err := jb.packets.PopAt(jb.playoutHead) - if err != nil { - jb.stats.underflowCount++ - jb.emit(BufferUnderflow) - return nil, err - } - jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16 - jb.updateState() - return packet, nil -} - -// PopAtSequence will pop an RTP packet from the jitter buffer at the specified Sequence -func (jb *JitterBuffer) PopAtSequence(sq uint16) (*rtp.Packet, error) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - if jb.state != Emitting { - return nil, ErrPopWhileBuffering - } - packet, err := jb.packets.PopAt(sq) - if err != nil { - jb.stats.underflowCount++ - jb.emit(BufferUnderflow) - return nil, err - } - jb.playoutHead = (jb.playoutHead + 1) % math.MaxUint16 - jb.updateState() - return packet, nil -} - -// PeekAtSequence will return an RTP packet from the jitter buffer at the specified Sequence -// without removing it from the buffer -func (jb *JitterBuffer) PeekAtSequence(sq uint16) (*rtp.Packet, error) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - packet, err := jb.packets.Find(sq) - if err != nil { - return nil, err - } - return packet, nil -} - -// PopAtTimestamp pops an RTP packet from the jitter buffer with the provided timestamp -// Call this method repeatedly to drain the buffer at the timestamp -func (jb *JitterBuffer) PopAtTimestamp(ts uint32) (*rtp.Packet, error) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - if jb.state != Emitting { - return nil, ErrPopWhileBuffering - } - packet, err := jb.packets.PopAtTimestamp(ts) - if err != nil { - jb.stats.underflowCount++ - jb.emit(BufferUnderflow) - return nil, err - } - jb.updateState() - return packet, nil -} - -// Clear will empty the buffer and optionally reset the state -func (jb *JitterBuffer) Clear(resetState bool) { - jb.mutex.Lock() - defer jb.mutex.Unlock() - jb.packets.Clear() - if resetState { - jb.lastSequence = 0 - jb.state = Buffering - jb.stats = Stats{0, 0, 0} - jb.minStartCount = 50 - } -} diff --git a/jitterbuffer/jitter_buffer_test.go b/jitterbuffer/jitter_buffer_test.go deleted file mode 100644 index 205e610..0000000 --- a/jitterbuffer/jitter_buffer_test.go +++ /dev/null @@ -1,238 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -package jitterbuffer - -import ( - "math" - "testing" - - "github.com/pion/rtp" - "github.com/stretchr/testify/assert" -) - -func TestJitterBuffer(t *testing.T) { - assert := assert.New(t) - - t.Run("Appends packets in order", func(*testing.T) { - jb := New() - assert.Equal(jb.lastSequence, uint16(0)) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) - - assert.Equal(jb.lastSequence, uint16(5002)) - - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5012, Timestamp: 512}, Payload: []byte{0x02}}) - - assert.Equal(jb.stats.outOfOrderCount, uint32(1)) - assert.Equal(jb.packets.Length(), uint16(4)) - assert.Equal(jb.lastSequence, uint16(5012)) - }) - - t.Run("Appends packets and begins playout", func(*testing.T) { - jb := New() - for i := 0; i < 100; i++ { - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - assert.Equal(jb.packets.Length(), uint16(100)) - assert.Equal(jb.state, Emitting) - assert.Equal(jb.playoutHead, uint16(5012)) - head, err := jb.Pop() - assert.Equal(head.SequenceNumber, uint16(5012)) - assert.Equal(err, nil) - }) - t.Run("Appends packets and begins playout", func(*testing.T) { - jb := New(WithMinimumPacketCount(1)) - events := make([]Event, 0) - jb.Listen(BeginPlayback, func(event Event, _ *JitterBuffer) { - events = append(events, event) - }) - for i := 0; i < 2; i++ { - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - assert.Equal(jb.packets.Length(), uint16(2)) - assert.Equal(jb.state, Emitting) - assert.Equal(jb.playoutHead, uint16(5012)) - head, err := jb.Pop() - assert.Equal(head.SequenceNumber, uint16(5012)) - assert.Equal(err, nil) - assert.Equal(1, len(events)) - assert.Equal(Event(BeginPlayback), events[0]) - }) - - t.Run("Wraps playout correctly", func(*testing.T) { - jb := New() - for i := 0; i < 100; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - assert.Equal(jb.packets.Length(), uint16(100)) - assert.Equal(jb.state, Emitting) - assert.Equal(jb.playoutHead, uint16(math.MaxUint16-32)) - head, err := jb.Pop() - assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) - assert.Equal(err, nil) - for i := 0; i < 100; i++ { - head, err := jb.Pop() - if i < 99 { - assert.Equal(head.SequenceNumber, uint16((math.MaxUint16-31+i)%math.MaxUint16)) - assert.Equal(err, nil) - } else { - assert.Equal(head, (*rtp.Packet)(nil)) - } - } - }) - - t.Run("Pops at timestamp correctly", func(*testing.T) { - jb := New() - for i := 0; i < 100; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - assert.Equal(jb.packets.Length(), uint16(100)) - assert.Equal(jb.state, Emitting) - head, err := jb.PopAtTimestamp(uint32(513)) - assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32+1)) - assert.Equal(err, nil) - head, err = jb.PopAtTimestamp(uint32(513)) - assert.Equal(head, (*rtp.Packet)(nil)) - assert.NotEqual(err, nil) - - head, err = jb.Pop() - assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) - assert.Equal(err, nil) - }) - - t.Run("Can peek at a packet", func(*testing.T) { - jb := New() - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) - pkt, err := jb.Peek(false) - assert.Equal(pkt.SequenceNumber, uint16(5002)) - assert.Equal(err, nil) - for i := 0; i < 100; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - pkt, err = jb.Peek(true) - assert.Equal(pkt.SequenceNumber, uint16(5000)) - assert.Equal(err, nil) - }) - - t.Run("Pops at sequence with an invalid sequence number", func(*testing.T) { - jb := New() - for i := 0; i < 50; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - assert.Equal(jb.packets.Length(), uint16(52)) - assert.Equal(jb.state, Emitting) - head, err := jb.PopAtSequence(uint16(9000)) - assert.Equal(head, (*rtp.Packet)(nil)) - assert.NotEqual(err, nil) - }) - - t.Run("Pops at timestamp with multiple packets", func(*testing.T) { - jb := New() - for i := 0; i < 50; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - assert.Equal(jb.packets.Length(), uint16(52)) - assert.Equal(jb.state, Emitting) - head, err := jb.PopAtTimestamp(uint32(9000)) - assert.Equal(head.SequenceNumber, uint16(1019)) - assert.Equal(err, nil) - head, err = jb.PopAtTimestamp(uint32(9000)) - assert.Equal(head.SequenceNumber, uint16(1020)) - assert.Equal(err, nil) - - head, err = jb.Pop() - assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) - assert.Equal(err, nil) - }) - - t.Run("Peeks at timestamp with multiple packets", func(*testing.T) { - jb := New() - for i := 0; i < 50; i++ { - sqnum := uint16((math.MaxUint16 - 32 + i) % math.MaxUint16) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: sqnum, Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}) - } - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1019, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 1020, Timestamp: uint32(9000)}, Payload: []byte{0x02}}) - assert.Equal(jb.packets.Length(), uint16(52)) - assert.Equal(jb.state, Emitting) - head, err := jb.PeekAtSequence(uint16(1019)) - assert.Equal(head.SequenceNumber, uint16(1019)) - assert.Equal(err, nil) - head, err = jb.PeekAtSequence(uint16(1020)) - assert.Equal(head.SequenceNumber, uint16(1020)) - assert.Equal(err, nil) - - head, err = jb.PopAtSequence(uint16(math.MaxUint16 - 32)) - assert.Equal(head.SequenceNumber, uint16(math.MaxUint16-32)) - assert.Equal(err, nil) - }) - - t.Run("SetPlayoutHead", func(*testing.T) { - jb := New(WithMinimumPacketCount(1)) - - // Push packets 0-9, but no packet 4 - for i := uint16(0); i < 10; i++ { - if i == 4 { - continue - } - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: i, Timestamp: uint32(512 + i)}, Payload: []byte{0x00}}) - } - - // The first 3 packets will be able to popped - for i := 0; i < 4; i++ { - pkt, err := jb.Pop() - assert.NoError(err) - assert.NotNil(pkt) - } - - // The next pop will fail because of gap - pkt, err := jb.Pop() - assert.ErrorIs(err, ErrNotFound) - assert.Nil(pkt) - assert.Equal(jb.PlayoutHead(), uint16(4)) - - // Assert that PlayoutHead isn't modified with pushing/popping again - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 10, Timestamp: uint32(522)}, Payload: []byte{0x00}}) - pkt, err = jb.Pop() - assert.ErrorIs(err, ErrNotFound) - assert.Nil(pkt) - assert.Equal(jb.PlayoutHead(), uint16(4)) - - // Increment the PlayoutHead and popping will work again - jb.SetPlayoutHead(jb.PlayoutHead() + 1) - for i := 0; i < 6; i++ { - pkt, err := jb.Pop() - assert.NoError(err) - assert.NotNil(pkt) - } - }) - - t.Run("Allows clearing the buffer", func(*testing.T) { - jb := New() - jb.Clear(false) - - assert.Equal(jb.lastSequence, uint16(0)) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5001, Timestamp: 501}, Payload: []byte{0x02}}) - jb.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: 5002, Timestamp: 502}, Payload: []byte{0x02}}) - - assert.Equal(jb.lastSequence, uint16(5002)) - jb.Clear(true) - assert.Equal(jb.lastSequence, uint16(0)) - assert.Equal(jb.stats.outOfOrderCount, uint32(0)) - assert.Equal(jb.packets.Length(), uint16(0)) - }) -} diff --git a/jitterbuffer/option.go b/jitterbuffer/option.go deleted file mode 100644 index 9a33c22..0000000 --- a/jitterbuffer/option.go +++ /dev/null @@ -1,19 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -package jitterbuffer - -import ( - "github.com/pion/logging" -) - -// ReceiverInterceptorOption can be used to configure ReceiverInterceptor -type ReceiverInterceptorOption func(d *ReceiverInterceptor) error - -// Log sets a logger for the interceptor -func Log(log logging.LeveledLogger) ReceiverInterceptorOption { - return func(d *ReceiverInterceptor) error { - d.log = log - return nil - } -} diff --git a/jitterbuffer/priority_queue.go b/jitterbuffer/priority_queue.go deleted file mode 100644 index f6d7d93..0000000 --- a/jitterbuffer/priority_queue.go +++ /dev/null @@ -1,189 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -package jitterbuffer - -import ( - "errors" - - "github.com/pion/rtp" -) - -// PriorityQueue provides a linked list sorting of RTP packets by SequenceNumber -type PriorityQueue struct { - next *node - length uint16 -} - -type node struct { - val *rtp.Packet - next *node - prev *node - priority uint16 -} - -var ( - // ErrInvalidOperation may be returned if a Pop or Find operation is performed on an empty queue - ErrInvalidOperation = errors.New("attempt to find or pop on an empty list") - // ErrNotFound will be returned if the packet cannot be found in the queue - ErrNotFound = errors.New("priority not found") -) - -// NewQueue will create a new PriorityQueue whose order relies on monotonically -// increasing Sequence Number, wrapping at MaxUint16, so -// a packet with sequence number MaxUint16 - 1 will be after 0 -func NewQueue() *PriorityQueue { - return &PriorityQueue{ - next: nil, - length: 0, - } -} - -func newNode(val *rtp.Packet, priority uint16) *node { - return &node{ - val: val, - prev: nil, - next: nil, - priority: priority, - } -} - -// Find a packet in the queue with the provided sequence number, -// regardless of position (the packet is retained in the queue) -func (q *PriorityQueue) Find(sqNum uint16) (*rtp.Packet, error) { - next := q.next - for next != nil { - if next.priority == sqNum { - return next.val, nil - } - next = next.next - } - - return nil, ErrNotFound -} - -// Push will insert a packet in to the queue in order of sequence number -func (q *PriorityQueue) Push(val *rtp.Packet, priority uint16) { - newPq := newNode(val, priority) - if q.next == nil { - q.next = newPq - q.length++ - return - } - if priority < q.next.priority { - newPq.next = q.next - q.next.prev = newPq - q.next = newPq - q.length++ - return - } - head := q.next - prev := q.next - for head != nil { - if priority <= head.priority { - break - } - prev = head - head = head.next - } - if head == nil { - if prev != nil { - prev.next = newPq - } - newPq.prev = prev - } else { - newPq.next = head - newPq.prev = prev - if prev != nil { - prev.next = newPq - } - head.prev = newPq - } - q.length++ -} - -// Length will get the total length of the queue -func (q *PriorityQueue) Length() uint16 { - return q.length -} - -// Pop removes the first element from the queue, regardless -// sequence number -func (q *PriorityQueue) Pop() (*rtp.Packet, error) { - if q.next == nil { - return nil, ErrInvalidOperation - } - val := q.next.val - q.length-- - q.next = q.next.next - return val, nil -} - -// PopAt removes an element at the specified sequence number (priority) -func (q *PriorityQueue) PopAt(sqNum uint16) (*rtp.Packet, error) { - if q.next == nil { - return nil, ErrInvalidOperation - } - if q.next.priority == sqNum { - val := q.next.val - q.next = q.next.next - q.length-- - return val, nil - } - pos := q.next - prev := q.next.prev - for pos != nil { - if pos.priority == sqNum { - val := pos.val - prev.next = pos.next - if prev.next != nil { - prev.next.prev = prev - } - q.length-- - return val, nil - } - prev = pos - pos = pos.next - } - return nil, ErrNotFound -} - -// PopAtTimestamp removes and returns a packet at the given RTP Timestamp, regardless -// sequence number order -func (q *PriorityQueue) PopAtTimestamp(timestamp uint32) (*rtp.Packet, error) { - if q.next == nil { - return nil, ErrInvalidOperation - } - if q.next.val.Timestamp == timestamp { - val := q.next.val - q.next = q.next.next - q.length-- - return val, nil - } - pos := q.next - prev := q.next.prev - for pos != nil { - if pos.val.Timestamp == timestamp { - val := pos.val - prev.next = pos.next - if prev.next != nil { - prev.next.prev = prev - } - q.length-- - return val, nil - } - prev = pos - pos = pos.next - } - return nil, ErrNotFound -} - -// Clear will empty a PriorityQueue -func (q *PriorityQueue) Clear() { - next := q.next - q.length = 0 - for next != nil { - next.prev = nil - next = next.next - } -} diff --git a/jitterbuffer/priority_queue_test.go b/jitterbuffer/priority_queue_test.go deleted file mode 100644 index 7fb2a7a..0000000 --- a/jitterbuffer/priority_queue_test.go +++ /dev/null @@ -1,138 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -package jitterbuffer - -import ( - "testing" - - "github.com/pion/rtp" - "github.com/stretchr/testify/assert" -) - -func TestPriorityQueue(t *testing.T) { - assert := assert.New(t) - - t.Run("Appends packets in order", func(*testing.T) { - pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} - q := NewQueue() - q.Push(pkt, pkt.SequenceNumber) - pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} - q.Push(pkt2, pkt2.SequenceNumber) - assert.Equal(q.next.next.val, pkt2) - assert.Equal(q.next.priority, uint16(5000)) - assert.Equal(q.next.next.priority, uint16(5004)) - }) - - t.Run("Appends many in order", func(*testing.T) { - q := NewQueue() - for i := 0; i < 100; i++ { - q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) - } - assert.Equal(uint16(100), q.Length()) - last := (*node)(nil) - cur := q.next - for cur != nil { - last = cur - cur = cur.next - if cur != nil { - assert.Equal(cur.priority, last.priority+1) - } - } - assert.Equal(q.next.priority, uint16(5012)) - assert.Equal(last.priority, uint16(5012+99)) - }) - - t.Run("Can remove an element", func(*testing.T) { - pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} - q := NewQueue() - q.Push(pkt, pkt.SequenceNumber) - pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} - q.Push(pkt2, pkt2.SequenceNumber) - for i := 0; i < 100; i++ { - q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) - } - popped, _ := q.Pop() - assert.Equal(popped.SequenceNumber, uint16(5000)) - _, _ = q.Pop() - nextPop, _ := q.Pop() - assert.Equal(nextPop.SequenceNumber, uint16(5012)) - }) - - t.Run("Appends in order", func(*testing.T) { - q := NewQueue() - for i := 0; i < 100; i++ { - q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) - } - assert.Equal(uint16(100), q.Length()) - pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} - q.Push(pkt, pkt.SequenceNumber) - assert.Equal(pkt, q.next.val) - assert.Equal(uint16(101), q.Length()) - assert.Equal(q.next.priority, uint16(5000)) - }) - - t.Run("Can find", func(*testing.T) { - q := NewQueue() - for i := 0; i < 100; i++ { - q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) - } - pkt, err := q.Find(5012) - assert.Equal(pkt.SequenceNumber, uint16(5012)) - assert.Equal(err, nil) - }) - - t.Run("Updates the length when PopAt* are called", func(*testing.T) { - pkt := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5000, Timestamp: 500}, Payload: []byte{0x02}} - q := NewQueue() - q.Push(pkt, pkt.SequenceNumber) - pkt2 := &rtp.Packet{Header: rtp.Header{SequenceNumber: 5004, Timestamp: 500}, Payload: []byte{0x02}} - q.Push(pkt2, pkt2.SequenceNumber) - for i := 0; i < 100; i++ { - q.Push(&rtp.Packet{Header: rtp.Header{SequenceNumber: uint16(5012 + i), Timestamp: uint32(512 + i)}, Payload: []byte{0x02}}, uint16(5012+i)) - } - assert.Equal(uint16(102), q.Length()) - popped, _ := q.PopAt(uint16(5012)) - assert.Equal(popped.SequenceNumber, uint16(5012)) - assert.Equal(uint16(101), q.Length()) - - popped, err := q.PopAtTimestamp(uint32(500)) - assert.Equal(popped.SequenceNumber, uint16(5000)) - assert.Equal(uint16(100), q.Length()) - assert.Equal(err, nil) - }) -} - -func TestPriorityQueue_Find(t *testing.T) { - packets := NewQueue() - - packets.Push(&rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 1000, - Timestamp: 5, - SSRC: 5, - }, - Payload: []uint8{0xA}, - }, 1000) - - _, err := packets.PopAt(1000) - assert.NoError(t, err) - - _, err = packets.Find(1001) - assert.Error(t, err) -} - -func TestPriorityQueue_Clean(t *testing.T) { - packets := NewQueue() - packets.Clear() - packets.Push(&rtp.Packet{ - Header: rtp.Header{ - SequenceNumber: 1000, - Timestamp: 5, - SSRC: 5, - }, - Payload: []uint8{0xA}, - }, 1000) - assert.EqualValues(t, 1, packets.Length()) - packets.Clear() -} diff --git a/jitterbuffer/receiver_interceptor.go b/jitterbuffer/receiver_interceptor.go deleted file mode 100644 index b4c032b..0000000 --- a/jitterbuffer/receiver_interceptor.go +++ /dev/null @@ -1,110 +0,0 @@ -// SPDX-FileCopyrightText: 2023 The Pion community -// SPDX-License-Identifier: MIT - -package jitterbuffer - -import ( - "sync" - - "github.com/pion/interceptor" - "github.com/pion/logging" - "github.com/pion/rtp" -) - -// InterceptorFactory is a interceptor.Factory for a GeneratorInterceptor -type InterceptorFactory struct { - opts []ReceiverInterceptorOption -} - -// NewInterceptor constructs a new ReceiverInterceptor -func (g *InterceptorFactory) NewInterceptor(_ string) (interceptor.Interceptor, error) { - i := &ReceiverInterceptor{ - close: make(chan struct{}), - log: logging.NewDefaultLoggerFactory().NewLogger("jitterbuffer"), - buffer: New(), - } - - for _, opt := range g.opts { - if err := opt(i); err != nil { - return nil, err - } - } - - return i, nil -} - -// ReceiverInterceptor places a JitterBuffer in the chain to smooth packet arrival -// and allow for network jitter -// -// The Interceptor is designed to fit in a RemoteStream -// pipeline and buffer incoming packets for a short period (currently -// defaulting to 50 packets) before emitting packets to be consumed by the -// next step in the pipeline. -// -// The caller must ensure they are prepared to handle an -// ErrPopWhileBuffering in the case that insufficient packets have been -// received by the jitter buffer. The caller should retry the operation -// at some point later as the buffer may have been filled in the interim. -// -// The caller should also be aware that an ErrBufferUnderrun may be -// returned in the case that the initial buffering was sufficient and -// playback began but the caller is consuming packets (or they are not -// arriving) quickly enough. -type ReceiverInterceptor struct { - interceptor.NoOp - buffer *JitterBuffer - m sync.Mutex - wg sync.WaitGroup - close chan struct{} - log logging.LeveledLogger -} - -// NewInterceptor returns a new InterceptorFactory -func NewInterceptor(opts ...ReceiverInterceptorOption) (*InterceptorFactory, error) { - return &InterceptorFactory{opts}, nil -} - -// BindRemoteStream lets you modify any incoming RTP packets. It is called once for per RemoteStream. The returned method -// will be called once per rtp packet. -func (i *ReceiverInterceptor) BindRemoteStream(_ *interceptor.StreamInfo, reader interceptor.RTPReader) interceptor.RTPReader { - return interceptor.RTPReaderFunc(func(b []byte, a interceptor.Attributes) (int, interceptor.Attributes, error) { - buf := make([]byte, len(b)) - n, attr, err := reader.Read(buf, a) - if err != nil { - return n, attr, err - } - packet := &rtp.Packet{} - if err := packet.Unmarshal(buf); err != nil { - return 0, nil, err - } - i.m.Lock() - defer i.m.Unlock() - i.buffer.Push(packet) - if i.buffer.state == Emitting { - newPkt, err := i.buffer.Pop() - if err != nil { - return 0, nil, err - } - nlen, err := newPkt.MarshalTo(b) - return nlen, attr, err - } - return n, attr, ErrPopWhileBuffering - }) -} - -// UnbindRemoteStream is called when the Stream is removed. It can be used to clean up any data related to that track. -func (i *ReceiverInterceptor) UnbindRemoteStream(_ *interceptor.StreamInfo) { - defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() - i.buffer.Clear(true) -} - -// Close closes the interceptor -func (i *ReceiverInterceptor) Close() error { - defer i.wg.Wait() - i.m.Lock() - defer i.m.Unlock() - i.buffer.Clear(true) - return nil -} diff --git a/stream/jitter_buffer.go b/stream/jitter_buffer.go new file mode 100644 index 0000000..97ce462 --- /dev/null +++ b/stream/jitter_buffer.go @@ -0,0 +1,74 @@ +package stream + +import "math" + +type JitterBuffer struct { + maxSeqNum uint16 + minSeqNum uint16 + nextSeqNum uint16 + + count int + minStartCount int + + queue []interface{} + onPacket func(packet interface{}) + first bool +} + +func (j *JitterBuffer) emit() { + if j.first { + j.nextSeqNum = j.minSeqNum + j.first = false + } + if j.nextSeqNum > j.maxSeqNum { + j.nextSeqNum = j.minSeqNum + } + + for j.queue[j.nextSeqNum] == nil { + j.nextSeqNum++ + } + + j.onPacket(j.queue[j.nextSeqNum]) + j.queue[j.nextSeqNum] = nil + j.nextSeqNum++ + j.minSeqNum = uint16(math.Min(float64(j.nextSeqNum), float64(j.maxSeqNum))) + j.count-- +} + +func (j *JitterBuffer) Push(seq uint16, packet interface{}) { + if j.count == 0 { + j.minSeqNum = seq + j.maxSeqNum = seq + } + + if j.queue[seq] == nil { + j.queue[seq] = packet + j.count++ + } + + j.minSeqNum = uint16(math.Min(float64(j.minSeqNum), float64(seq))) + j.maxSeqNum = uint16(math.Max(float64(j.maxSeqNum), float64(seq))) + + if j.count > j.minStartCount { + j.emit() + } +} + +func (j *JitterBuffer) Flush() { + for j.count > 0 { + j.emit() + } +} + +func (j *JitterBuffer) Close() { + j.onPacket = nil +} + +func NewJitterBuffer(handler func(packet interface{})) *JitterBuffer { + return &JitterBuffer{ + queue: make([]interface{}, 0xFFFF+1), + onPacket: handler, + minStartCount: 50, + first: true, + } +} diff --git a/stream/jitterbufer_test.go b/stream/jitterbufer_test.go new file mode 100644 index 0000000..3a3b8fe --- /dev/null +++ b/stream/jitterbufer_test.go @@ -0,0 +1,25 @@ +package stream + +import ( + "math/rand" + "strconv" + "testing" + "time" +) + +func TestName(t *testing.T) { + // 设置随机数种子,确保每次运行程序时都能得到不同的随机序列 + rand.Seed(time.Now().UnixNano()) + buffer := NewJitterBuffer(func(packet interface{}) { + println(packet.(string)) + }) + + for i := 0; i < 65535; i++ { + // 生成1到65535之间的随机数 + randomNumber := rand.Intn(65535) + 1 + buffer.Push(uint16(randomNumber), strconv.Itoa(randomNumber)) + } + + buffer.Flush() + buffer.Close() +}