Compare commits

..

7 Commits

Author SHA1 Message Date
renovate[bot]
f472618b71 Update module pion/rtp to v1.6.2 (#265)
Generated by Renovate Bot

Co-authored-by: Renovate Bot <bot@renovateapp.com>
2020-12-14 22:08:56 -05:00
renovate[bot]
7f4d1bc5ad Update module gen2brain/malgo to v0.10.27 (#259)
Generated by Renovate Bot

Co-authored-by: Renovate Bot <bot@renovateapp.com>
2020-12-14 22:08:29 -05:00
Lukas Herman
97046bc6ec Add NewEncodedIOReader (#263)
Changes:
  * [BREAKING CHANGE] NewEncodedReader is renamed to NewEncodedIOReader
  * NewEncodedReader now returns a non-standard buffer reader to give
  more meta data such as sample count
2020-12-14 22:07:57 -05:00
f-fl0
7bcc9111f4 Tolerate video frame rate variation in properties detector (#261)
Changes:
* Add argument to tolerate some FPS variations.
   * Update wrapper function.
   * Update tests.
* Add test about frame rate change tolerance.
   * Verify the onChange function is not called when the frame rate change
is within the the specified tolerance.
* Update test about frame rate variation detection
   * Create dedicated throttle transform function to slow down after a specific amount of time.
* Remove unnecessary code.
2020-12-11 12:36:55 -08:00
Atsushi Watanabe
044b5566d1 Don't cancel other matrix tests on fail 2020-12-11 15:28:00 -05:00
Atsushi Watanabe
7f41f9b8df Skip bitrate measure test on darwin 2020-12-11 15:28:00 -05:00
Lukas Herman
5d5001d0b4 Bring back x11 adapter for Linux build only
Original x11 adapter uses ~40% less CPU usage than kbinani/screenshot.
Ideally, we should migrate 100% to kbinani/screenshot. But, the CPU
usage difference is substantial. In the future, we should look deeper
into kbinani/screenshot and try to improve it.
2020-11-23 20:36:17 -05:00
14 changed files with 604 additions and 87 deletions

View File

@@ -11,6 +11,7 @@ jobs:
build-linux:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
go: [ '1.15', '1.14' ]
name: Linux Go ${{ matrix.go }}
@@ -48,6 +49,7 @@ jobs:
build-darwin:
runs-on: macos-latest
strategy:
fail-fast: false
matrix:
go: [ '1.15', '1.14' ]
name: Darwin Go ${{ matrix.go }}

4
go.mod
View File

@@ -5,13 +5,13 @@ go 1.13
require (
github.com/BurntSushi/xgb v0.0.0-20201008132610-5f9e7b3c49cd // indirect
github.com/blackjack/webcam v0.0.0-20200313125108-10ed912a8539
github.com/gen2brain/malgo v0.10.25
github.com/gen2brain/malgo v0.10.27
github.com/gen2brain/shm v0.0.0-20200228170931-49f9650110c5 // indirect
github.com/kbinani/screenshot v0.0.0-20191211154542-3a185f1ce18f
github.com/lherman-cs/opus v0.0.2
github.com/lxn/win v0.0.0-20201111105847-2a20daff6a55 // indirect
github.com/pion/logging v0.2.2
github.com/pion/rtp v1.6.1
github.com/pion/rtp v1.6.2
github.com/pion/webrtc/v2 v2.2.26
github.com/satori/go.uuid v1.2.0
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5

8
go.sum
View File

@@ -9,8 +9,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/gen2brain/malgo v0.10.25 h1:VRiYTBmBeHTCXD0wCg7XyLi6lJJBqND/XVmSEyrGkGc=
github.com/gen2brain/malgo v0.10.25/go.mod h1:zHSUNZAXfCeNsZou0RtQ6Zk7gDYLIcKOrUWtAdksnEs=
github.com/gen2brain/malgo v0.10.27 h1:KlNitZIO8V4W2VnjtTM8AGMy/XBb2pN+fnIB5bEps8E=
github.com/gen2brain/malgo v0.10.27/go.mod h1:zHSUNZAXfCeNsZou0RtQ6Zk7gDYLIcKOrUWtAdksnEs=
github.com/gen2brain/shm v0.0.0-20200228170931-49f9650110c5 h1:Y5Q2mEwfzjMt5+3u70Gtw93ZOu2UuPeeeTBDntF7FoY=
github.com/gen2brain/shm v0.0.0-20200228170931-49f9650110c5/go.mod h1:uF6rMu/1nvu+5DpiRLwusA6xB8zlkNoGzKn8lmYONUo=
github.com/golang/mock v1.2.0 h1:28o5sBqPkBsMGnC6b4MvE2TzSr5/AT4c/1fLqVGIwlk=
@@ -61,8 +61,8 @@ github.com/pion/rtcp v1.2.3 h1:2wrhKnqgSz91Q5nzYTO07mQXztYPtxL8a0XOss4rJqA=
github.com/pion/rtcp v1.2.3/go.mod h1:zGhIv0RPRF0Z1Wiij22pUt5W/c9fevqSzT4jje/oK7I=
github.com/pion/rtp v1.6.0 h1:4Ssnl/T5W2LzxHj9ssYpGVEQh3YYhQFNVmSWO88MMwk=
github.com/pion/rtp v1.6.0/go.mod h1:QgfogHsMBVE/RFNno467U/KBqfUywEH+HK+0rtnwsdI=
github.com/pion/rtp v1.6.1 h1:2Y2elcVBrahYnHKN2X7rMHX/r1R4TEBMP1LaVu/wNhk=
github.com/pion/rtp v1.6.1/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U=
github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko=
github.com/pion/sctp v1.7.10 h1:o3p3/hZB5Cx12RMGyWmItevJtZ6o2cpuxaw6GOS4x+8=
github.com/pion/sctp v1.7.10/go.mod h1:EhpTUQu1/lcK3xI+eriS6/96fWetHGCvBi9MSsnaBN0=
github.com/pion/sdp/v2 v2.4.0 h1:luUtaETR5x2KNNpvEMv/r4Y+/kzImzbz4Lm1z8eQNQI=

View File

@@ -2,12 +2,18 @@ package codec
import (
"io"
"runtime"
"sync"
"testing"
"time"
)
func TestMeasureBitRateStatic(t *testing.T) {
// https://github.com/pion/mediadevices/issues/198
if runtime.GOOS == "darwin" {
t.Skip("Skipping because Darwin CI is not reliable for timing related tests.")
}
r, w := io.Pipe()
const (
dataSize = 1000
@@ -54,6 +60,11 @@ func TestMeasureBitRateStatic(t *testing.T) {
}
func TestMeasureBitRateDynamic(t *testing.T) {
// https://github.com/pion/mediadevices/issues/198
if runtime.GOOS == "darwin" {
t.Skip("Skipping because Darwin CI is not reliable for timing related tests.")
}
r, w := io.Pipe()
const (
dataSize = 1000

View File

@@ -1,14 +1,61 @@
package mediadevices
type EncodedBuffer struct {
Data []byte
Samples uint32
}
type EncodedReadCloser interface {
Read() (EncodedBuffer, func(), error)
Close() error
}
type encodedReadCloserImpl struct {
readFn func([]byte) (int, error)
readFn func() (EncodedBuffer, func(), error)
closeFn func() error
}
func (r *encodedReadCloserImpl) Read(b []byte) (int, error) {
return r.readFn(b)
func (r *encodedReadCloserImpl) Read() (EncodedBuffer, func(), error) {
return r.readFn()
}
func (r *encodedReadCloserImpl) Close() error {
return r.closeFn()
}
type encodedIOReadCloserImpl struct {
readFn func([]byte) (int, error)
closeFn func() error
}
func newEncodedIOReadCloserImpl(reader EncodedReadCloser) *encodedIOReadCloserImpl {
var encoded EncodedBuffer
release := func() {}
return &encodedIOReadCloserImpl{
readFn: func(b []byte) (int, error) {
var err error
if len(encoded.Data) == 0 {
release()
encoded, release, err = reader.Read()
if err != nil {
reader.Close()
return 0, err
}
}
n := copy(b, encoded.Data)
encoded.Data = encoded.Data[n:]
return n, nil
},
closeFn: reader.Close,
}
}
func (r *encodedIOReadCloserImpl) Read(b []byte) (int, error) {
return r.readFn(b)
}
func (r *encodedIOReadCloserImpl) Close() error {
return r.closeFn()
}

View File

@@ -38,7 +38,11 @@ func (track *mockMediaStreamTrack) NewRTPReader(codecName string, mtu int) (RTPR
return nil, nil
}
func (track *mockMediaStreamTrack) NewEncodedReader(codecName string) (io.ReadCloser, error) {
func (track *mockMediaStreamTrack) NewEncodedReader(codecName string) (EncodedReadCloser, error) {
return nil, nil
}
func (track *mockMediaStreamTrack) NewEncodedIOReader(codecName string) (io.ReadCloser, error) {
return nil, nil
}

View File

@@ -14,7 +14,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error)
// buffered frame or a new frame from the source. This also implies that no frame will be lost
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
metaReader = video.DetectChanges(0, 0, func(p prop.Media) { currentProp = p })(metaReader)
_, _, err := metaReader.Read()
return currentProp, err

View File

@@ -1,3 +1,5 @@
// +build !linux
package screen
import (

View File

@@ -0,0 +1,92 @@
package screen
import (
"fmt"
"image"
"time"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
type screen struct {
num int
reader *reader
tick *time.Ticker
}
func deviceID(num int) string {
return fmt.Sprintf("X11Screen%d", num)
}
func init() {
dp, err := openDisplay()
if err != nil {
// No x11 display available.
return
}
defer dp.Close()
numScreen := dp.NumScreen()
for i := 0; i < numScreen; i++ {
driver.GetManager().Register(
&screen{
num: i,
},
driver.Info{
Label: deviceID(i),
DeviceType: driver.Screen,
},
)
}
}
func (s *screen) Open() error {
r, err := newReader(s.num)
if err != nil {
return err
}
s.reader = r
return nil
}
func (s *screen) Close() error {
s.reader.Close()
if s.tick != nil {
s.tick.Stop()
}
return nil
}
func (s *screen) VideoRecord(p prop.Media) (video.Reader, error) {
if p.FrameRate == 0 {
p.FrameRate = 10
}
s.tick = time.NewTicker(time.Duration(float32(time.Second) / p.FrameRate))
var dst image.RGBA
reader := s.reader
r := video.ReaderFunc(func() (image.Image, func(), error) {
<-s.tick.C
return reader.Read().ToRGBA(&dst), func() {}, nil
})
return r, nil
}
func (s *screen) Properties() []prop.Media {
rect := s.reader.img.Bounds()
w := rect.Dx()
h := rect.Dy()
return []prop.Media{
{
DeviceID: deviceID(s.num),
Video: prop.Video{
Width: w,
Height: h,
FrameFormat: frame.FormatRGBA,
},
},
}
}

View File

@@ -0,0 +1,283 @@
package screen
// #cgo pkg-config: x11 xext
// #include <stdint.h>
// #include <sys/shm.h>
// #include <X11/Xlib.h>
// #define XUTIL_DEFINE_FUNCTIONS
// #include <X11/Xutil.h>
// #include <X11/extensions/XShm.h>
//
// void copyBGR24(void *dst, char *src, size_t l) { // 64bit aligned copy
// uint64_t *d = (uint64_t*)dst;
// uint64_t *s = (uint64_t*)src;
// l /= 8;
// for (size_t i = 0; i < l; i ++) {
// uint64_t v = *s;
// // Reorder BGR to RGB
// *d = 0xFF000000FF000000 |
// ((v >> 16) & 0xFF00000000) | (v & 0xFF0000000000) | ((v & 0xFF00000000) << 16) |
// ((v >> 16) & 0xFF) | (v & 0xFF00) | ((v & 0xFF) << 16);
// d++;
// s++;
// }
// }
//
// void copyBGR16(void *dst, char *src, size_t l) { // 64bit aligned copy
// uint64_t *d = (uint64_t*)dst;
// uint32_t *s = (uint32_t*)src;
// l /= 8;
// for (size_t i = 0; i < l; i ++) {
// uint64_t v = *s;
// // Reorder BGR to RGB
// *d = 0xFF000000FF000000 |
// ((v & 0xF8000000) << 8) | ((v & 0x7E00000) << 21) | ((v & 0x1F0000) << 35) |
// ((v & 0xF800) >> 8) | ((v & 0x7E0) << 5) | ((v & 0x1F) << 19);
// d++;
// s++;
// }
// }
//
// char *align64(char *ptr) { // return 64bit aligned pointer
// if (((size_t)ptr & 0x07) == 0) {
// return ptr;
// }
// // Clear lower 3bits to align the address to 8bytes.
// return (char*)(((size_t)ptr & (~(size_t)0x07)) + 0x08);
// }
// size_t align64ForTest(size_t ptr) {
// return (size_t)align64((char*)ptr);
// }
import "C"
import (
"errors"
"fmt"
"image"
"image/color"
"unsafe"
)
const shmaddrInvalid = ^uintptr(0)
type display C.Display
type pixelFormat int
const (
pixFmtBGR24 pixelFormat = iota
pixFmtRGB24
pixFmtBGR16
pixFmtRGB16
)
func openDisplay() (*display, error) {
dp := C.XOpenDisplay(nil)
if dp == nil {
return nil, errors.New("failed to open display")
}
return (*display)(dp), nil
}
func (d *display) c() *C.Display {
return (*C.Display)(d)
}
func (d *display) Close() {
C.XCloseDisplay(d.c())
}
func (d *display) NumScreen() int {
return int(C.XScreenCount(d.c()))
}
type shmImage struct {
dp *C.Display
img *C.XImage
shm C.XShmSegmentInfo
b []byte
pixFmt pixelFormat
}
func (s *shmImage) Free() {
if s.img != nil {
C.XShmDetach(s.dp, &s.shm)
C.XDestroyImage(s.img)
}
if uintptr(unsafe.Pointer(s.shm.shmaddr)) != shmaddrInvalid {
C.shmdt(unsafe.Pointer(s.shm.shmaddr))
}
}
func (s *shmImage) ColorModel() color.Model {
return color.RGBAModel
}
func (s *shmImage) Bounds() image.Rectangle {
return image.Rect(0, 0, int(s.img.width), int(s.img.height))
}
type colorFunc func() (r, g, b, a uint32)
func (c colorFunc) RGBA() (r, g, b, a uint32) {
return c()
}
func (s *shmImage) At(x, y int) color.Color {
switch s.pixFmt {
case pixFmtBGR24:
addr := (x + y*int(s.img.width)) * 4
b := uint32(s.b[addr]) * 0x100
g := uint32(s.b[addr+1]) * 0x100
r := uint32(s.b[addr+2]) * 0x100
return colorFunc(func() (_, _, _, _ uint32) {
return r, g, b, 0xFFFF
})
case pixFmtBGR16:
addr := (x + y*int(s.img.width)) * 2
b1, b2 := s.b[addr], s.b[addr+1]
b := uint32(b1>>3) * 0x100
g := uint32((b1&0x7)<<3|(b2&0xE0)>>5) * 0x100
r := uint32(b2&0x1F) * 0x100
return colorFunc(func() (_, _, _, _ uint32) {
return r, g, b, 0xFFFF
})
default:
panic("unsupported pixel format")
}
}
func (s *shmImage) RGBAAt(x, y int) color.RGBA {
switch s.pixFmt {
case pixFmtBGR24:
addr := (x + y*int(s.img.width)) * 4
b := s.b[addr]
g := s.b[addr+1]
r := s.b[addr+2]
return color.RGBA{R: r, G: g, B: b, A: 0xFF}
case pixFmtBGR16:
addr := (x + y*int(s.img.width)) * 2
b1, b2 := s.b[addr], s.b[addr+1]
b := b1 >> 3
g := (b1&0x7)<<3 | (b2&0xE0)>>5
r := b2 & 0x1F
return color.RGBA{R: r, G: g, B: b, A: 0xFF}
default:
panic("unsupported pixel format")
}
}
func (s *shmImage) ToRGBA(dst *image.RGBA) *image.RGBA {
dst.Rect = s.Bounds()
dst.Stride = int(s.img.width) * 4
l := int(4 * s.img.width * s.img.height)
if len(dst.Pix) < l {
if cap(dst.Pix) < l {
dst.Pix = make([]uint8, l)
}
dst.Pix = dst.Pix[:l]
}
switch s.pixFmt {
case pixFmtBGR24:
C.copyBGR24(unsafe.Pointer(&dst.Pix[0]), s.img.data, C.ulong(len(dst.Pix)))
return dst
case pixFmtBGR16:
C.copyBGR16(unsafe.Pointer(&dst.Pix[0]), s.img.data, C.ulong(len(dst.Pix)))
return dst
default:
panic("unsupported pixel format")
}
}
func newShmImage(dp *C.Display, screen int) (*shmImage, error) {
cScreen := C.int(screen)
w := int(C.XDisplayWidth(dp, cScreen))
h := int(C.XDisplayHeight(dp, cScreen))
v := C.XDefaultVisual(dp, cScreen)
depth := int(C.XDefaultDepth(dp, cScreen))
s := &shmImage{dp: dp}
switch {
case v.red_mask == 0xFF0000 && v.green_mask == 0xFF00 && v.blue_mask == 0xFF:
s.pixFmt = pixFmtBGR24
case v.red_mask == 0xF800 && v.green_mask == 0x7E0 && v.blue_mask == 0x1F:
s.pixFmt = pixFmtBGR16
default:
fmt.Printf("x11capture: unsupported pixel format (R: %0x, G: %0x, B: %0x)\n",
v.red_mask, v.green_mask, v.blue_mask)
return nil, errors.New("unsupported pixel format")
}
s.shm.shmid = C.shmget(C.IPC_PRIVATE, C.ulong(w*h*4+8), C.IPC_CREAT|0600)
if s.shm.shmid == -1 {
return nil, errors.New("failed to get shared memory")
}
s.shm.shmaddr = (*C.char)(C.shmat(s.shm.shmid, unsafe.Pointer(nil), 0))
if uintptr(unsafe.Pointer(s.shm.shmaddr)) == shmaddrInvalid {
s.shm.shmaddr = nil
return nil, errors.New("failed to get shared memory address")
}
s.shm.readOnly = 0
C.shmctl(s.shm.shmid, C.IPC_RMID, nil)
s.img = C.XShmCreateImage(
dp, v, C.uint(depth), C.ZPixmap, C.align64(s.shm.shmaddr), &s.shm, C.uint(w), C.uint(h))
if s.img == nil {
s.Free()
return nil, errors.New("failed to create XShm image")
}
C.XShmAttach(dp, &s.shm)
C.XSync(dp, 0)
return s, nil
}
type reader struct {
dp *C.Display
img *shmImage
}
func newReader(screen int) (*reader, error) {
dp := C.XOpenDisplay(nil)
if dp == nil {
return nil, errors.New("failed to open display")
}
if C.XShmQueryExtension(dp) == 0 {
return nil, errors.New("no XShm support")
}
img, err := newShmImage(dp, screen)
if err != nil {
C.XCloseDisplay(dp)
return nil, err
}
return &reader{
dp: dp,
img: img,
}, nil
}
func (r *reader) Size() (int, int) {
return int(r.img.img.width), int(r.img.img.height)
}
func (r *reader) Read() *shmImage {
C.XShmGetImage(r.dp, C.XDefaultRootWindow(r.dp), r.img.img, 0, 0, C.AllPlanes)
r.img.b = C.GoBytes(
unsafe.Pointer(r.img.img.data),
C.int(r.img.img.width*r.img.img.height*4),
)
return r.img
}
func (r *reader) Close() {
r.img.Free()
C.XCloseDisplay(r.dp)
}
// cAlign64 is fot testing
func cAlign64(ptr uintptr) uintptr {
return uintptr(C.align64ForTest(C.ulong(uintptr(ptr))))
}

View File

@@ -0,0 +1,17 @@
package screen
import (
"testing"
)
func TestAlign64(t *testing.T) {
if ret := cAlign64(0x00010008); ret != 0x00010008 {
t.Errorf("Wrong alignment, expected %x, got %x", 0x00010008, ret)
}
if ret := cAlign64(0x00010006); ret != 0x00010008 {
t.Errorf("Wrong alignment, expected %x, got %x", 0x00010008, ret)
}
if ret := cAlign64(0x00010009); ret != 0x00010010 {
t.Errorf("Wrong alignment, expected %x, got %x", 0x00010010, ret)
}
}

View File

@@ -2,6 +2,7 @@ package video
import (
"image"
"math"
"time"
"github.com/pion/mediadevices/pkg/prop"
@@ -9,7 +10,7 @@ import (
// DetectChanges will detect frame and video property changes. For video property detection,
// since it's time related, interval will be used to determine the sample rate.
func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformFunc {
func DetectChanges(interval time.Duration, fpsDiffTolerance float64, onChange func(prop.Media)) TransformFunc {
return func(r Reader) Reader {
var currentProp prop.Media
var lastTaken time.Time
@@ -40,11 +41,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
elapsed := now.Sub(lastTaken)
if elapsed >= interval {
fps := float32(float64(frames) / elapsed.Seconds())
// TODO: maybe add some epsilon so that small changes will not mark as dirty
currentProp.FrameRate = fps
frames = 0
lastTaken = now
dirty = true
if math.Abs(float64(currentProp.FrameRate-fps)) > fpsDiffTolerance {
currentProp.FrameRate = fps
dirty = true
}
}
if dirty {

View File

@@ -28,7 +28,7 @@ func BenchmarkDetectChanges(b *testing.B) {
src := src
b.Run(fmt.Sprintf("WithDetectChanges%d", n), func(b *testing.B) {
for i := 0; i < n; i++ {
src = DetectChanges(time.Microsecond, func(p prop.Media) {})(src)
src = DetectChanges(time.Microsecond, 0, func(p prop.Media) {})(src)
}
for i := 0; i < b.N; i++ {
@@ -74,6 +74,27 @@ func TestDetectChanges(t *testing.T) {
}
}
SlowDownAfterThrottle := func(rate float32, factor float64, after time.Duration) TransformFunc {
return func(r Reader) Reader {
sleep := float64(time.Second) / float64(rate)
start := time.Now()
f := 1.0
return ReaderFunc(func() (image.Image, func(), error) {
for {
img, _, err := r.Read()
if err != nil {
return nil, func() {}, err
}
if time.Since(start) > after {
f = factor
}
time.Sleep(time.Duration(sleep * f))
return img, func() {}, nil
}
})
}
}
t.Run("OnChangeCalledBeforeFirstFrame", func(t *testing.T) {
var detectBeforeFirstFrame bool
var expected prop.Media
@@ -81,7 +102,7 @@ func TestDetectChanges(t *testing.T) {
expected.Width = 1920
expected.Height = 1080
src, _ := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
src = DetectChanges(time.Second, 0, func(p prop.Media) {
actual = p
detectBeforeFirstFrame = true
})(src)
@@ -104,7 +125,7 @@ func TestDetectChanges(t *testing.T) {
expected.Width = 1920
expected.Height = 1080
src, update := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
src = DetectChanges(time.Second, 0, func(p prop.Media) {
actual = p
})(src)
@@ -137,7 +158,7 @@ func TestDetectChanges(t *testing.T) {
expected.FrameRate = 30
src, _ := buildSource(expected)
src = Throttle(expected.FrameRate)(src)
src = DetectChanges(time.Second*5, func(p prop.Media) {
src = DetectChanges(time.Second*5, 0, func(p prop.Media) {
actual = p
count++
})(src)
@@ -155,4 +176,31 @@ func TestDetectChanges(t *testing.T) {
assertEq(t, actual, expected, frame, checkFrameRate)
}
})
t.Run("OnChangeNotCalledForToleratedFrameRateVariation", func(t *testing.T) {
// https://github.com/pion/mediadevices/issues/198
if runtime.GOOS == "darwin" {
t.Skip("Skipping because Darwin CI is not reliable for timing related tests.")
}
var expected prop.Media
var count int
expected.Width = 1920
expected.Height = 1080
expected.FrameRate = 30
src, _ := buildSource(expected)
src = SlowDownAfterThrottle(expected.FrameRate, 1.1, time.Second)(src)
src = DetectChanges(time.Second, 5, func(p prop.Media) {
count++
})(src)
for start := time.Now(); time.Since(start) < 3*time.Second; {
src.Read()
}
// onChange is called once before first frame: prop.FrameRate still 0.
// onChange is called again after receiving frames during the specified interval: prop.FrameRate is properly calculated
// So if the frame rate only changes within the specified tolerance, onChange should no longer be called.
if count > 2 {
t.Fatalf("onChange was called more than twice.")
}
})
}

145
track.go
View File

@@ -58,8 +58,10 @@ type Track interface {
// NewRTPReader creates a new reader from the source. The reader will encode the source, and packetize
// the encoded data in RTP format with given mtu size.
NewRTPReader(codecName string, mtu int) (RTPReadCloser, error)
// NewEncodedReader creates a EncodedReadCloser that reads the encoded data in codecName format
NewEncodedReader(codecName string) (EncodedReadCloser, error)
// NewEncodedReader creates a new Go standard io.ReadCloser that reads the encoded data in codecName format
NewEncodedReader(codecName string) (io.ReadCloser, error)
NewEncodedIOReader(codecName string) (io.ReadCloser, error)
}
type baseTrack struct {
@@ -185,31 +187,6 @@ func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error {
return nil
}
func (track *baseTrack) newEncodedReader(reader codec.ReadCloser) (io.ReadCloser, error) {
var encoded []byte
release := func() {}
return &encodedReadCloserImpl{
readFn: func(b []byte) (int, error) {
var err error
if len(encoded) == 0 {
release()
encoded, release, err = reader.Read()
if err != nil {
reader.Close()
track.onError(err)
return 0, err
}
}
n := copy(b, encoded)
encoded = encoded[n:]
return n, nil
},
closeFn: reader.Close,
}, nil
}
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
if err := d.Open(); err != nil {
return nil, err
@@ -291,20 +268,52 @@ func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
func (track *VideoTrack) newEncodedReader(codecNames ...string) (EncodedReadCloser, *codec.RTPCodec, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
return nil, nil, err
}
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecName)
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecNames...)
if err != nil {
return nil, err
return nil, nil, err
}
sample := newVideoSampler(selectedCodec.ClockRate)
return &encodedReadCloserImpl{
readFn: func() (EncodedBuffer, func(), error) {
data, release, err := encodedReader.Read()
buffer := EncodedBuffer{
Data: data,
Samples: sample(),
}
return buffer, release, err
},
closeFn: encodedReader.Close,
}, selectedCodec, nil
}
func (track *VideoTrack) NewEncodedReader(codecName string) (EncodedReadCloser, error) {
reader, _, err := track.newEncodedReader(codecName)
return reader, err
}
func (track *VideoTrack) NewEncodedIOReader(codecName string) (io.ReadCloser, error) {
encodedReader, _, err := track.newEncodedReader(codecName)
if err != nil {
return nil, err
}
return newEncodedIOReadCloserImpl(encodedReader), nil
}
func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
encodedReader, selectedCodec, err := track.newEncodedReader(codecName)
if err != nil {
return nil, err
}
// FIXME: not sure the best way to get unique ssrc. We probably should have a global keeper that can generate a random ID and does book keeping?
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
@@ -318,29 +327,13 @@ func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser,
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
pkts := packetizer.Packetize(encoded.Data, encoded.Samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}
func (track *VideoTrack) NewEncodedReader(codecName string) (io.ReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, _, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
return track.newEncodedReader(encodedReader)
}
// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
// manipulate.
type AudioTrack struct {
@@ -408,20 +401,52 @@ func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *AudioTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
func (track *AudioTrack) newEncodedReader(codecNames ...string) (EncodedReadCloser, *codec.RTPCodec, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
return nil, nil, err
}
encodedReader, selectedCodec, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecName)
encodedReader, selectedCodec, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecNames...)
if err != nil {
return nil, err
return nil, nil, err
}
sample := newAudioSampler(selectedCodec.ClockRate, inputProp.Latency)
return &encodedReadCloserImpl{
readFn: func() (EncodedBuffer, func(), error) {
data, release, err := encodedReader.Read()
buffer := EncodedBuffer{
Data: data,
Samples: sample(),
}
return buffer, release, err
},
closeFn: encodedReader.Close,
}, selectedCodec, nil
}
func (track *AudioTrack) NewEncodedReader(codecName string) (EncodedReadCloser, error) {
reader, _, err := track.newEncodedReader(codecName)
return reader, err
}
func (track *AudioTrack) NewEncodedIOReader(codecName string) (io.ReadCloser, error) {
encodedReader, _, err := track.newEncodedReader(codecName)
if err != nil {
return nil, err
}
return newEncodedIOReadCloserImpl(encodedReader), nil
}
func (track *AudioTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
encodedReader, selectedCodec, err := track.newEncodedReader(codecName)
if err != nil {
return nil, err
}
// FIXME: not sure the best way to get unique ssrc. We probably should have a global keeper that can generate a random ID and does book keeping?
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
@@ -435,25 +460,9 @@ func (track *AudioTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser,
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
pkts := packetizer.Packetize(encoded.Data, encoded.Samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}
func (track *AudioTrack) NewEncodedReader(codecName string) (io.ReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, _, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
return track.newEncodedReader(encodedReader)
}