mirror of
https://github.com/pion/mediadevices.git
synced 2025-09-27 04:46:10 +08:00
Compare commits
3 Commits
427feefb33
...
add-broadc
Author | SHA1 | Date | |
---|---|---|---|
![]() |
186ee09102 | ||
![]() |
20fadef555 | ||
![]() |
0734092a11 |
136
pkg/io/broadcast.go
Normal file
136
pkg/io/broadcast.go
Normal file
@@ -0,0 +1,136 @@
|
|||||||
|
package io
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
maskReading = 1 << 63
|
||||||
|
broadcasterRingSize = 32
|
||||||
|
// TODO: If the data source has fps greater than 30, they'll see some
|
||||||
|
// fps fluctuation. But, 30 fps should be enough for general cases.
|
||||||
|
broadcasterRingPollDuration = time.Millisecond * 33
|
||||||
|
)
|
||||||
|
|
||||||
|
var errEmptySource = fmt.Errorf("Source can't be nil")
|
||||||
|
|
||||||
|
type broadcasterData struct {
|
||||||
|
data interface{}
|
||||||
|
count uint32
|
||||||
|
err error
|
||||||
|
}
|
||||||
|
|
||||||
|
type broadcasterRing struct {
|
||||||
|
buffer []atomic.Value
|
||||||
|
// reading (1 bit) + reserved (31 bits) + data count (32 bits)
|
||||||
|
state uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func newBroadcasterRing() *broadcasterRing {
|
||||||
|
return &broadcasterRing{buffer: make([]atomic.Value, broadcasterRingSize)}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ring *broadcasterRing) index(count uint32) int {
|
||||||
|
return int(count) % len(ring.buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ring *broadcasterRing) acquire(count uint32) func(*broadcasterData) {
|
||||||
|
// Reader has reached the latest data, should read from the source.
|
||||||
|
// Only allow 1 reader to read from the source. When there are more than 1 readers,
|
||||||
|
// the other readers will need to share the same data that the first reader gets from
|
||||||
|
// the source.
|
||||||
|
state := uint64(count)
|
||||||
|
if atomic.CompareAndSwapUint64(&ring.state, state, state|maskReading) {
|
||||||
|
return func(data *broadcasterData) {
|
||||||
|
i := ring.index(count)
|
||||||
|
ring.buffer[i].Store(data)
|
||||||
|
atomic.StoreUint64(&ring.state, uint64(count+1))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ring *broadcasterRing) get(count uint32) *broadcasterData {
|
||||||
|
for {
|
||||||
|
reading := uint64(count) | maskReading
|
||||||
|
// TODO: since it's lockless, it spends a lot of resources in the scheduling.
|
||||||
|
for atomic.LoadUint64(&ring.state) == reading {
|
||||||
|
// Yield current goroutine to let other goroutines to run instead
|
||||||
|
time.Sleep(broadcasterRingPollDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
i := ring.index(count)
|
||||||
|
data := ring.buffer[i].Load().(*broadcasterData)
|
||||||
|
if data.count == count {
|
||||||
|
return data
|
||||||
|
}
|
||||||
|
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ring *broadcasterRing) lastCount() uint32 {
|
||||||
|
// ring.state always keeps track the next count, so we need to subtract it by 1 to get the
|
||||||
|
// last count
|
||||||
|
return uint32(atomic.LoadUint64(&ring.state)) - 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// Broadcaster is a generic pull-based broadcaster. Broadcaster is unique in a sense that
|
||||||
|
// readers can come and go at anytime, and readers don't need to close or notify broadcaster.
|
||||||
|
type Broadcaster struct {
|
||||||
|
source atomic.Value
|
||||||
|
buffer *broadcasterRing
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewNewBroadcaster creates a new broadcaster.
|
||||||
|
func NewBroadcaster(source Reader) *Broadcaster {
|
||||||
|
var broadcaster Broadcaster
|
||||||
|
broadcaster.buffer = newBroadcasterRing()
|
||||||
|
broadcaster.ReplaceSource(source)
|
||||||
|
|
||||||
|
return &broadcaster
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReader creates a new reader. Each reader will retrieve the same data from the source.
|
||||||
|
// copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring
|
||||||
|
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer
|
||||||
|
// in the ring buffer.
|
||||||
|
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader {
|
||||||
|
currentCount := broadcaster.buffer.lastCount()
|
||||||
|
|
||||||
|
return ReaderFunc(func() (data interface{}, err error) {
|
||||||
|
currentCount++
|
||||||
|
if push := broadcaster.buffer.acquire(currentCount); push != nil {
|
||||||
|
data, err = broadcaster.source.Load().(Reader).Read()
|
||||||
|
push(&broadcasterData{
|
||||||
|
data: data,
|
||||||
|
err: err,
|
||||||
|
count: currentCount,
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
ringData := broadcaster.buffer.get(currentCount)
|
||||||
|
data, err, currentCount = ringData.data, ringData.err, ringData.count
|
||||||
|
}
|
||||||
|
|
||||||
|
data = copyFn(data)
|
||||||
|
return
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceSource replaces the underlying source. This operation is thread safe.
|
||||||
|
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
|
||||||
|
if source == nil {
|
||||||
|
return errEmptySource
|
||||||
|
}
|
||||||
|
|
||||||
|
broadcaster.source.Store(source)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceSource retrieves the underlying source. This operation is thread safe.
|
||||||
|
func (broadcaster *Broadcaster) Source() Reader {
|
||||||
|
return broadcaster.source.Load().(Reader)
|
||||||
|
}
|
14
pkg/io/reader.go
Normal file
14
pkg/io/reader.go
Normal file
@@ -0,0 +1,14 @@
|
|||||||
|
package io
|
||||||
|
|
||||||
|
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
|
||||||
|
// to provide strong type.
|
||||||
|
type Reader interface {
|
||||||
|
Read() (interface{}, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReaderFunc is a proxy type for Reader
|
||||||
|
type ReaderFunc func() (interface{}, error)
|
||||||
|
|
||||||
|
func (f ReaderFunc) Read() (interface{}, error) {
|
||||||
|
return f()
|
||||||
|
}
|
65
pkg/io/video/broadcast.go
Normal file
65
pkg/io/video/broadcast.go
Normal file
@@ -0,0 +1,65 @@
|
|||||||
|
package video
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"image"
|
||||||
|
|
||||||
|
"github.com/pion/mediadevices/pkg/io"
|
||||||
|
)
|
||||||
|
|
||||||
|
var errEmptySource = fmt.Errorf("Source can't be nil")
|
||||||
|
|
||||||
|
// Broadcaster is a specialized video broadcaster.
|
||||||
|
type Broadcaster struct {
|
||||||
|
ioBroadcaster *io.Broadcaster
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewNewBroadcaster creates a new broadcaster.
|
||||||
|
func NewBroadcaster(source Reader) *Broadcaster {
|
||||||
|
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
|
||||||
|
return source.Read()
|
||||||
|
}))
|
||||||
|
|
||||||
|
return &Broadcaster{broadcaster}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewReader creates a new reader. Each reader will retrieve the same data from the source.
|
||||||
|
// copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring
|
||||||
|
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer
|
||||||
|
// in the ring buffer.
|
||||||
|
func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
|
||||||
|
copyFn := func(src interface{}) interface{} { return src }
|
||||||
|
|
||||||
|
if copyFrame {
|
||||||
|
buffer := NewFrameBuffer(0)
|
||||||
|
copyFn = func(src interface{}) interface{} {
|
||||||
|
realSrc, _ := src.(image.Image)
|
||||||
|
buffer.StoreCopy(realSrc)
|
||||||
|
return buffer.Load()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
|
||||||
|
return ReaderFunc(func() (image.Image, error) {
|
||||||
|
data, err := reader.Read()
|
||||||
|
img, _ := data.(image.Image)
|
||||||
|
return img, err
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceSource replaces the underlying source. This operation is thread safe.
|
||||||
|
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
|
||||||
|
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
|
||||||
|
return source.Read()
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ReplaceSource retrieves the underlying source. This operation is thread safe.
|
||||||
|
func (broadcaster *Broadcaster) Source() Reader {
|
||||||
|
source := broadcaster.ioBroadcaster.Source()
|
||||||
|
return ReaderFunc(func() (image.Image, error) {
|
||||||
|
data, err := source.Read()
|
||||||
|
img, _ := data.(image.Image)
|
||||||
|
return img, err
|
||||||
|
})
|
||||||
|
}
|
187
pkg/io/video/broadcast_test.go
Normal file
187
pkg/io/video/broadcast_test.go
Normal file
@@ -0,0 +1,187 @@
|
|||||||
|
package video
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"image"
|
||||||
|
"runtime"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func BenchmarkBroadcast(b *testing.B) {
|
||||||
|
var src Reader
|
||||||
|
img := image.NewRGBA(image.Rect(0, 0, 1920, 1080))
|
||||||
|
interval := time.NewTicker(time.Millisecond * 33) // 30 fps
|
||||||
|
defer interval.Stop()
|
||||||
|
src = ReaderFunc(func() (image.Image, error) {
|
||||||
|
<-interval.C
|
||||||
|
return img, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
for n := 1; n <= 4096; n *= 16 {
|
||||||
|
n := n
|
||||||
|
|
||||||
|
b.Run(fmt.Sprintf("Readers-%d", n), func(b *testing.B) {
|
||||||
|
b.SetParallelism(n)
|
||||||
|
broadcaster := NewBroadcaster(src)
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
reader := broadcaster.NewReader(false)
|
||||||
|
for pb.Next() {
|
||||||
|
reader.Read()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBroadcast(t *testing.T) {
|
||||||
|
// https://github.com/pion/mediadevices/issues/198
|
||||||
|
if runtime.GOOS == "darwin" {
|
||||||
|
t.Skip("Skipping because Darwin CI is not reliable for timing related tests.")
|
||||||
|
}
|
||||||
|
frames := make([]image.Image, 5*30) // 5 seconds worth of frames
|
||||||
|
resolution := image.Rect(0, 0, 1920, 1080)
|
||||||
|
for i := range frames {
|
||||||
|
rgba := image.NewRGBA(resolution)
|
||||||
|
rgba.Pix[0] = uint8(i >> 24)
|
||||||
|
rgba.Pix[1] = uint8(i >> 16)
|
||||||
|
rgba.Pix[2] = uint8(i >> 8)
|
||||||
|
rgba.Pix[3] = uint8(i)
|
||||||
|
frames[i] = rgba
|
||||||
|
}
|
||||||
|
|
||||||
|
routinePauseConds := []struct {
|
||||||
|
src bool
|
||||||
|
dst bool
|
||||||
|
expectedFPS float64
|
||||||
|
expectedDrop float64
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
src: false,
|
||||||
|
dst: false,
|
||||||
|
expectedFPS: 30,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
src: true,
|
||||||
|
dst: false,
|
||||||
|
expectedFPS: 20,
|
||||||
|
expectedDrop: 10,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
src: false,
|
||||||
|
dst: true,
|
||||||
|
expectedFPS: 20,
|
||||||
|
expectedDrop: 10,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, pauseCond := range routinePauseConds {
|
||||||
|
pauseCond := pauseCond
|
||||||
|
t.Run(fmt.Sprintf("SrcPause-%v/DstPause-%v", pauseCond.src, pauseCond.dst), func(t *testing.T) {
|
||||||
|
for n := 1; n <= 256; n *= 16 {
|
||||||
|
n := n
|
||||||
|
|
||||||
|
t.Run(fmt.Sprintf("Readers-%d", n), func(t *testing.T) {
|
||||||
|
var src Reader
|
||||||
|
interval := time.NewTicker(time.Millisecond * 33) // 30 fps
|
||||||
|
defer interval.Stop()
|
||||||
|
frameCount := 0
|
||||||
|
frameSent := 0
|
||||||
|
lastSend := time.Now()
|
||||||
|
src = ReaderFunc(func() (image.Image, error) {
|
||||||
|
if pauseCond.src && frameSent == 30 {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
<-interval.C
|
||||||
|
|
||||||
|
now := time.Now()
|
||||||
|
if interval := now.Sub(lastSend); interval > time.Millisecond*33*3/2 {
|
||||||
|
// Source reader should drop frames to catch up the latest frame.
|
||||||
|
drop := int(interval/(time.Millisecond*33)) - 1
|
||||||
|
frameCount += drop
|
||||||
|
t.Logf("Skipped %d frames", drop)
|
||||||
|
}
|
||||||
|
lastSend = now
|
||||||
|
frame := frames[frameCount]
|
||||||
|
frameCount++
|
||||||
|
frameSent++
|
||||||
|
return frame, nil
|
||||||
|
})
|
||||||
|
broadcaster := NewBroadcaster(src)
|
||||||
|
var done uint32
|
||||||
|
duration := time.Second * 3
|
||||||
|
fpsChan := make(chan []float64)
|
||||||
|
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func() {
|
||||||
|
reader := broadcaster.NewReader(false)
|
||||||
|
count := 0
|
||||||
|
lastFrameCount := -1
|
||||||
|
droppedFrames := 0
|
||||||
|
wg.Done()
|
||||||
|
wg.Wait()
|
||||||
|
for atomic.LoadUint32(&done) == 0 {
|
||||||
|
if pauseCond.dst && count == 30 {
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
}
|
||||||
|
frame, err := reader.Read()
|
||||||
|
if err != nil {
|
||||||
|
t.Error(err)
|
||||||
|
}
|
||||||
|
rgba := frame.(*image.RGBA)
|
||||||
|
var frameCount int
|
||||||
|
frameCount |= int(rgba.Pix[0]) << 24
|
||||||
|
frameCount |= int(rgba.Pix[1]) << 16
|
||||||
|
frameCount |= int(rgba.Pix[2]) << 8
|
||||||
|
frameCount |= int(rgba.Pix[3])
|
||||||
|
|
||||||
|
droppedFrames += (frameCount - lastFrameCount - 1)
|
||||||
|
lastFrameCount = frameCount
|
||||||
|
count++
|
||||||
|
}
|
||||||
|
|
||||||
|
fps := float64(count) / duration.Seconds()
|
||||||
|
if fps < pauseCond.expectedFPS-2 || fps > pauseCond.expectedFPS+2 {
|
||||||
|
t.Fatal("Unexpected average FPS")
|
||||||
|
}
|
||||||
|
|
||||||
|
droppedFramesPerSecond := float64(droppedFrames) / duration.Seconds()
|
||||||
|
if droppedFramesPerSecond < pauseCond.expectedDrop-2 || droppedFramesPerSecond > pauseCond.expectedDrop+2 {
|
||||||
|
t.Fatal("Unexpected drop count")
|
||||||
|
}
|
||||||
|
|
||||||
|
fpsChan <- []float64{fps, droppedFramesPerSecond, float64(lastFrameCount)}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(duration)
|
||||||
|
atomic.StoreUint32(&done, 1)
|
||||||
|
|
||||||
|
var fpsAvg float64
|
||||||
|
var droppedFramesPerSecondAvg float64
|
||||||
|
var lastFrameCountAvg float64
|
||||||
|
var count int
|
||||||
|
for metric := range fpsChan {
|
||||||
|
fps, droppedFramesPerSecond, lastFrameCount := metric[0], metric[1], metric[2]
|
||||||
|
fpsAvg += fps
|
||||||
|
droppedFramesPerSecondAvg += droppedFramesPerSecond
|
||||||
|
lastFrameCountAvg += lastFrameCount
|
||||||
|
count++
|
||||||
|
if count == n {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("Average FPS :", fpsAvg/float64(n))
|
||||||
|
t.Log("Average dropped frames per second:", droppedFramesPerSecondAvg/float64(n))
|
||||||
|
t.Log("Last frame count (src) :", frameCount)
|
||||||
|
t.Log("Average last frame count (dst) :", lastFrameCountAvg/float64(n))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user