Compare commits

...

14 Commits

Author SHA1 Message Date
renovate[bot]
0466694f95 fix(deps): update module golang.org/x/image to v0.31.0
Generated by Renovate Bot
2025-09-26 05:27:10 +00:00
Lei Kang
cb394eb4c5 resolve comment 2025-09-17 16:50:52 -07:00
Atsushi Watanabe
e9f3dc20b6 Fix reading multiple decoded frames 2025-09-17 16:50:52 -07:00
Lei Kang
0710906fc7 fix the test 2025-09-17 16:50:52 -07:00
Lei Kang
7fdafa9598 add codec decoder interface 2025-09-17 16:50:52 -07:00
Lei Kang
5a19127623 add return error code 2025-09-17 16:50:52 -07:00
Lei Kang
8ca6903676 add null pointer from C 2025-09-17 16:50:52 -07:00
Lei Kang
de517d790b wrap vpx_image into a struct 2025-09-17 16:50:52 -07:00
Lei Kang
81cfc047d5 add vpx decoder 2025-09-17 16:50:52 -07:00
renovate[bot]
1406108fb2 fix(deps): update module github.com/stretchr/testify to v1.11.1
Generated by Renovate Bot
2025-09-14 22:46:53 -04:00
renovate[bot]
a2a211857c chore(deps): update actions/setup-go action to v6
Generated by Renovate Bot
2025-09-14 22:41:32 -04:00
philipch07
c0721738c4 Apply go modernize (#650) 2025-09-14 21:55:37 -04:00
Leo (Lei) Kang
6047a32ea0 [VPX] vpx dynamic encoding (#647)
* Add vp8 decoder and dynamic vp8 decoding

* Add QPController

* change parameters into const

* move decoder into another PR

* use explicit parameter name
2025-09-04 14:33:07 -07:00
Leo (Lei) Kang
60bf158757 [CODEC] Add encoder bitrate tracker (#646)
add encoder bitrate tracker
2025-09-03 15:55:37 -07:00
23 changed files with 484 additions and 69 deletions

View File

@@ -22,7 +22,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Setup Go - name: Setup Go
uses: actions/setup-go@v5 uses: actions/setup-go@v6
with: with:
go-version: ${{ matrix.go }} go-version: ${{ matrix.go }}
- name: Install dependencies - name: Install dependencies
@@ -53,7 +53,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Setup Go - name: Setup Go
uses: actions/setup-go@v5 uses: actions/setup-go@v6
with: with:
go-version: ${{ matrix.go }} go-version: ${{ matrix.go }}
- name: Install dependencies - name: Install dependencies
@@ -76,7 +76,7 @@ jobs:
- name: Checkout - name: Checkout
uses: actions/checkout@v4 uses: actions/checkout@v4
- name: Setup Go - name: Setup Go
uses: actions/setup-go@v5 uses: actions/setup-go@v6
with: with:
go-version: stable go-version: stable
- name: Installing go-licenses - name: Installing go-licenses

6
go.mod
View File

@@ -1,6 +1,6 @@
module github.com/pion/mediadevices module github.com/pion/mediadevices
go 1.21 go 1.24.0
require ( require (
github.com/blackjack/webcam v0.6.1 github.com/blackjack/webcam v0.6.1
@@ -12,8 +12,8 @@ require (
github.com/pion/rtcp v1.2.15 github.com/pion/rtcp v1.2.15
github.com/pion/rtp v1.8.19 github.com/pion/rtp v1.8.19
github.com/pion/webrtc/v4 v4.1.2 github.com/pion/webrtc/v4 v4.1.2
github.com/stretchr/testify v1.10.0 github.com/stretchr/testify v1.11.1
golang.org/x/image v0.23.0 golang.org/x/image v0.31.0
) )
require ( require (

8
go.sum
View File

@@ -56,14 +56,14 @@ github.com/pion/webrtc/v4 v4.1.2 h1:mpuUo/EJ1zMNKGE79fAdYNFZBX790KE7kQQpLMjjR54=
github.com/pion/webrtc/v4 v4.1.2/go.mod h1:xsCXiNAmMEjIdFxAYU0MbB3RwRieJsegSB2JZsGN+8U= github.com/pion/webrtc/v4 v4.1.2/go.mod h1:xsCXiNAmMEjIdFxAYU0MbB3RwRieJsegSB2JZsGN+8U=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU=
github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus= golang.org/x/crypto v0.33.0 h1:IOBPskki6Lysi0lo9qQvbxiQ+FvsCC/YWOecCHAixus=
golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M= golang.org/x/crypto v0.33.0/go.mod h1:bVdXmD7IV/4GdElGPozy6U7lWdRXA4qyRVGJV57uQ5M=
golang.org/x/image v0.23.0 h1:HseQ7c2OpPKTPVzNjG5fwJsOTCiiwS4QdsYi5XU6H68= golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA=
golang.org/x/image v0.23.0/go.mod h1:wJJBTdLfCCf3tiHa1fNxpZmUI4mmoZvwMCPP0ddoNKY= golang.org/x/image v0.31.0/go.mod h1:R9ec5Lcp96v9FTF+ajwaH3uGxPH4fKfHHAVbUILxghA=
golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8=
golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk=
golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201018230417-eeed37f84f13/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -2,6 +2,7 @@ package mediadevices
import ( import (
"io" "io"
"slices"
"testing" "testing"
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
@@ -93,13 +94,7 @@ func TestMediaStreamFilters(t *testing.T) {
} }
for _, a := range actual { for _, a := range actual {
found := false found := slices.Contains(expected, a)
for _, e := range expected {
if e == a {
found = true
break
}
}
if !found { if !found {
t.Fatalf("%s: Expected to find %p in the query results", t.Name(), a) t.Fatalf("%s: Expected to find %p in the query results", t.Name(), a)

View File

@@ -0,0 +1,48 @@
package codec
import (
"time"
)
type BitrateTracker struct {
windowSize time.Duration
buffer []int
times []time.Time
}
func NewBitrateTracker(windowSize time.Duration) *BitrateTracker {
return &BitrateTracker{
windowSize: windowSize,
}
}
func (bt *BitrateTracker) AddFrame(sizeBytes int, timestamp time.Time) {
bt.buffer = append(bt.buffer, sizeBytes)
bt.times = append(bt.times, timestamp)
// Remove old entries outside the window
cutoff := timestamp.Add(-bt.windowSize)
i := 0
for ; i < len(bt.times); i++ {
if bt.times[i].After(cutoff) {
break
}
}
bt.buffer = bt.buffer[i:]
bt.times = bt.times[i:]
}
func (bt *BitrateTracker) GetBitrate() float64 {
if len(bt.times) < 2 {
return 0
}
totalBytes := 0
for _, b := range bt.buffer {
totalBytes += b
}
duration := bt.times[len(bt.times)-1].Sub(bt.times[0]).Seconds()
if duration <= 0 {
return 0
}
return float64(totalBytes*8) / duration // bits per second
}

View File

@@ -0,0 +1,19 @@
package codec
import (
"math"
"testing"
"time"
)
func TestBitrateTracker(t *testing.T) {
packetSize := 1000
bt := NewBitrateTracker(time.Second)
bt.AddFrame(packetSize, time.Now())
bt.AddFrame(packetSize, time.Now().Add(time.Millisecond*100))
bt.AddFrame(packetSize, time.Now().Add(time.Millisecond*999))
eps := float64(packetSize*8) / 10
if got, want := bt.GetBitrate(), float64(packetSize*8)*3; math.Abs(got-want) > eps {
t.Fatalf("GetBitrate() = %v, want %v (|diff| <= %v)", got, want, eps)
}
}

View File

@@ -1,6 +1,8 @@
package codec package codec
import ( import (
"image"
"io"
"time" "time"
"github.com/pion/mediadevices/pkg/io/audio" "github.com/pion/mediadevices/pkg/io/audio"
@@ -153,10 +155,19 @@ type ReadCloser interface {
Controllable Controllable
} }
type VideoDecoderBuilder interface {
BuildVideoDecoder(r io.Reader, p prop.Media) (VideoDecoder, error)
}
type VideoDecoder interface {
Read() (image.Image, func(), error)
Close() error
}
// EncoderController is the interface allowing to control the encoder behaviour after it's initialisation. // EncoderController is the interface allowing to control the encoder behaviour after it's initialisation.
// It will possibly have common control method in the future. // It will possibly have common control method in the future.
// A controller can have optional methods represented by *Controller interfaces // A controller can have optional methods represented by *Controller interfaces
type EncoderController interface{} type EncoderController any
// Controllable is a interface representing a encoder which can be controlled // Controllable is a interface representing a encoder which can be controlled
// after it's initialisation with an EncoderController // after it's initialisation with an EncoderController
@@ -179,6 +190,12 @@ type BitRateController interface {
SetBitRate(int) error SetBitRate(int) error
} }
type QPController interface {
EncoderController
// DynamicQPControl adjusts the QP of the encoder based on the current and target bitrate
DynamicQPControl(currentBitrate int, targetBitrate int) error
}
// BaseParams represents an codec's encoding properties // BaseParams represents an codec's encoding properties
type BaseParams struct { type BaseParams struct {
// Target bitrate in bps. // Target bitrate in bps.

View File

@@ -54,6 +54,7 @@ import (
"fmt" "fmt"
"image" "image"
"io" "io"
"math"
"sync" "sync"
"time" "time"
"unsafe" "unsafe"
@@ -81,6 +82,12 @@ type encoder struct {
closed bool closed bool
} }
const (
kRateControlThreshold = 0.15
kMinQuantizer = 20
kMaxQuantizer = 63
)
// VP8Params is codec specific paramaters // VP8Params is codec specific paramaters
type VP8Params struct { type VP8Params struct {
Params Params
@@ -254,6 +261,10 @@ func (e *encoder) Read() ([]byte, func(), error) {
e.raw.d_w, e.raw.d_h = C.uint(width), C.uint(height) e.raw.d_w, e.raw.d_h = C.uint(width), C.uint(height)
} }
if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != 0 {
return nil, func() {}, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
}
duration := t.Sub(e.tLastFrame).Microseconds() duration := t.Sub(e.tLastFrame).Microseconds()
// VPX doesn't allow 0 duration. If 0 is given, vpx_codec_encode will fail with VPX_CODEC_INVALID_PARAM. // VPX doesn't allow 0 duration. If 0 is given, vpx_codec_encode will fail with VPX_CODEC_INVALID_PARAM.
// 0 duration is possible because mediadevices first gets the frame meta data by reading from the source, // 0 duration is possible because mediadevices first gets the frame meta data by reading from the source,
@@ -322,6 +333,24 @@ func (e *encoder) SetBitRate(bitrate int) error {
return nil return nil
} }
func (e *encoder) DynamicQPControl(currentBitrate int, targetBitrate int) error {
e.mu.Lock()
defer e.mu.Unlock()
bitrateDiff := math.Abs(float64(currentBitrate - targetBitrate))
if bitrateDiff <= float64(currentBitrate)*kRateControlThreshold {
return nil
}
currentMax := e.cfg.rc_max_quantizer
if targetBitrate < currentBitrate {
e.cfg.rc_max_quantizer = min(currentMax+1, kMaxQuantizer)
} else {
e.cfg.rc_max_quantizer = max(currentMax-1, kMinQuantizer)
}
e.cfg.rc_min_quantizer = e.cfg.rc_max_quantizer
return nil
}
func (e *encoder) Controller() codec.EncoderController { func (e *encoder) Controller() codec.EncoderController {
return e return e
} }

View File

@@ -0,0 +1,155 @@
package vpx
/*
#cgo pkg-config: vpx
#include <stdlib.h>
#include <stdint.h>
#include <vpx/vpx_decoder.h>
#include <vpx/vpx_codec.h>
#include <vpx/vpx_image.h>
#include <vpx/vp8dx.h>
vpx_codec_iface_t *ifaceVP8Decoder() {
return vpx_codec_vp8_dx();
}
vpx_codec_iface_t *ifaceVP9Decoder() {
return vpx_codec_vp9_dx();
}
// Allocates a new decoder context
vpx_codec_ctx_t* newDecoderCtx() {
return (vpx_codec_ctx_t*)malloc(sizeof(vpx_codec_ctx_t));
}
// Initializes the decoder
vpx_codec_err_t decoderInit(vpx_codec_ctx_t* ctx, vpx_codec_iface_t* iface) {
return vpx_codec_dec_init_ver(ctx, iface, NULL, 0, VPX_DECODER_ABI_VERSION);
}
// Decodes an encoded frame
vpx_codec_err_t decodeFrame(vpx_codec_ctx_t* ctx, const uint8_t* data, unsigned int data_sz) {
return vpx_codec_decode(ctx, data, data_sz, NULL, 0);
}
// Creates an iterator
vpx_codec_iter_t* newIter() {
return (vpx_codec_iter_t*)malloc(sizeof(vpx_codec_iter_t));
}
// Returns the next decoded frame
vpx_image_t* getFrame(vpx_codec_ctx_t* ctx, vpx_codec_iter_t* iter) {
return vpx_codec_get_frame(ctx, iter);
}
// Frees a decoded frane
void freeFrame(vpx_image_t* f) {
vpx_img_free(f);
}
// Frees a decoder context
void freeDecoderCtx(vpx_codec_ctx_t* ctx) {
vpx_codec_destroy(ctx);
free(ctx);
}
*/
import "C"
import (
"fmt"
"image"
"io"
"sync"
"time"
"unsafe"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/prop"
)
type decoder struct {
codec *C.vpx_codec_ctx_t
raw *C.vpx_image_t
cfg *C.vpx_codec_dec_cfg_t
iter C.vpx_codec_iter_t
frameIndex int
tStart time.Time
tLastFrame time.Time
reader io.Reader
buf []byte
mu sync.Mutex
closed bool
}
func BuildVideoDecoder(r io.Reader, property prop.Media) (codec.VideoDecoder, error) {
return NewDecoder(r, property)
}
func NewDecoder(r io.Reader, p prop.Media) (codec.VideoDecoder, error) {
cfg := &C.vpx_codec_dec_cfg_t{}
cfg.threads = 1
cfg.w = C.uint(p.Width)
cfg.h = C.uint(p.Height)
codec := C.newDecoderCtx()
if C.decoderInit(codec, C.ifaceVP8Decoder()) != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_dec_init failed")
}
return &decoder{
codec: codec,
cfg: cfg,
iter: nil, // initialize to NULL to start iteration
reader: r,
buf: make([]byte, 1024*1024),
}, nil
}
func (d *decoder) Read() (image.Image, func(), error) {
var input *C.vpx_image_t
for {
input = C.getFrame(d.codec, &d.iter)
if input != nil {
break
}
d.iter = nil
// Read if there are no remained frames in the decoder
n, err := d.reader.Read(d.buf)
if err != nil {
return nil, nil, err
}
status := C.decodeFrame(d.codec, (*C.uint8_t)(&d.buf[0]), C.uint(n))
if status != C.VPX_CODEC_OK {
return nil, nil, fmt.Errorf("decode failed: %v", status)
}
}
w := int(input.d_w)
h := int(input.d_h)
yStride := int(input.stride[0])
uStride := int(input.stride[1])
vStride := int(input.stride[2])
ySrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[0])), yStride*h)
uSrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[1])), uStride*h/2)
vSrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[2])), vStride*h/2)
dst := image.NewYCbCr(image.Rect(0, 0, w, h), image.YCbCrSubsampleRatio420)
// copy luma
for r := 0; r < h; r++ {
copy(dst.Y[r*dst.YStride:r*dst.YStride+w], ySrc[r*yStride:r*yStride+w])
}
// copy chroma
for r := 0; r < h/2; r++ {
copy(dst.Cb[r*dst.CStride:r*dst.CStride+w/2], uSrc[r*uStride:r*uStride+w/2])
copy(dst.Cr[r*dst.CStride:r*dst.CStride+w/2], vSrc[r*vStride:r*vStride+w/2])
}
C.freeFrame(input)
return dst, func() {}, nil
}
func (d *decoder) Close() error {
C.freeDecoderCtx(d.codec)
d.closed = true
return nil
}

View File

@@ -4,6 +4,9 @@ import (
"context" "context"
"image" "image"
"io" "io"
"math"
"math/rand"
"sync"
"sync/atomic" "sync/atomic"
"testing" "testing"
"time" "time"
@@ -13,6 +16,7 @@ import (
"github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
"github.com/stretchr/testify/assert"
) )
func TestEncoder(t *testing.T) { func TestEncoder(t *testing.T) {
@@ -360,3 +364,155 @@ func TestEncoderFrameMonotonic(t *testing.T) {
} }
} }
} }
func TestVP8DynamicQPControl(t *testing.T) {
t.Run("VP8", func(t *testing.T) {
p, err := NewVP8Params()
if err != nil {
t.Fatal(err)
}
p.LagInFrames = 0 // Disable frame lag buffering for real-time encoding
p.RateControlEndUsage = RateControlCBR
totalFrames := 100
frameRate := 10
initialWidth, initialHeight := 800, 600
var cnt uint32
r, err := p.BuildVideoEncoder(
video.ReaderFunc(func() (image.Image, func(), error) {
i := atomic.AddUint32(&cnt, 1)
if i == uint32(totalFrames+1) {
return nil, nil, io.EOF
}
img := image.NewYCbCr(image.Rect(0, 0, initialWidth, initialHeight), image.YCbCrSubsampleRatio420)
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for i := range img.Y {
img.Y[i] = uint8(r.Intn(256))
}
for i := range img.Cb {
img.Cb[i] = uint8(r.Intn(256))
}
for i := range img.Cr {
img.Cr[i] = uint8(r.Intn(256))
}
return img, func() {}, nil
}),
prop.Media{
Video: prop.Video{
Width: initialWidth,
Height: initialHeight,
FrameRate: float32(frameRate),
FrameFormat: frame.FormatI420,
},
},
)
if err != nil {
t.Fatal(err)
}
initialBitrate := 100
currentBitrate := initialBitrate
targetBitrate := 300
for i := 0; i < totalFrames; i++ {
r.Controller().(codec.KeyFrameController).ForceKeyFrame()
r.Controller().(codec.QPController).DynamicQPControl(currentBitrate, targetBitrate)
data, rel, err := r.Read()
if err != nil {
t.Fatal(err)
}
rel()
encodedSize := len(data)
currentBitrate = encodedSize * 8 / 1000 / frameRate
}
assert.Less(t, math.Abs(float64(targetBitrate-currentBitrate)), math.Abs(float64(initialBitrate-currentBitrate)))
})
}
func TestVP8EncodeDecode(t *testing.T) {
t.Run("VP8", func(t *testing.T) {
initialWidth, initialHeight := 800, 600
reader, writer := io.Pipe()
decoder, err := BuildVideoDecoder(reader, prop.Media{
Video: prop.Video{
Width: initialWidth,
Height: initialHeight,
FrameFormat: frame.FormatI420,
},
})
if err != nil {
t.Fatalf("Error creating VP8 decoder: %v", err)
}
defer decoder.Close()
// [... encoder setup code ...]
p, err := NewVP8Params()
if err != nil {
t.Fatal(err)
}
p.LagInFrames = 0 // Disable frame lag buffering for real-time encoding
p.RateControlEndUsage = RateControlCBR
totalFrames := 10
var cnt uint32
r, err := p.BuildVideoEncoder(
video.ReaderFunc(func() (image.Image, func(), error) {
i := atomic.AddUint32(&cnt, 1)
if i == uint32(totalFrames+1) {
return nil, nil, io.EOF
}
img := image.NewYCbCr(image.Rect(0, 0, initialWidth, initialHeight), image.YCbCrSubsampleRatio420)
return img, func() {}, nil
}),
prop.Media{
Video: prop.Video{
Width: initialWidth,
Height: initialHeight,
FrameRate: 30,
FrameFormat: frame.FormatI420,
},
},
)
if err != nil {
t.Fatal(err)
}
var wg sync.WaitGroup
wg.Add(1)
counter := 0
go func() {
defer wg.Done()
for {
img, rel, err := decoder.Read()
if err == io.EOF {
return
}
if err != nil {
t.Errorf("decoder read error: %v", err)
return
}
assert.Equal(t, initialWidth, img.Bounds().Dx())
assert.Equal(t, initialHeight, img.Bounds().Dy())
rel()
counter++
}
}()
// --- feed encoded frames to writer
for {
data, rel, err := r.Read()
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("encoder error: %v", err)
}
_, werr := writer.Write(data)
rel()
if werr != nil {
t.Fatalf("writer error: %v", werr)
}
}
writer.Close()
wg.Wait()
assert.Equal(t, totalFrames, counter)
})
}

View File

@@ -1,7 +1,8 @@
// Package vnc implements a VNC client. // Package vnc implements a VNC client.
// //
// References: // References:
// [PROTOCOL]: http://tools.ietf.org/html/rfc6143 //
// [PROTOCOL]: http://tools.ietf.org/html/rfc6143
package vnc package vnc
import ( import (
@@ -96,7 +97,7 @@ func (c *ClientConn) CutText(text string) error {
var buf bytes.Buffer var buf bytes.Buffer
// This is the fixed size data we'll send // This is the fixed size data we'll send
fixedData := []interface{}{ fixedData := []any{
uint8(6), uint8(6),
uint8(0), uint8(0),
uint8(0), uint8(0),
@@ -141,7 +142,7 @@ func (c *ClientConn) FramebufferUpdateRequest(incremental bool, x, y, width, hei
incrementalByte = 1 incrementalByte = 1
} }
data := []interface{}{ data := []any{
uint8(3), uint8(3),
incrementalByte, incrementalByte,
x, y, width, height, x, y, width, height,
@@ -172,7 +173,7 @@ func (c *ClientConn) KeyEvent(keysym uint32, down bool) error {
downFlag = 1 downFlag = 1
} }
data := []interface{}{ data := []any{
uint8(4), uint8(4),
downFlag, downFlag,
uint8(0), uint8(0),
@@ -199,7 +200,7 @@ func (c *ClientConn) KeyEvent(keysym uint32, down bool) error {
func (c *ClientConn) PointerEvent(mask ButtonMask, x, y uint16) error { func (c *ClientConn) PointerEvent(mask ButtonMask, x, y uint16) error {
var buf bytes.Buffer var buf bytes.Buffer
data := []interface{}{ data := []any{
uint8(5), uint8(5),
uint8(mask), uint8(mask),
x, x,
@@ -225,7 +226,7 @@ func (c *ClientConn) PointerEvent(mask ButtonMask, x, y uint16) error {
// //
// See RFC 6143 Section 7.5.2 // See RFC 6143 Section 7.5.2
func (c *ClientConn) SetEncodings(encs []Encoding) error { func (c *ClientConn) SetEncodings(encs []Encoding) error {
data := make([]interface{}, 3+len(encs)) data := make([]any, 3+len(encs))
data[0] = uint8(2) data[0] = uint8(2)
data[1] = uint8(0) data[1] = uint8(0)
data[2] = uint16(len(encs)) data[2] = uint16(len(encs))
@@ -319,7 +320,7 @@ func (c *ClientConn) handshake() error {
} }
// Respond with the version we will support // Respond with the version we will support
if maxMinor<8 { if maxMinor < 8 {
if _, err = c.c.Write([]byte("RFB 003.003\n")); err != nil { if _, err = c.c.Write([]byte("RFB 003.003\n")); err != nil {
return err return err
} }
@@ -331,7 +332,7 @@ func (c *ClientConn) handshake() error {
if numSecurityTypes == 0 { if numSecurityTypes == 0 {
return fmt.Errorf("no security types: %s", c.readErrorReason()) return fmt.Errorf("no security types: %s", c.readErrorReason())
} }
}else{ } else {
if _, err = c.c.Write([]byte("RFB 003.008\n")); err != nil { if _, err = c.c.Write([]byte("RFB 003.008\n")); err != nil {
return err return err
} }

View File

@@ -63,7 +63,7 @@ func (*FramebufferUpdateMessage) Read(c *ClientConn, r io.Reader) (ServerMessage
var encodingType int32 var encodingType int32
rect := &rects[i] rect := &rects[i]
data := []interface{}{ data := []any{
&rect.X, &rect.X,
&rect.Y, &rect.Y,
&rect.Width, &rect.Width,
@@ -128,7 +128,7 @@ func (*SetColorMapEntriesMessage) Read(c *ClientConn, r io.Reader) (ServerMessag
for i := uint16(0); i < numColors; i++ { for i := uint16(0); i < numColors; i++ {
color := &result.Colors[i] color := &result.Colors[i]
data := []interface{}{ data := []any{
&color.R, &color.R,
&color.G, &color.G,
&color.B, &color.B,

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core coreConfig = config.Core
} }
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) { broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (any, func(), error) {
return source.Read() return source.Read()
}), coreConfig) }), coreConfig)
@@ -39,11 +39,11 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer // buffer, this means that slow readers might miss some data if they're really late and the data is no longer
// in the ring buffer. // in the ring buffer.
func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader { func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader {
copyFn := func(src interface{}) interface{} { return src } copyFn := func(src any) any { return src }
if copyChunk { if copyChunk {
buffer := wave.NewBuffer() buffer := wave.NewBuffer()
copyFn = func(src interface{}) interface{} { copyFn = func(src any) any {
realSrc, _ := src.(wave.Audio) realSrc, _ := src.(wave.Audio)
buffer.StoreCopy(realSrc) buffer.StoreCopy(realSrc)
return buffer.Load() return buffer.Load()
@@ -60,7 +60,7 @@ func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader {
// ReplaceSource replaces the underlying source. This operation is thread safe. // ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) { return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (any, func(), error) {
return source.Read() return source.Read()
})) }))
} }

View File

@@ -17,7 +17,7 @@ const (
var errEmptySource = fmt.Errorf("Source can't be nil") var errEmptySource = fmt.Errorf("Source can't be nil")
type broadcasterData struct { type broadcasterData struct {
data interface{} data any
count uint32 count uint32
err error err error
} }
@@ -124,10 +124,10 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
// copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring // copyFn is used to copy the data from the source to individual readers. Broadcaster uses a small ring
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer // buffer, this means that slow readers might miss some data if they're really late and the data is no longer
// in the ring buffer. // in the ring buffer.
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader { func (broadcaster *Broadcaster) NewReader(copyFn func(any) any) Reader {
currentCount := broadcaster.buffer.lastCount() currentCount := broadcaster.buffer.lastCount()
return ReaderFunc(func() (data interface{}, release func(), err error) { return ReaderFunc(func() (data any, release func(), err error) {
currentCount++ currentCount++
if push := broadcaster.buffer.acquire(currentCount); push != nil { if push := broadcaster.buffer.acquire(currentCount); push != nil {
data, _, err = broadcaster.source.Load().(Reader).Read() data, _, err = broadcaster.source.Load().(Reader).Read()

View File

@@ -57,7 +57,7 @@ func TestBroadcast(t *testing.T) {
frameCount := 0 frameCount := 0
frameSent := 0 frameSent := 0
lastSend := time.Now() lastSend := time.Now()
src = ReaderFunc(func() (interface{}, func(), error) { src = ReaderFunc(func() (any, func(), error) {
if pauseCond.src && frameSent == 30 { if pauseCond.src && frameSent == 30 {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
@@ -85,7 +85,7 @@ func TestBroadcast(t *testing.T) {
wg.Add(n) wg.Add(n)
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
go func() { go func() {
reader := broadcaster.NewReader(func(src interface{}) interface{} { return src }) reader := broadcaster.NewReader(func(src any) any { return src })
count := 0 count := 0
lastFrameCount := -1 lastFrameCount := -1
droppedFrames := 0 droppedFrames := 0

View File

@@ -11,13 +11,13 @@ type Reader interface {
// there will be new allocations during streaming, and old unused memory will become garbage. As a consequence, // there will be new allocations during streaming, and old unused memory will become garbage. As a consequence,
// these garbage will put a lot of pressure to the garbage collector and makes it to run more often and finish // these garbage will put a lot of pressure to the garbage collector and makes it to run more often and finish
// slower as the heap memory usage increases and more garbage to collect. // slower as the heap memory usage increases and more garbage to collect.
Read() (data interface{}, release func(), err error) Read() (data any, release func(), err error)
} }
// ReaderFunc is a proxy type for Reader // ReaderFunc is a proxy type for Reader
type ReaderFunc func() (data interface{}, release func(), err error) type ReaderFunc func() (data any, release func(), err error)
func (f ReaderFunc) Read() (data interface{}, release func(), err error) { func (f ReaderFunc) Read() (data any, release func(), err error) {
data, release, err = f() data, release, err = f()
return return
} }

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core coreConfig = config.Core
} }
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) { broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (any, func(), error) {
return source.Read() return source.Read()
}), coreConfig) }), coreConfig)
@@ -39,11 +39,11 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
// buffer, this means that slow readers might miss some data if they're really late and the data is no longer // buffer, this means that slow readers might miss some data if they're really late and the data is no longer
// in the ring buffer. // in the ring buffer.
func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader { func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
copyFn := func(src interface{}) interface{} { return src } copyFn := func(src any) any { return src }
if copyFrame { if copyFrame {
buffer := NewFrameBuffer(0) buffer := NewFrameBuffer(0)
copyFn = func(src interface{}) interface{} { copyFn = func(src any) any {
realSrc, _ := src.(image.Image) realSrc, _ := src.(image.Image)
buffer.StoreCopy(realSrc) buffer.StoreCopy(realSrc)
return buffer.Load() return buffer.Load()
@@ -60,7 +60,7 @@ func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
// ReplaceSource replaces the underlying source. This operation is thread safe. // ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) { return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (any, func(), error) {
return source.Read() return source.Read()
})) }))
} }

View File

@@ -3,6 +3,7 @@ package prop
import ( import (
"fmt" "fmt"
"math" "math"
"slices"
"strings" "strings"
"time" "time"
) )
@@ -54,10 +55,8 @@ type DurationOneOf []time.Duration
// Compare implements DurationConstraint. // Compare implements DurationConstraint.
func (d DurationOneOf) Compare(a time.Duration) (float64, bool) { func (d DurationOneOf) Compare(a time.Duration) (float64, bool) {
for _, ii := range d { if slices.Contains(d, a) {
if ii == a { return 0.0, true
return 0.0, true
}
} }
return 1.0, false return 1.0, false
} }

View File

@@ -3,6 +3,7 @@ package prop
import ( import (
"fmt" "fmt"
"math" "math"
"slices"
"strings" "strings"
) )
@@ -53,10 +54,8 @@ type FloatOneOf []float32
// Compare implements FloatConstraint. // Compare implements FloatConstraint.
func (f FloatOneOf) Compare(a float32) (float64, bool) { func (f FloatOneOf) Compare(a float32) (float64, bool) {
for _, ff := range f { if slices.Contains(f, a) {
if ff == a { return 0.0, true
return 0.0, true
}
} }
return 1.0, false return 1.0, false
} }

View File

@@ -3,6 +3,7 @@ package prop
import ( import (
"fmt" "fmt"
"github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/frame"
"slices"
"strings" "strings"
) )
@@ -56,10 +57,8 @@ type FrameFormatOneOf []frame.Format
// Compare implements FrameFormatConstraint. // Compare implements FrameFormatConstraint.
func (f FrameFormatOneOf) Compare(a frame.Format) (float64, bool) { func (f FrameFormatOneOf) Compare(a frame.Format) (float64, bool) {
for _, ff := range f { if slices.Contains(f, a) {
if ff == a { return 0.0, true
return 0.0, true
}
} }
return 1.0, false return 1.0, false
} }

View File

@@ -3,6 +3,7 @@ package prop
import ( import (
"fmt" "fmt"
"math" "math"
"slices"
"strings" "strings"
) )
@@ -53,10 +54,8 @@ type IntOneOf []int
// Compare implements IntConstraint. // Compare implements IntConstraint.
func (i IntOneOf) Compare(a int) (float64, bool) { func (i IntOneOf) Compare(a int) (float64, bool) {
for _, ii := range i { if slices.Contains(i, a) {
if ii == a { return 0.0, true
return 0.0, true
}
} }
return 1.0, false return 1.0, false
} }

View File

@@ -32,7 +32,7 @@ func (m *Media) String() string {
return prettifyStruct(m) return prettifyStruct(m)
} }
func prettifyStruct(i interface{}) string { func prettifyStruct(i any) string {
var rows []string var rows []string
var addRows func(int, reflect.Value) var addRows func(int, reflect.Value)
addRows = func(level int, obj reflect.Value) { addRows = func(level int, obj reflect.Value) {
@@ -67,7 +67,7 @@ type setterFn func(fieldA, fieldB reflect.Value)
// merge merges all the field values from o to p, except zero values. It's guaranteed that setterFn will be called // merge merges all the field values from o to p, except zero values. It's guaranteed that setterFn will be called
// when fieldA and fieldB are not struct. // when fieldA and fieldB are not struct.
func (p *Media) merge(o interface{}, set setterFn) { func (p *Media) merge(o any, set setterFn) {
rp := reflect.ValueOf(p).Elem() rp := reflect.ValueOf(p).Elem()
ro := reflect.ValueOf(o) ro := reflect.ValueOf(o)
@@ -159,13 +159,13 @@ func (p *MediaConstraints) FitnessDistance(o Media) (float64, bool) {
} }
type comparisons []struct { type comparisons []struct {
desired, actual interface{} desired, actual any
} }
func (c *comparisons) add(desired, actual interface{}) { func (c *comparisons) add(desired, actual any) {
if desired != nil { if desired != nil {
*c = append(*c, *c = append(*c,
struct{ desired, actual interface{} }{ struct{ desired, actual any }{
desired, actual, desired, actual,
}, },
) )

View File

@@ -2,6 +2,7 @@ package prop
import ( import (
"fmt" "fmt"
"slices"
"strings" "strings"
) )
@@ -55,10 +56,8 @@ type StringOneOf []string
// Compare implements StringConstraint. // Compare implements StringConstraint.
func (f StringOneOf) Compare(a string) (float64, bool) { func (f StringOneOf) Compare(a string) (float64, bool) {
for _, ff := range f { if slices.Contains(f, a) {
if ff == a { return 0.0, true
return 0.0, true
}
} }
return 1.0, false return 1.0, false
} }