Compare commits

...

3 Commits

Author SHA1 Message Date
Atsushi Watanabe
186ee09102 Drop source frames during pause
Source reader should drop frames to catch up the latest frame.
2020-09-18 13:31:52 +09:00
Atsushi Watanabe
20fadef555 Add broadcast test conditions with pause
Add test case to pause provider feeding or consumer reading
during broadcasting.
2020-09-17 11:06:16 +09:00
Lukas Herman
0734092a11 Add pull-based Broadcaster
* Add generic io.Reader
* Add generic broadcaster
* Add specialize video broadcaster
* Use ring buffer in broadcaster
* Use small delay to relax the schedule in polling
2020-09-15 12:32:29 -07:00
4 changed files with 402 additions and 0 deletions

136
pkg/io/broadcast.go Normal file
View File

@@ -0,0 +1,136 @@
package io
import (
"fmt"
"sync/atomic"
"time"
)
const (
maskReading = 1 << 63
broadcasterRingSize = 32
// TODO: If the data source has fps greater than 30, they'll see some
// fps fluctuation. But, 30 fps should be enough for general cases.
broadcasterRingPollDuration = time.Millisecond * 33
)
var errEmptySource = fmt.Errorf("Source can't be nil")
type broadcasterData struct {
data interface{}
count uint32
err error
}
type broadcasterRing struct {
buffer []atomic.Value
// reading (1 bit) + reserved (31 bits) + data count (32 bits)
state uint64
}
func newBroadcasterRing() *broadcasterRing {
return &broadcasterRing{buffer: make([]atomic.Value, broadcasterRingSize)}
}
func (ring *broadcasterRing) index(count uint32) int {
return int(count) % len(ring.buffer)
}
func (ring *broadcasterRing) acquire(count uint32) func(*broadcasterData) {
// Reader has reached the latest data, should read from the source.
// Only allow 1 reader to read from the source. When there are more than 1 readers,
// the other readers will need to share the same data that the first reader gets from
// the source.
state := uint64(count)
if atomic.CompareAndSwapUint64(&ring.state, state, state|maskReading) {
return func(data *broadcasterData) {
i := ring.index(count)
ring.buffer[i].Store(data)
atomic.StoreUint64(&ring.state, uint64(count+1))
}
}
return nil
}
func (ring *broadcasterRing) get(count uint32) *broadcasterData {
for {
reading := uint64(count) | maskReading
// TODO: since it's lockless, it spends a lot of resources in the scheduling.
for atomic.LoadUint64(&ring.state) == reading {
// Yield current goroutine to let other goroutines to run instead
time.Sleep(broadcasterRingPollDuration)
}
i := ring.index(count)
data := ring.buffer[i].Load().(*broadcasterData)
if data.count == count {
return data
}
count++
}
}
func (ring *broadcasterRing) lastCount() uint32 {
// ring.state always keeps track the next count, so we need to subtract it by 1 to get the
// last count
return uint32(atomic.LoadUint64(&ring.state)) - 1
}
// Broadcaster is a generic pull-based broadcaster. Broadcaster is unique in a sense that
// readers can come and go at anytime, and readers don't need to close or notify broadcaster.
type Broadcaster struct {
source atomic.Value
buffer *broadcasterRing
}
// NewNewBroadcaster creates a new broadcaster.
func NewBroadcaster(source Reader) *Broadcaster {
var broadcaster Broadcaster
broadcaster.buffer = newBroadcasterRing()
broadcaster.ReplaceSource(source)
return &broadcaster
}
// NewReader creates a new reader. Each reader will retrieve the same data from the source.
// copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer
// in the ring buffer.
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader {
currentCount := broadcaster.buffer.lastCount()
return ReaderFunc(func() (data interface{}, err error) {
currentCount++
if push := broadcaster.buffer.acquire(currentCount); push != nil {
data, err = broadcaster.source.Load().(Reader).Read()
push(&broadcasterData{
data: data,
err: err,
count: currentCount,
})
} else {
ringData := broadcaster.buffer.get(currentCount)
data, err, currentCount = ringData.data, ringData.err, ringData.count
}
data = copyFn(data)
return
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
if source == nil {
return errEmptySource
}
broadcaster.source.Store(source)
return nil
}
// ReplaceSource retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
return broadcaster.source.Load().(Reader)
}

14
pkg/io/reader.go Normal file
View File

@@ -0,0 +1,14 @@
package io
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
// to provide strong type.
type Reader interface {
Read() (interface{}, error)
}
// ReaderFunc is a proxy type for Reader
type ReaderFunc func() (interface{}, error)
func (f ReaderFunc) Read() (interface{}, error) {
return f()
}

65
pkg/io/video/broadcast.go Normal file
View File

@@ -0,0 +1,65 @@
package video
import (
"fmt"
"image"
"github.com/pion/mediadevices/pkg/io"
)
var errEmptySource = fmt.Errorf("Source can't be nil")
// Broadcaster is a specialized video broadcaster.
type Broadcaster struct {
ioBroadcaster *io.Broadcaster
}
// NewNewBroadcaster creates a new broadcaster.
func NewBroadcaster(source Reader) *Broadcaster {
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
return source.Read()
}))
return &Broadcaster{broadcaster}
}
// NewReader creates a new reader. Each reader will retrieve the same data from the source.
// copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer
// in the ring buffer.
func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
copyFn := func(src interface{}) interface{} { return src }
if copyFrame {
buffer := NewFrameBuffer(0)
copyFn = func(src interface{}) interface{} {
realSrc, _ := src.(image.Image)
buffer.StoreCopy(realSrc)
return buffer.Load()
}
}
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (image.Image, error) {
data, err := reader.Read()
img, _ := data.(image.Image)
return img, err
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
return source.Read()
}))
}
// ReplaceSource retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (image.Image, error) {
data, err := source.Read()
img, _ := data.(image.Image)
return img, err
})
}

View File

@@ -0,0 +1,187 @@
package video
import (
"fmt"
"image"
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
)
func BenchmarkBroadcast(b *testing.B) {
var src Reader
img := image.NewRGBA(image.Rect(0, 0, 1920, 1080))
interval := time.NewTicker(time.Millisecond * 33) // 30 fps
defer interval.Stop()
src = ReaderFunc(func() (image.Image, error) {
<-interval.C
return img, nil
})
for n := 1; n <= 4096; n *= 16 {
n := n
b.Run(fmt.Sprintf("Readers-%d", n), func(b *testing.B) {
b.SetParallelism(n)
broadcaster := NewBroadcaster(src)
b.RunParallel(func(pb *testing.PB) {
reader := broadcaster.NewReader(false)
for pb.Next() {
reader.Read()
}
})
})
}
}
func TestBroadcast(t *testing.T) {
// https://github.com/pion/mediadevices/issues/198
if runtime.GOOS == "darwin" {
t.Skip("Skipping because Darwin CI is not reliable for timing related tests.")
}
frames := make([]image.Image, 5*30) // 5 seconds worth of frames
resolution := image.Rect(0, 0, 1920, 1080)
for i := range frames {
rgba := image.NewRGBA(resolution)
rgba.Pix[0] = uint8(i >> 24)
rgba.Pix[1] = uint8(i >> 16)
rgba.Pix[2] = uint8(i >> 8)
rgba.Pix[3] = uint8(i)
frames[i] = rgba
}
routinePauseConds := []struct {
src bool
dst bool
expectedFPS float64
expectedDrop float64
}{
{
src: false,
dst: false,
expectedFPS: 30,
},
{
src: true,
dst: false,
expectedFPS: 20,
expectedDrop: 10,
},
{
src: false,
dst: true,
expectedFPS: 20,
expectedDrop: 10,
},
}
for _, pauseCond := range routinePauseConds {
pauseCond := pauseCond
t.Run(fmt.Sprintf("SrcPause-%v/DstPause-%v", pauseCond.src, pauseCond.dst), func(t *testing.T) {
for n := 1; n <= 256; n *= 16 {
n := n
t.Run(fmt.Sprintf("Readers-%d", n), func(t *testing.T) {
var src Reader
interval := time.NewTicker(time.Millisecond * 33) // 30 fps
defer interval.Stop()
frameCount := 0
frameSent := 0
lastSend := time.Now()
src = ReaderFunc(func() (image.Image, error) {
if pauseCond.src && frameSent == 30 {
time.Sleep(time.Second)
}
<-interval.C
now := time.Now()
if interval := now.Sub(lastSend); interval > time.Millisecond*33*3/2 {
// Source reader should drop frames to catch up the latest frame.
drop := int(interval/(time.Millisecond*33)) - 1
frameCount += drop
t.Logf("Skipped %d frames", drop)
}
lastSend = now
frame := frames[frameCount]
frameCount++
frameSent++
return frame, nil
})
broadcaster := NewBroadcaster(src)
var done uint32
duration := time.Second * 3
fpsChan := make(chan []float64)
var wg sync.WaitGroup
wg.Add(n)
for i := 0; i < n; i++ {
go func() {
reader := broadcaster.NewReader(false)
count := 0
lastFrameCount := -1
droppedFrames := 0
wg.Done()
wg.Wait()
for atomic.LoadUint32(&done) == 0 {
if pauseCond.dst && count == 30 {
time.Sleep(time.Second)
}
frame, err := reader.Read()
if err != nil {
t.Error(err)
}
rgba := frame.(*image.RGBA)
var frameCount int
frameCount |= int(rgba.Pix[0]) << 24
frameCount |= int(rgba.Pix[1]) << 16
frameCount |= int(rgba.Pix[2]) << 8
frameCount |= int(rgba.Pix[3])
droppedFrames += (frameCount - lastFrameCount - 1)
lastFrameCount = frameCount
count++
}
fps := float64(count) / duration.Seconds()
if fps < pauseCond.expectedFPS-2 || fps > pauseCond.expectedFPS+2 {
t.Fatal("Unexpected average FPS")
}
droppedFramesPerSecond := float64(droppedFrames) / duration.Seconds()
if droppedFramesPerSecond < pauseCond.expectedDrop-2 || droppedFramesPerSecond > pauseCond.expectedDrop+2 {
t.Fatal("Unexpected drop count")
}
fpsChan <- []float64{fps, droppedFramesPerSecond, float64(lastFrameCount)}
}()
}
time.Sleep(duration)
atomic.StoreUint32(&done, 1)
var fpsAvg float64
var droppedFramesPerSecondAvg float64
var lastFrameCountAvg float64
var count int
for metric := range fpsChan {
fps, droppedFramesPerSecond, lastFrameCount := metric[0], metric[1], metric[2]
fpsAvg += fps
droppedFramesPerSecondAvg += droppedFramesPerSecond
lastFrameCountAvg += lastFrameCount
count++
if count == n {
break
}
}
t.Log("Average FPS :", fpsAvg/float64(n))
t.Log("Average dropped frames per second:", droppedFramesPerSecondAvg/float64(n))
t.Log("Last frame count (src) :", frameCount)
t.Log("Average last frame count (dst) :", lastFrameCountAvg/float64(n))
})
}
})
}
}