Compare commits

...

5 Commits

Author SHA1 Message Date
Lukas Herman
af6d31fde5 Revert go.mod 2020-10-30 00:37:38 -07:00
Lukas Herman
2f5e4ee914 New mediadevices design
Changelog:
  * Better support for non-webrtc use cases
  * Enable multiple readers
  * Enhance codec selectors
  * Update APIs to reflect on the new v3 webrtc design
  * Cleaner APIs
2020-10-30 00:33:55 -07:00
Lukas Herman
1720eee38c Fix unpropagated audio sampling rate from microphones 2020-10-29 22:41:10 -07:00
Lukas Herman
00877c74a0 Add audio latency detection 2020-10-29 22:37:29 -07:00
Lukas Herman
559c6a13a1 Update readers to be memory pool friendly 2020-10-29 00:04:12 -07:00
60 changed files with 840 additions and 784 deletions

117
codec.go Normal file
View File

@@ -0,0 +1,117 @@
package mediadevices
import (
"errors"
"fmt"
"strings"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
)
// CodecSelector is a container of video and audio encoder builders, which later will be used
// for codec matching.
type CodecSelector struct {
videoEncoders []codec.VideoEncoderBuilder
audioEncoders []codec.AudioEncoderBuilder
}
// CodecSelectorOption is a type for specifying CodecSelector options
type CodecSelectorOption func(*CodecSelector)
// WithVideoEncoders replace current video codecs with listed encoders
func WithVideoEncoders(encoders ...codec.VideoEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.videoEncoders = encoders
}
}
// WithVideoEncoders replace current audio codecs with listed encoders
func WithAudioEncoders(encoders ...codec.AudioEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.audioEncoders = encoders
}
}
// NewCodecSelector constructs CodecSelector with given variadic options
func NewCodecSelector(opts ...CodecSelectorOption) *CodecSelector {
var track CodecSelector
for _, opt := range opts {
opt(&track)
}
return &track
}
// Populate lets the webrtc engine be aware of supported codecs that are contained in CodecSelector
func (selector *CodecSelector) Populate(setting *webrtc.MediaEngine) {
for _, encoder := range selector.videoEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
for _, encoder := range selector.audioEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
}
func (selector *CodecSelector) selectVideoCodec(wantCodecs []*webrtc.RTPCodec, reader video.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.VideoEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.videoEncoders {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildVideoEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectAudioCodec(wantCodecs []*webrtc.RTPCodec, reader audio.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.AudioEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.audioEncoders {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildAudioEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}

77
examples/http/main.go Normal file
View File

@@ -0,0 +1,77 @@
// This is an example of using mediadevices to broadcast your camera through http.
// The example doesn't aim to be performant, but rather it strives to be simple.
package main
import (
"bytes"
"fmt"
"image/jpeg"
"io"
"log"
"mime/multipart"
"net/http"
"net/textproto"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/prop"
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
// _ "github.com/pion/mediadevices/pkg/driver/videotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
)
func must(err error) {
if err != nil {
panic(err)
}
}
func main() {
s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(constraint *mediadevices.MediaTrackConstraints) {
constraint.Width = prop.Int(600)
constraint.Height = prop.Int(400)
},
})
must(err)
t := s.GetVideoTracks()[0]
videoTrack := t.(*mediadevices.VideoTrack)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf bytes.Buffer
videoReader := videoTrack.NewReader(false)
mimeWriter := multipart.NewWriter(w)
contentType := fmt.Sprintf("multipart/x-mixed-replace;boundary=%s", mimeWriter.Boundary())
w.Header().Add("Content-Type", contentType)
partHeader := make(textproto.MIMEHeader)
partHeader.Add("Content-Type", "image/jpeg")
for {
frame, release, err := videoReader.Read()
if err == io.EOF {
return
}
must(err)
err = jpeg.Encode(&buf, frame, nil)
// Since we're done with img, we need to release img so that that the original owner can reuse
// this memory.
release()
must(err)
partWriter, err := mimeWriter.CreatePart(partHeader)
must(err)
_, err = partWriter.Write(buf.Bytes())
buf.Reset()
must(err)
}
})
fmt.Println("listening on http://localhost:1313")
log.Println(http.ListenAndServe("localhost:1313", nil))
}

View File

@@ -5,26 +5,23 @@ import (
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/examples/internal/signal"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
// This is required to use opus audio encoder
"github.com/pion/mediadevices/pkg/codec/opus"
// If you don't like vpx, you can also use x264 by importing as below
// "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
// or you can also use openh264 for alternative h264 implementation
// "github.com/pion/mediadevices/pkg/codec/openh264"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
"github.com/pion/mediadevices/pkg/codec/openh264" // This is required to use VP8/VP9 video encoder
"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use VP8/VP9 video encoder
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
// _ "github.com/pion/mediadevices/pkg/driver/videotest"
// _ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
_ "github.com/pion/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter
_ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
)
const (
@@ -61,44 +58,48 @@ func main() {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
})
md := mediadevices.NewMediaDevices(peerConnection)
vp8Params, err := openh264.NewParams()
if err != nil {
panic(err)
}
vp8Params.BitRate = 300_000 // 300kbps
opusParams, err := opus.NewParams()
if err != nil {
panic(err)
}
opusParams.BitRate = 32000 // 32kbps
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
mediadevices.WithAudioEncoders(&opusParams),
)
vp8Params, err := vpx.NewVP8Params()
if err != nil {
panic(err)
}
vp8Params.BitRate = 100000 // 100kbps
s, err := md.GetUserMedia(mediadevices.MediaStreamConstraints{
Audio: func(c *mediadevices.MediaTrackConstraints) {
c.Enabled = true
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&opusParams}
},
s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
c.Enabled = true
c.Width = prop.Int(640)
c.Height = prop.Int(480)
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&vp8Params}
},
Audio: func(c *mediadevices.MediaTrackConstraints) {
},
Codec: codecSelector,
})
if err != nil {
panic(err)
}
for _, tracker := range s.GetTracks() {
t := tracker.Track()
tracker.OnEnded(func(err error) {
fmt.Printf("Track (ID: %s, Label: %s) ended with error: %v\n",
t.ID(), t.Label(), err)
fmt.Printf("Track (ID: %s) ended with error: %v\n",
tracker.ID(), err)
})
_, err = peerConnection.AddTransceiverFromTrack(t,
// In Pion/webrtc v3, bind will be called automatically after SDP negotiation
webrtcTrack, err := tracker.Bind(peerConnection)
if err != nil {
panic(err)
}
_, err = peerConnection.AddTransceiverFromTrack(webrtcTrack,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},

View File

@@ -7,95 +7,26 @@ import (
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
)
var errNotFound = fmt.Errorf("failed to find the best driver that fits the constraints")
// MediaDevices is an interface that's defined on https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices
type MediaDevices interface {
GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error)
GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error)
EnumerateDevices() []MediaDeviceInfo
}
// NewMediaDevices creates MediaDevices interface that provides access to connected media input devices
// like cameras and microphones, as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevices(pc *webrtc.PeerConnection, opts ...MediaDevicesOption) MediaDevices {
codecs := make(map[webrtc.RTPCodecType][]*webrtc.RTPCodec)
for _, kind := range []webrtc.RTPCodecType{
webrtc.RTPCodecTypeAudio,
webrtc.RTPCodecTypeVideo,
} {
codecs[kind] = pc.GetRegisteredRTPCodecs(kind)
}
return NewMediaDevicesFromCodecs(codecs, opts...)
}
// NewMediaDevicesFromCodecs creates MediaDevices interface from lists of the available codecs
// that provides access to connected media input devices like cameras and microphones,
// as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevicesFromCodecs(codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec, opts ...MediaDevicesOption) MediaDevices {
mdo := MediaDevicesOptions{
codecs: codecs,
trackGenerator: defaultTrackGenerator,
}
for _, o := range opts {
o(&mdo)
}
return &mediaDevices{
MediaDevicesOptions: mdo,
}
}
// TrackGenerator is a function to create new track.
type TrackGenerator func(payloadType uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error)
var defaultTrackGenerator = TrackGenerator(func(pt uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error) {
return webrtc.NewTrack(pt, ssrc, id, label, codec)
})
type mediaDevices struct {
MediaDevicesOptions
}
// MediaDevicesOptions stores parameters used by MediaDevices.
type MediaDevicesOptions struct {
codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec
trackGenerator TrackGenerator
}
// MediaDevicesOption is a type of MediaDevices functional option.
type MediaDevicesOption func(*MediaDevicesOptions)
// WithTrackGenerator specifies a TrackGenerator to use customized track.
func WithTrackGenerator(gen TrackGenerator) MediaDevicesOption {
return func(o *MediaDevicesOptions) {
o.trackGenerator = gen
}
}
// GetDisplayMedia prompts the user to select and grant permission to capture the contents
// of a display or portion thereof (such as a window) as a MediaStream.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia
func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) {
trackers := make([]Tracker, 0)
func GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) {
trackers := make([]Track, 0)
cleanTrackers := func() {
for _, t := range trackers {
t.Stop()
t.Close()
}
}
var videoConstraints MediaTrackConstraints
if constraints.Video != nil {
constraints.Video(&videoConstraints)
}
if videoConstraints.Enabled {
tracker, err := m.selectScreen(videoConstraints)
tracker, err := selectScreen(videoConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -116,27 +47,20 @@ func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (Medi
// GetUserMedia prompts the user for permission to use a media input which produces a MediaStream
// with tracks containing the requested types of media.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) {
func GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) {
// TODO: It should return media stream based on constraints
trackers := make([]Tracker, 0)
trackers := make([]Track, 0)
cleanTrackers := func() {
for _, t := range trackers {
t.Stop()
t.Close()
}
}
var videoConstraints, audioConstraints MediaTrackConstraints
if constraints.Video != nil {
constraints.Video(&videoConstraints)
}
if constraints.Audio != nil {
constraints.Audio(&audioConstraints)
}
if videoConstraints.Enabled {
tracker, err := m.selectVideo(videoConstraints)
tracker, err := selectVideo(videoConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -145,8 +69,9 @@ func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaSt
trackers = append(trackers, tracker)
}
if audioConstraints.Enabled {
tracker, err := m.selectAudio(audioConstraints)
if constraints.Audio != nil {
constraints.Audio(&audioConstraints)
tracker, err := selectAudio(audioConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -240,7 +165,7 @@ func selectBestDriver(filter driver.FilterFn, constraints MediaTrackConstraints)
return bestDriver, constraints, nil
}
func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker, error) {
func selectAudio(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterAudioRecorder()
d, c, err := selectBestDriver(typeFilter, constraints)
@@ -248,9 +173,9 @@ func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, error) {
func selectVideo(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder()
notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen))
filter := driver.FilterAnd(typeFilter, notScreenFilter)
@@ -260,10 +185,10 @@ func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, error) {
func selectScreen(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder()
screenFilter := driver.FilterDeviceType(driver.Screen)
filter := driver.FilterAnd(typeFilter, screenFilter)
@@ -273,10 +198,10 @@ func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) EnumerateDevices() []MediaDeviceInfo {
func EnumerateDevices() []MediaDeviceInfo {
drivers := driver.GetManager().Query(
driver.FilterFn(func(driver.Driver) bool { return true }))
info := make([]MediaDeviceInfo, 0, len(drivers))

View File

@@ -1,91 +1,42 @@
package mediadevices
import (
"errors"
"io"
"testing"
"time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver"
_ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/videotest"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
func TestGetUserMedia(t *testing.T) {
videoParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 100000,
},
name: "MockVideo",
}
audioParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 32000,
},
name: "MockAudio",
}
md := NewMediaDevicesFromCodecs(
map[webrtc.RTPCodecType][]*webrtc.RTPCodec{
webrtc.RTPCodecTypeVideo: {
{Type: webrtc.RTPCodecTypeVideo, Name: "MockVideo", PayloadType: 1},
},
webrtc.RTPCodecTypeAudio: {
{Type: webrtc.RTPCodecTypeAudio, Name: "MockAudio", PayloadType: 2},
},
},
WithTrackGenerator(
func(_ uint8, _ uint32, id, _ string, codec *webrtc.RTPCodec) (
LocalTrack, error,
) {
return newMockTrack(codec, id), nil
},
),
)
constraints := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) {
c.Enabled = true
c.Width = prop.Int(640)
c.Height = prop.Int(480)
params := videoParams
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
},
Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
},
}
constraintsWrong := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) {
c.Enabled = true
c.Width = prop.Int(640)
c.Width = prop.IntExact(10000)
c.Height = prop.Int(480)
params := videoParams
params.BitRate = 0
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
},
Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
},
}
// GetUserMedia with broken parameters
ms, err := md.GetUserMedia(constraintsWrong)
ms, err := GetUserMedia(constraintsWrong)
if err == nil {
t.Fatal("Expected error, but got nil")
}
// GetUserMedia with correct parameters
ms, err = md.GetUserMedia(constraints)
ms, err = GetUserMedia(constraints)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -103,11 +54,11 @@ func TestGetUserMedia(t *testing.T) {
time.Sleep(50 * time.Millisecond)
for _, track := range tracks {
track.Stop()
track.Close()
}
// Stop and retry GetUserMedia
ms, err = md.GetUserMedia(constraints)
ms, err = GetUserMedia(constraints)
if err != nil {
t.Fatalf("Failed to GetUserMedia after the previsous tracks stopped: %v", err)
}
@@ -124,106 +75,10 @@ func TestGetUserMedia(t *testing.T) {
}
time.Sleep(50 * time.Millisecond)
for _, track := range tracks {
track.Stop()
track.Close()
}
}
type mockTrack struct {
codec *webrtc.RTPCodec
id string
}
func newMockTrack(codec *webrtc.RTPCodec, id string) *mockTrack {
return &mockTrack{
codec: codec,
id: id,
}
}
func (t *mockTrack) WriteSample(s media.Sample) error {
return nil
}
func (t *mockTrack) Codec() *webrtc.RTPCodec {
return t.codec
}
func (t *mockTrack) ID() string {
return t.id
}
func (t *mockTrack) Kind() webrtc.RTPCodecType {
return t.codec.Type
}
type mockParams struct {
codec.BaseParams
name string
}
func (params *mockParams) RTPCodec() *codec.RTPCodec {
rtpCodec := codec.NewRTPH264Codec(90000)
rtpCodec.Name = params.name
return rtpCodec
}
func (params *mockParams) BuildVideoEncoder(r video.Reader, p prop.Media) (codec.ReadCloser, error) {
if params.BitRate == 0 {
// This is a dummy error to test the failure condition.
return nil, errors.New("wrong codec parameter")
}
return &mockVideoCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
func (params *mockParams) BuildAudioEncoder(r audio.Reader, p prop.Media) (codec.ReadCloser, error) {
return &mockAudioCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
type mockCodec struct{}
func (e *mockCodec) SetBitRate(b int) error {
return nil
}
func (e *mockCodec) ForceKeyFrame() error {
return nil
}
type mockVideoCodec struct {
mockCodec
r video.Reader
closed chan struct{}
}
func (m *mockVideoCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockVideoCodec) Close() error { return nil }
type mockAudioCodec struct {
mockCodec
r audio.Reader
closed chan struct{}
}
func (m *mockAudioCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockAudioCodec) Close() error { return nil }
func TestSelectBestDriverConstraintsResultIsSetProperly(t *testing.T) {
filterFn := driver.FilterVideoRecorder()
drivers := driver.GetManager().Query(filterFn)

View File

@@ -7,80 +7,80 @@ import (
// MediaStream is an interface that represents a collection of existing tracks.
type MediaStream interface {
// GetAudioTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getaudiotracks
GetAudioTracks() []Tracker
GetAudioTracks() []Track
// GetVideoTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getvideotracks
GetVideoTracks() []Tracker
GetVideoTracks() []Track
// GetTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-gettracks
GetTracks() []Tracker
GetTracks() []Track
// AddTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-addtrack
AddTrack(t Tracker)
AddTrack(t Track)
// RemoveTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-removetrack
RemoveTrack(t Tracker)
RemoveTrack(t Track)
}
type mediaStream struct {
trackers map[Tracker]struct{}
l sync.RWMutex
tracks map[Track]struct{}
l sync.RWMutex
}
const trackTypeDefault MediaDeviceType = 0
// NewMediaStream creates a MediaStream interface that's defined in
// https://w3c.github.io/mediacapture-main/#dom-mediastream
func NewMediaStream(trackers ...Tracker) (MediaStream, error) {
m := mediaStream{trackers: make(map[Tracker]struct{})}
func NewMediaStream(tracks ...Track) (MediaStream, error) {
m := mediaStream{tracks: make(map[Track]struct{})}
for _, tracker := range trackers {
if _, ok := m.trackers[tracker]; !ok {
m.trackers[tracker] = struct{}{}
for _, track := range tracks {
if _, ok := m.tracks[track]; !ok {
m.tracks[track] = struct{}{}
}
}
return &m, nil
}
func (m *mediaStream) GetAudioTracks() []Tracker {
func (m *mediaStream) GetAudioTracks() []Track {
return m.queryTracks(AudioInput)
}
func (m *mediaStream) GetVideoTracks() []Tracker {
func (m *mediaStream) GetVideoTracks() []Track {
return m.queryTracks(VideoInput)
}
func (m *mediaStream) GetTracks() []Tracker {
func (m *mediaStream) GetTracks() []Track {
return m.queryTracks(trackTypeDefault)
}
// queryTracks returns all tracks that are the same kind as t.
// If t is 0, which is the default, queryTracks will return all the tracks.
func (m *mediaStream) queryTracks(t MediaDeviceType) []Tracker {
func (m *mediaStream) queryTracks(t MediaDeviceType) []Track {
m.l.RLock()
defer m.l.RUnlock()
result := make([]Tracker, 0)
for tracker := range m.trackers {
if tracker.Kind() == t || t == trackTypeDefault {
result = append(result, tracker)
result := make([]Track, 0)
for track := range m.tracks {
if track.Kind() == t || t == trackTypeDefault {
result = append(result, track)
}
}
return result
}
func (m *mediaStream) AddTrack(t Tracker) {
func (m *mediaStream) AddTrack(t Track) {
m.l.Lock()
defer m.l.Unlock()
if _, ok := m.trackers[t]; ok {
if _, ok := m.tracks[t]; ok {
return
}
m.trackers[t] = struct{}{}
m.tracks[t] = struct{}{}
}
func (m *mediaStream) RemoveTrack(t Tracker) {
func (m *mediaStream) RemoveTrack(t Track) {
m.l.Lock()
defer m.l.Unlock()
delete(m.trackers, t)
delete(m.tracks, t)
}

View File

@@ -10,17 +10,14 @@ type mockMediaStreamTrack struct {
kind MediaDeviceType
}
func (track *mockMediaStreamTrack) Track() *webrtc.Track {
return nil
func (track *mockMediaStreamTrack) ID() string {
return ""
}
func (track *mockMediaStreamTrack) LocalTrack() LocalTrack {
func (track *mockMediaStreamTrack) Close() error {
return nil
}
func (track *mockMediaStreamTrack) Stop() {
}
func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
return track.kind
}
@@ -28,8 +25,16 @@ func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
func (track *mockMediaStreamTrack) OnEnded(handler func(error)) {
}
func (track *mockMediaStreamTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
return nil, nil
}
func (track *mockMediaStreamTrack) Unbind(pc *webrtc.PeerConnection) error {
return nil
}
func TestMediaStreamFilters(t *testing.T) {
audioTracks := []Tracker{
audioTracks := []Track{
&mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput},
@@ -37,7 +42,7 @@ func TestMediaStreamFilters(t *testing.T) {
&mockMediaStreamTrack{AudioInput},
}
videoTracks := []Tracker{
videoTracks := []Track{
&mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput},
@@ -49,7 +54,7 @@ func TestMediaStreamFilters(t *testing.T) {
t.Fatal(err)
}
expect := func(t *testing.T, actual, expected []Tracker) {
expect := func(t *testing.T, actual, expected []Track) {
if len(actual) != len(expected) {
t.Fatalf("%s: Expected to get %d trackers, but got %d trackers", t.Name(), len(expected), len(actual))
}

View File

@@ -1,40 +1,18 @@
package mediadevices
import (
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
type MediaStreamConstraints struct {
Audio MediaOption
Video MediaOption
Codec *CodecSelector
}
// MediaTrackConstraints represents https://w3c.github.io/mediacapture-main/#dom-mediatrackconstraints
type MediaTrackConstraints struct {
prop.MediaConstraints
Enabled bool
// VideoEncoderBuilders are codec builders that are used for encoding the video
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
VideoEncoderBuilders []codec.VideoEncoderBuilder
// AudioEncoderBuilders are codec builders that are used for encoding the audio
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
AudioEncoderBuilders []codec.AudioEncoderBuilder
// VideoTransform will be used to transform the video that's coming from the driver.
// So, basically it'll look like following: driver -> VideoTransform -> codec
VideoTransform video.TransformFunc
// AudioTransform will be used to transform the audio that's coming from the driver.
// So, basically it'll look like following: driver -> AudioTransform -> code
AudioTransform audio.TransformFunc
selectedMedia prop.Media
}

View File

@@ -15,7 +15,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()
return currentProp, err
}
@@ -29,7 +29,7 @@ func detectCurrentAudioProp(broadcaster *audio.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()
return currentProp, err
}

View File

@@ -17,12 +17,12 @@ func TestDetectCurrentVideoProp(t *testing.T) {
second.Pix[0] = 2
isFirst := true
source := video.ReaderFunc(func() (image.Image, error) {
source := video.ReaderFunc(func() (image.Image, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})
@@ -42,7 +42,7 @@ func TestDetectCurrentVideoProp(t *testing.T) {
}
reader := broadcaster.NewReader(false)
img, err := reader.Read()
img, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}
@@ -65,12 +65,12 @@ func TestDetectCurrentAudioProp(t *testing.T) {
second.Data[0] = 2
isFirst := true
source := audio.ReaderFunc(func() (wave.Audio, error) {
source := audio.ReaderFunc(func() (wave.Audio, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})
@@ -86,7 +86,7 @@ func TestDetectCurrentAudioProp(t *testing.T) {
}
reader := broadcaster.NewReader(false)
chunk, err := reader.Read()
chunk, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -110,12 +110,12 @@ func (rc *ReadCloser) dataCb(data []byte) {
// Read reads raw data, the format is determined by the media type and property:
// - For video, each call will return a frame.
// - For audio, each call will return a chunk which its size configured by Latency
func (rc *ReadCloser) Read() ([]byte, error) {
func (rc *ReadCloser) Read() ([]byte, func(), error) {
data, ok := <-rc.dataChan
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
return data, nil
return data, func() {}, nil
}
// Close closes the capturing session, and no data will flow anymore

View File

@@ -58,7 +58,7 @@ type VideoEncoderBuilder interface {
// ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame
type ReadCloser interface {
Read() ([]byte, error)
Read() (b []byte, release func(), err error)
Close() error
// SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted
// but this also means that the quality will also be lower.

View File

@@ -55,17 +55,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imgReal := img.(*image.YCbCr)
var y, cb, cr C.Slice
@@ -79,7 +79,7 @@ func (e *encoder) Read() ([]byte, error) {
var encodedBuffer *C.MMAL_BUFFER_HEADER_T
status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer)
if status.code != 0 {
return nil, statusToErr(&status)
return nil, func() {}, statusToErr(&status)
}
// GoBytes copies the C array to Go slice. After this, it's safe to release the C array
@@ -87,7 +87,7 @@ func (e *encoder) Read() ([]byte, error) {
// Release the buffer so that mmal can reuse this memory
C.mmal_buffer_header_release(encodedBuffer)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -50,17 +50,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
}, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -74,11 +74,11 @@ func (e *encoder) Read() ([]byte, error) {
width: C.int(bounds.Max.X - bounds.Min.X),
}, &rv)
if err := errResult(rv); err != nil {
return nil, fmt.Errorf("failed in encoding: %v", err)
return nil, func() {}, fmt.Errorf("failed in encoding: %v", err)
}
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, nil
return encoded, func() {}, nil
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -72,22 +72,22 @@ func newEncoder(r audio.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
buff, err := e.reader.Read()
func (e *encoder) Read() ([]byte, func(), error) {
buff, _, err := e.reader.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
encoded := make([]byte, 1024)
switch b := buff.(type) {
case *wave.Int16Interleaved:
n, err := e.engine.Encode(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
case *wave.Float32Interleaved:
n, err := e.engine.EncodeFloat32(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
default:
return nil, errors.New("unknown type of audio buffer")
return nil, func() {}, errors.New("unknown type of audio buffer")
}
}

View File

@@ -64,7 +64,6 @@ import (
"unsafe"
"github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
@@ -296,17 +295,17 @@ func newVP8Encoder(r video.Reader, p prop.Media, params ParamsVP8) (codec.ReadCl
return e, nil
}
func (e *encoderVP8) Read() ([]byte, error) {
func (e *encoderVP8) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -348,7 +347,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
}
}
if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE {
return nil, errors.New("no available surface")
return nil, func() {}, errors.New("no available surface")
}
C.setForceKFFlagVP8(&e.picParam, 0)
@@ -416,7 +415,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
C.size_t(uintptr(p.src)),
&id,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
}
buffs = append(buffs, id)
}
@@ -426,17 +425,17 @@ func (e *encoderVP8) Read() ([]byte, error) {
e.display, e.ctxID,
e.surfs[surfaceVP8Input],
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Upload image
var vaImg C.VAImage
var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// TODO: use vaImg.pitches to support padding
C.memcpy(
@@ -452,10 +451,10 @@ func (e *encoderVP8) Read() ([]byte, error) {
unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)),
)
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaRenderPicture(
@@ -463,38 +462,38 @@ func (e *encoderVP8) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput
C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaEndPicture(
e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Load encoded data
for retry := 3; retry >= 0; retry-- {
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
}
var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
}
if surfStat == C.VASurfaceReady {
break
}
if retry == 0 {
return nil, fmt.Errorf("failed to sync surface: %d", surfStat)
return nil, func() {}, fmt.Errorf("failed to sync surface: %d", surfStat)
}
}
var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 {
return nil, errors.New("buffer size too small")
return nil, func() {}, errors.New("buffer size too small")
}
if cap(e.frame) < int(seg.size) {
@@ -507,13 +506,13 @@ func (e *encoderVP8) Read() ([]byte, error) {
)
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// Destroy buffers
for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
}
}
@@ -538,7 +537,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoderVP8) SetBitRate(b int) error {

View File

@@ -47,7 +47,6 @@ import (
"unsafe"
"github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
@@ -285,17 +284,17 @@ func newVP9Encoder(r video.Reader, p prop.Media, params ParamsVP9) (codec.ReadCl
return e, nil
}
func (e *encoderVP9) Read() ([]byte, error) {
func (e *encoderVP9) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -379,7 +378,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.size_t(uintptr(p.src)),
&id,
); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
}
buffs = append(buffs, id)
}
@@ -389,17 +388,17 @@ func (e *encoderVP9) Read() ([]byte, error) {
e.display, e.ctxID,
e.surfs[surfaceVP9Input],
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Upload image
var vaImg C.VAImage
var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP9Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// TODO: use vaImg.pitches to support padding
C.copyI420toNV12(
@@ -410,10 +409,10 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.uint(len(yuvImg.Y)),
)
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaRenderPicture(
@@ -421,27 +420,27 @@ func (e *encoderVP9) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput
C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaEndPicture(
e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Load encoded data
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
}
var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
}
var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if cap(e.frame) < int(seg.size) {
e.frame = make([]byte, int(seg.size))
@@ -453,13 +452,13 @@ func (e *encoderVP9) Read() ([]byte, error) {
)
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// Destroy buffers
for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
}
}
@@ -473,7 +472,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoderVP9) SetBitRate(b int) error {

View File

@@ -204,17 +204,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params, codecIface *C.vpx_c
}, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
bounds := yuvImg.Bounds()
@@ -230,7 +230,7 @@ func (e *encoder) Read() ([]byte, error) {
if e.cfg.g_w != C.uint(width) || e.cfg.g_h != C.uint(height) {
e.cfg.g_w, e.cfg.g_h = C.uint(width), C.uint(height)
if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
return nil, func() {}, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
}
e.raw.w, e.raw.h = C.uint(width), C.uint(height)
e.raw.r_w, e.raw.r_h = C.uint(width), C.uint(height)
@@ -243,7 +243,7 @@ func (e *encoder) Read() ([]byte, error) {
C.long(t-e.tStart), C.ulong(t-e.tLastFrame), C.long(flags), C.ulong(e.deadline),
(*C.uchar)(&yuvImg.Y[0]), (*C.uchar)(&yuvImg.Cb[0]), (*C.uchar)(&yuvImg.Cr[0]),
); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_encode failed (%d)", ec)
return nil, func() {}, fmt.Errorf("vpx_codec_encode failed (%d)", ec)
}
e.frameIndex++
@@ -264,7 +264,7 @@ func (e *encoder) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -94,17 +94,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -117,11 +117,11 @@ func (e *encoder) Read() ([]byte, error) {
&rc,
)
if err := errFromC(rc); err != nil {
return nil, err
return nil, func() {}, err
}
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -52,10 +52,10 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
closed := d.closed
reader := audio.ReaderFunc(func() (wave.Audio, error) {
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
select {
case <-closed:
return nil, io.EOF
return nil, func() {}, io.EOF
default:
}
@@ -78,7 +78,7 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
a.SetFloat32(i, ch, wave.Float32Sample(sin[phase]))
}
}
return a, nil
return a, func() {}, nil
})
return reader, nil
}

View File

@@ -56,10 +56,10 @@ func (cam *camera) VideoRecord(property prop.Media) (video.Reader, error) {
if err != nil {
return nil, err
}
r := video.ReaderFunc(func() (image.Image, error) {
frame, err := rc.Read()
r := video.ReaderFunc(func() (image.Image, func(), error) {
frame, _, err := rc.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
return decoder.Decode(frame, property.Width, property.Height)
})

View File

@@ -182,7 +182,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
var buf []byte
r := video.ReaderFunc(func() (img image.Image, err error) {
r := video.ReaderFunc(func() (img image.Image, release func(), err error) {
// Lock to avoid accessing the buffer after StopStreaming()
c.mutex.Lock()
defer c.mutex.Unlock()
@@ -191,23 +191,23 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
for i := 0; i < maxEmptyFrameCount; i++ {
if ctx.Err() != nil {
// Return EOF if the camera is already closed.
return nil, io.EOF
return nil, func() {}, io.EOF
}
err := cam.WaitForFrame(5) // 5 seconds
switch err.(type) {
case nil:
case *webcam.Timeout:
return nil, errReadTimeout
return nil, func() {}, errReadTimeout
default:
// Camera has been stopped.
return nil, err
return nil, func() {}, err
}
b, err := cam.ReadFrame()
if err != nil {
// Camera has been stopped.
return nil, err
return nil, func() {}, err
}
// Frame is empty.
@@ -227,7 +227,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
n := copy(buf, b)
return decoder.Decode(buf[:n], p.Width, p.Height)
}
return nil, errEmptyFrame
return nil, func() {}, errEmptyFrame
})
return r, nil

View File

@@ -116,10 +116,10 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img := &image.YCbCr{}
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
b, ok := <-c.ch
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img.Y = b[:nPix]
img.Cb = b[nPix : nPix+nPix/2]
@@ -128,7 +128,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img.CStride = p.Width / 2
img.SubsampleRatio = image.YCbCrSubsampleRatio422
img.Rect = image.Rect(0, 0, p.Width, p.Height)
return img, nil
return img, func() {}, nil
})
return r, nil
}

View File

@@ -97,22 +97,23 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
return nil, err
}
reader := audio.ReaderFunc(func() (wave.Audio, error) {
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
buff, ok := <-samplesChan
if !ok {
stream.Close()
return nil, io.EOF
return nil, func() {}, io.EOF
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: len(buff) / p.ChannelCount,
Channels: p.ChannelCount,
Len: len(buff) / p.ChannelCount,
SamplingRate: p.SampleRate,
},
)
copy(a.Data, buff)
return a, nil
return a, func() {}, nil
})
stream.Start()

View File

@@ -194,10 +194,10 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
// TODO: detect microphone device disconnection and return EOF
reader := audio.ReaderFunc(func() (wave.Audio, error) {
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
b, ok := <-m.chBuf
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
select {
@@ -210,14 +210,15 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
uintptr(unsafe.Sizeof(b.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
return nil, func() {}, err
}
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount,
Channels: p.ChannelCount,
Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount,
SamplingRate: p.SampleRate,
},
)
@@ -229,7 +230,7 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
}
}
return a, nil
return a, func() {}, nil
})
return reader, nil
}

View File

@@ -68,9 +68,9 @@ func (s *screen) VideoRecord(p prop.Media) (video.Reader, error) {
var dst image.RGBA
reader := s.reader
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
<-s.tick.C
return reader.Read().ToRGBA(&dst), nil
return reader.Read().ToRGBA(&dst), func() {}, nil
})
return r, nil
}

View File

@@ -103,10 +103,10 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
d.tick = tick
closed := d.closed
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
select {
case <-closed:
return nil, io.EOF
return nil, func() {}, io.EOF
default:
}
@@ -130,7 +130,7 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
CStride: p.Width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, p.Width, p.Height),
}, nil
}, func() {}, nil
})
return r, nil

View File

@@ -6,6 +6,7 @@ import (
"image/jpeg"
)
func decodeMJPEG(frame []byte, width, height int) (image.Image, error) {
return jpeg.Decode(bytes.NewReader(frame))
func decodeMJPEG(frame []byte, width, height int) (image.Image, func(), error) {
img, err := jpeg.Decode(bytes.NewReader(frame))
return img, func() {}, err
}

View File

@@ -3,12 +3,12 @@ package frame
import "image"
type Decoder interface {
Decode(frame []byte, width, height int) (image.Image, error)
Decode(frame []byte, width, height int) (image.Image, func(), error)
}
// DecoderFunc is a proxy type for Decoder
type decoderFunc func(frame []byte, width, height int) (image.Image, error)
type decoderFunc func(frame []byte, width, height int) (image.Image, func(), error)
func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, error) {
func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, func(), error) {
return f(frame, width, height)
}

View File

@@ -5,13 +5,13 @@ import (
"image"
)
func decodeI420(frame []byte, width, height int) (image.Image, error) {
func decodeI420(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
cbi := yi + width*height/4
cri := cbi + width*height/4
if cri > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
}
return &image.YCbCr{
@@ -22,15 +22,15 @@ func decodeI420(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeNV21(frame []byte, width, height int) (image.Image, error) {
func decodeNV21(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi + width*height/2
if ci > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
}
var cb, cr []byte
@@ -47,5 +47,5 @@ func decodeNV21(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}

View File

@@ -12,13 +12,13 @@ import (
// void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height);
import "C"
func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -41,16 +41,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -73,5 +73,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}

View File

@@ -7,13 +7,13 @@ import (
"image"
)
func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -39,16 +39,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -74,5 +74,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}

View File

@@ -27,7 +27,7 @@ func TestDecodeYUY2(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, err := decodeYUY2(input, width, height)
img, _, err := decodeYUY2(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -56,7 +56,7 @@ func TestDecodeUYVY(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, err := decodeUYVY(input, width, height)
img, _, err := decodeUYVY(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -77,7 +77,7 @@ func BenchmarkDecodeYUY2(b *testing.B) {
b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) {
input := make([]byte, sz.width*sz.height*2)
for i := 0; i < b.N; i++ {
_, err := decodeYUY2(input, sz.width, sz.height)
_, _, err := decodeYUY2(input, sz.width, sz.height)
if err != nil {
b.Fatal(err)
}

View File

@@ -5,13 +5,14 @@ import (
)
type Reader interface {
Read() (wave.Audio, error)
Read() (chunk wave.Audio, release func(), err error)
}
type ReaderFunc func() (wave.Audio, error)
type ReaderFunc func() (chunk wave.Audio, release func(), err error)
func (rf ReaderFunc) Read() (wave.Audio, error) {
return rf()
func (rf ReaderFunc) Read() (chunk wave.Audio, release func(), err error) {
chunk, release, err = rf()
return
}
// TransformFunc produces a new Reader that will produces a transformed audio

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core
}
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader {
}
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (wave.Audio, error) {
data, err := reader.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
data, _, err := reader.Read()
chunk, _ := data.(wave.Audio)
return chunk, err
return chunk, func() {}, err
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}))
}
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (wave.Audio, error) {
data, err := source.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
data, _, err := source.Read()
img, _ := data.(wave.Audio)
return img, err
return img, func() {}, err
})
}

View File

@@ -14,18 +14,18 @@ func TestBroadcast(t *testing.T) {
SamplingRate: 48000,
})
source := ReaderFunc(func() (wave.Audio, error) {
return chunk, nil
source := ReaderFunc(func() (wave.Audio, func(), error) {
return chunk, func() {}, nil
})
broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read()
actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil {
t.Fatal(err)
}
actualWithoutCopy2, err := readerWithoutCopy2.Read()
actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil {
t.Fatal(err)
}
@@ -39,7 +39,7 @@ func TestBroadcast(t *testing.T) {
}
readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read()
actualWithCopy, _, err := readerWithCopy.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -13,15 +13,15 @@ func NewBuffer(nSamples int) TransformFunc {
var inBuff wave.Audio
return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
for {
if inBuff != nil && inBuff.ChunkInfo().Len >= nSamples {
break
}
buff, err := r.Read()
buff, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
switch b := buff.(type) {
case *wave.Float32Interleaved:
@@ -59,7 +59,7 @@ func NewBuffer(nSamples int) TransformFunc {
ib.Size.Len += b.Size.Len
default:
return nil, errUnsupported
return nil, func() {}, errUnsupported
}
}
switch ib := inBuff.(type) {
@@ -71,7 +71,7 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples
return &ibCopy, nil
return &ibCopy, func() {}, nil
case *wave.Float32Interleaved:
ibCopy := *ib
@@ -81,9 +81,9 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples
return &ibCopy, nil
return &ibCopy, func() {}, nil
}
return nil, errUnsupported
return nil, func() {}, errUnsupported
})
}
}

View File

@@ -49,16 +49,16 @@ func TestBuffer(t *testing.T) {
trans := NewBuffer(3)
var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) {
r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) {
iSent++
return input[iSent-1], nil
return input[iSent-1], func() {}, nil
}
return nil, io.EOF
return nil, func() {}, io.EOF
}))
for i := 0; ; i++ {
a, err := r.Read()
a, _, err := r.Read()
if err != nil {
if err == io.EOF && i >= len(expected) {
break

View File

@@ -13,12 +13,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
return func(r Reader) Reader {
var currentProp prop.Media
var chunkCount uint
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
var dirty bool
chunk, err := r.Read()
chunk, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
info := chunk.ChunkInfo()
@@ -32,6 +32,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
dirty = true
}
latency := time.Duration(chunk.ChunkInfo().Len) * time.Second / time.Nanosecond / time.Duration(currentProp.SampleRate)
if currentProp.Latency != latency {
currentProp.Latency = latency
dirty = true
}
// TODO: Also detect sample format changes?
// TODO: Add audio detect changes. As of now, there's no useful property to track.
@@ -40,7 +46,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
}
chunkCount++
return chunk, nil
return chunk, func() {}, nil
})
}
}

View File

@@ -11,12 +11,12 @@ import (
func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
return wave.NewFloat32Interleaved(wave.ChunkInfo{
Len: 0,
Len: 960,
Channels: p.ChannelCount,
SamplingRate: p.SampleRate,
}), nil
}), func() {}, nil
}), func(newProp prop.Media) {
p = newProp
}
@@ -28,13 +28,14 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media
expected.ChannelCount = 2
expected.SampleRate = 48000
expected.Latency = time.Millisecond * 20
src, _ := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
actual = p
detectBeforeFirstChunk = true
})(src)
_, err := src.Read()
_, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -53,24 +54,22 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media
expected.ChannelCount = 2
expected.SampleRate = 48000
expected.Latency = 20 * time.Millisecond
src, update := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
actual = p
})(src)
for channelCount := 1; channelCount < 8; channelCount++ {
for sampleRate := 12000; sampleRate <= 48000; sampleRate += 4000 {
expected.ChannelCount = channelCount
expected.SampleRate = sampleRate
update(expected)
_, err := src.Read()
if err != nil {
t.Fatal(err)
}
expected.ChannelCount = channelCount
update(expected)
_, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual)
}
}
})

View File

@@ -8,14 +8,14 @@ import (
// NewChannelMixer creates audio transform to mix audio channels.
func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) {
buff, err := r.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
buff, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
ci := buff.ChunkInfo()
if ci.Channels == channels {
return buff, nil
return buff, func() {}, nil
}
ci.Channels = channels
@@ -32,9 +32,9 @@ func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
mixed = wave.NewFloat32NonInterleaved(ci)
}
if err := mixer.Mix(mixed, buff); err != nil {
return nil, err
return nil, func() {}, err
}
return mixed, nil
return mixed, func() {}, nil
})
}
}

View File

@@ -34,16 +34,16 @@ func TestMixer(t *testing.T) {
trans := NewChannelMixer(1, &mixer.MonoMixer{})
var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) {
r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) {
iSent++
return input[iSent-1], nil
return input[iSent-1], func() {}, nil
}
return nil, io.EOF
return nil, func() {}, io.EOF
}))
for i := 0; ; i++ {
a, err := r.Read()
a, _, err := r.Read()
if err != nil {
if err == io.EOF && i >= len(expected) {
break

View File

@@ -127,10 +127,10 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader {
currentCount := broadcaster.buffer.lastCount()
return ReaderFunc(func() (data interface{}, err error) {
return ReaderFunc(func() (data interface{}, release func(), err error) {
currentCount++
if push := broadcaster.buffer.acquire(currentCount); push != nil {
data, err = broadcaster.source.Load().(Reader).Read()
data, _, err = broadcaster.source.Load().(Reader).Read()
push(&broadcasterData{
data: data,
err: err,

View File

@@ -57,7 +57,7 @@ func TestBroadcast(t *testing.T) {
frameCount := 0
frameSent := 0
lastSend := time.Now()
src = ReaderFunc(func() (interface{}, error) {
src = ReaderFunc(func() (interface{}, func(), error) {
if pauseCond.src && frameSent == 30 {
time.Sleep(time.Second)
}
@@ -74,7 +74,7 @@ func TestBroadcast(t *testing.T) {
frame := frames[frameCount]
frameCount++
frameSent++
return frame, nil
return frame, func() {}, nil
})
broadcaster := NewBroadcaster(src, nil)
var done uint32
@@ -95,7 +95,7 @@ func TestBroadcast(t *testing.T) {
if pauseCond.dst && count == 30 {
time.Sleep(time.Second)
}
frame, err := reader.Read()
frame, _, err := reader.Read()
if err != nil {
t.Error(err)
}

View File

@@ -3,12 +3,13 @@ package io
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
// to provide strong type.
type Reader interface {
Read() (interface{}, error)
Read() (data interface{}, release func(), err error)
}
// ReaderFunc is a proxy type for Reader
type ReaderFunc func() (interface{}, error)
type ReaderFunc func() (data interface{}, release func(), err error)
func (f ReaderFunc) Read() (interface{}, error) {
return f()
func (f ReaderFunc) Read() (data interface{}, release func(), err error) {
data, release, err = f()
return
}

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core
}
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
}
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (image.Image, error) {
data, err := reader.Read()
return ReaderFunc(func() (image.Image, func(), error) {
data, _, err := reader.Read()
img, _ := data.(image.Image)
return img, err
return img, func() {}, err
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}))
}
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (image.Image, error) {
data, err := source.Read()
return ReaderFunc(func() (image.Image, func(), error) {
data, _, err := source.Read()
img, _ := data.(image.Image)
return img, err
return img, func() {}, err
})
}

View File

@@ -9,18 +9,18 @@ import (
func TestBroadcast(t *testing.T) {
resolution := image.Rect(0, 0, 1920, 1080)
img := image.NewGray(resolution)
source := ReaderFunc(func() (image.Image, error) {
return img, nil
source := ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
})
broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read()
actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil {
t.Fatal(err)
}
actualWithoutCopy2, err := readerWithoutCopy2.Read()
actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil {
t.Fatal(err)
}
@@ -34,7 +34,7 @@ func TestBroadcast(t *testing.T) {
}
readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read()
actualWithCopy, _, err := readerWithCopy.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -63,10 +63,10 @@ func imageToYCbCr(dst *image.YCbCr, src image.Image) {
// ToI420 converts r to a new reader that will output images in I420 format
func ToI420(r Reader) Reader {
var yuvImg image.YCbCr
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imageToYCbCr(&yuvImg, img)
@@ -79,11 +79,11 @@ func ToI420(r Reader) Reader {
i422ToI420(&yuvImg)
case image.YCbCrSubsampleRatio420:
default:
return nil, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio)
return nil, func() {}, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio)
}
yuvImg.SubsampleRatio = image.YCbCrSubsampleRatio420
return &yuvImg, nil
return &yuvImg, func() {}, nil
})
}
@@ -130,13 +130,13 @@ func imageToRGBA(dst *image.RGBA, src image.Image) {
// ToRGBA converts r to a new reader that will output images in RGBA format
func ToRGBA(r Reader) Reader {
var dst image.RGBA
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imageToRGBA(&dst, img)
return &dst, nil
return &dst, func() {}, nil
})
}

View File

@@ -144,10 +144,10 @@ func TestToI420(t *testing.T) {
for name, c := range cases {
c := c
t.Run(name, func(t *testing.T) {
r := ToI420(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -199,10 +199,10 @@ func TestToRGBA(t *testing.T) {
for name, c := range cases {
c := c
t.Run(name, func(t *testing.T) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -225,12 +225,12 @@ func BenchmarkToI420(b *testing.B) {
for name, img := range cases {
img := img
b.Run(name, func(b *testing.B) {
r := ToI420(ReaderFunc(func() (image.Image, error) {
return img, nil
r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}
@@ -253,12 +253,12 @@ func BenchmarkToRGBA(b *testing.B) {
for name, img := range cases {
img := img
b.Run(name, func(b *testing.B) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) {
return img, nil
r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

View File

@@ -14,12 +14,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
var currentProp prop.Media
var lastTaken time.Time
var frames uint
return ReaderFunc(func() (image.Image, error) {
return ReaderFunc(func() (image.Image, func(), error) {
var dirty bool
img, err := r.Read()
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
bounds := img.Bounds()
@@ -52,7 +52,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
}
frames++
return img, nil
return img, func() {}, nil
})
}
}

View File

@@ -12,8 +12,8 @@ import (
func BenchmarkDetectChanges(b *testing.B) {
var src Reader
src = ReaderFunc(func() (image.Image, error) {
return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), nil
src = ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), func() {}, nil
})
b.Run("WithoutDetectChanges", func(b *testing.B) {
@@ -40,8 +40,8 @@ func BenchmarkDetectChanges(b *testing.B) {
func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (image.Image, error) {
return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), nil
return ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), func() {}, nil
}), func(newProp prop.Media) {
p = newProp
}
@@ -86,7 +86,7 @@ func TestDetectChanges(t *testing.T) {
detectBeforeFirstFrame = true
})(src)
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -113,7 +113,7 @@ func TestDetectChanges(t *testing.T) {
expected.Width = width
expected.Height = height
update(expected)
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -143,7 +143,7 @@ func TestDetectChanges(t *testing.T) {
})(src)
for count < 3 {
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -156,10 +156,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
}
}
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
switch v := img.(type) {
@@ -169,7 +169,7 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, rect, v, v.Rect, draw.Src, nil)
cloned := *dst // clone metadata
return &cloned, nil
return &cloned, func() {}, nil
case *image.YCbCr:
ycbcrRealloc(v)
@@ -184,10 +184,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, dst.Bounds(), src, src.Bounds(), draw.Src, nil)
cloned := *(imgScaled.(*image.YCbCr)) // clone metadata
return &cloned, nil
return &cloned, func() {}, nil
default:
return nil, errUnsupportedImageType
return nil, func() {}, errUnsupportedImageType
}
})
}

View File

@@ -215,11 +215,11 @@ func TestScale(t *testing.T) {
c := c
t.Run(name, func(t *testing.T) {
trans := Scale(c.width, c.height, algo)
r := trans(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := trans(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
for i := 0; i < 4; i++ {
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -261,12 +261,12 @@ func BenchmarkScale(b *testing.B) {
img := img
b.Run(name, func(b *testing.B) {
trans := Scale(640, 360, algo)
r := trans(ReaderFunc(func() (image.Image, error) {
return img, nil
r := trans(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

View File

@@ -10,16 +10,16 @@ import (
func Throttle(rate float32) TransformFunc {
return func(r Reader) Reader {
ticker := time.NewTicker(time.Duration(int64(float64(time.Second) / float64(rate))))
return ReaderFunc(func() (image.Image, error) {
return ReaderFunc(func() (image.Image, func(), error) {
for {
img, err := r.Read()
img, _, err := r.Read()
if err != nil {
ticker.Stop()
return nil, err
return nil, func() {}, err
}
select {
case <-ticker.C:
return img, nil
return img, func() {}, nil
default:
}
}

View File

@@ -19,14 +19,14 @@ func TestThrottle(t *testing.T) {
var cntPush int
trans := Throttle(50)
r := trans(ReaderFunc(func() (image.Image, error) {
r := trans(ReaderFunc(func() (image.Image, func(), error) {
<-ticker.C
cntPush++
return img, nil
return img, func() {}, nil
}))
for i := 0; i < 20; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@@ -5,13 +5,14 @@ import (
)
type Reader interface {
Read() (img image.Image, err error)
Read() (img image.Image, release func(), err error)
}
type ReaderFunc func() (img image.Image, err error)
type ReaderFunc func() (img image.Image, release func(), err error)
func (rf ReaderFunc) Read() (img image.Image, err error) {
return rf()
func (rf ReaderFunc) Read() (img image.Image, release func(), err error) {
img, release, err = rf()
return
}
// TransformFunc produces a new Reader that will produces a transformed video

View File

@@ -4,6 +4,7 @@ import (
"math"
"time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
@@ -11,7 +12,7 @@ type samplerFunc func(b []byte) error
// newVideoSampler creates a video sampler that uses the actual video frame rate and
// the codec's clock rate to come up with a duration for each sample.
func newVideoSampler(t LocalTrack) samplerFunc {
func newVideoSampler(t *webrtc.Track) samplerFunc {
clockRate := float64(t.Codec().ClockRate)
lastTimestamp := time.Now()
@@ -27,7 +28,7 @@ func newVideoSampler(t LocalTrack) samplerFunc {
// newAudioSampler creates a audio sampler that uses a fixed latency and
// the codec's clock rate to come up with a duration for each sample.
func newAudioSampler(t LocalTrack, latency time.Duration) samplerFunc {
func newAudioSampler(t *webrtc.Track, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds()))
return samplerFunc(func(b []byte) error {
return t.WriteSample(media.Sample{Data: b, Samples: samples})

1
source.go Normal file
View File

@@ -0,0 +1 @@
package mediadevices

439
track.go
View File

@@ -2,239 +2,326 @@ package mediadevices
import (
"errors"
"fmt"
"image"
"math/rand"
"sync"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/wave"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
// Tracker is an interface that represent MediaStreamTrack
var (
errInvalidDriverType = errors.New("invalid driver type")
errNotFoundPeerConnection = errors.New("failed to find given peer connection")
)
// Source is a generic representation of a media source
type Source interface {
ID() string
Close() error
}
// VideoSource is a specific type of media source that emits a series of video frames
type VideoSource interface {
video.Reader
Source
}
// AudioSource is a specific type of media source that emits a series of audio chunks
type AudioSource interface {
audio.Reader
Source
}
// Track is an interface that represent MediaStreamTrack
// Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack
type Tracker interface {
Track() *webrtc.Track
LocalTrack() LocalTrack
Stop()
Kind() MediaDeviceType
type Track interface {
Source
// OnEnded registers a handler to receive an error from the media stream track.
// If the error is already occured before registering, the handler will be
// immediately called.
OnEnded(func(error))
Kind() MediaDeviceType
// Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind
// call will happen automatically after the SDP negotiation. Users won't need to call this manually.
Bind(*webrtc.PeerConnection) (*webrtc.Track, error)
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
// be called automatically in the future.
Unbind(*webrtc.PeerConnection) error
}
type LocalTrack interface {
WriteSample(s media.Sample) error
Codec() *webrtc.RTPCodec
ID() string
Kind() webrtc.RTPCodecType
type baseTrack struct {
Source
err error
onErrorHandler func(error)
mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
selector *CodecSelector
activePeerConnections map[*webrtc.PeerConnection]chan<- chan<- struct{}
}
type track struct {
localTrack LocalTrack
d driver.Driver
sample samplerFunc
encoder codec.ReadCloser
onErrorHandler func(error)
err error
mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
}
func newTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*track, error) {
var encoderBuilders []encoderBuilder
var rtpCodecs []*webrtc.RTPCodec
var buildSampler func(t LocalTrack) samplerFunc
var kind MediaDeviceType
var err error
err = d.Open()
if err != nil {
return nil, err
func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack {
return &baseTrack{
Source: source,
kind: kind,
selector: selector,
activePeerConnections: make(map[*webrtc.PeerConnection]chan<- chan<- struct{}),
}
switch r := d.(type) {
case driver.VideoRecorder:
kind = VideoInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeVideo]
buildSampler = newVideoSampler
encoderBuilders, err = newVideoEncoderBuilders(r, constraints)
case driver.AudioRecorder:
kind = AudioInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeAudio]
buildSampler = func(t LocalTrack) samplerFunc {
return newAudioSampler(t, constraints.selectedMedia.Latency)
}
encoderBuilders, err = newAudioEncoderBuilders(r, constraints)
default:
err = errors.New("newTrack: invalid driver type")
}
if err != nil {
d.Close()
return nil, err
}
for _, builder := range encoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == builder.name {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
localTrack, err := opts.trackGenerator(
matchedRTPCodec.PayloadType,
rand.Uint32(),
d.ID(),
matchedRTPCodec.Type.String(),
matchedRTPCodec,
)
if err != nil {
continue
}
encoder, err := builder.build()
if err != nil {
continue
}
t := track{
localTrack: localTrack,
sample: buildSampler(localTrack),
d: d,
encoder: encoder,
kind: kind,
}
go t.start()
return &t, nil
}
d.Close()
return nil, errors.New("newTrack: failed to find a matching codec")
}
// Kind returns track's kind
func (t *track) Kind() MediaDeviceType {
return t.kind
func (track *baseTrack) Kind() MediaDeviceType {
return track.kind
}
// OnEnded sets an error handler. When a track has been created and started, if an
// error occurs, handler will get called with the error given to the parameter.
func (t *track) OnEnded(handler func(error)) {
t.mu.Lock()
t.onErrorHandler = handler
err := t.err
t.mu.Unlock()
func (track *baseTrack) OnEnded(handler func(error)) {
track.mu.Lock()
track.onErrorHandler = handler
err := track.err
track.mu.Unlock()
if err != nil && handler != nil {
// Already errored.
t.endOnce.Do(func() {
track.endOnce.Do(func() {
handler(err)
})
}
}
// onError is a callback when an error occurs
func (t *track) onError(err error) {
t.mu.Lock()
t.err = err
handler := t.onErrorHandler
t.mu.Unlock()
func (track *baseTrack) onError(err error) {
track.mu.Lock()
track.err = err
handler := track.onErrorHandler
track.mu.Unlock()
if handler != nil {
t.endOnce.Do(func() {
track.endOnce.Do(func() {
handler(err)
})
}
}
// start starts the data flow from the driver all the way to the localTrack
func (t *track) start() {
for {
buff, err := t.encoder.Read()
func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sampler func(*webrtc.Track) samplerFunc) (*webrtc.Track, error) {
track.mu.Lock()
defer track.mu.Unlock()
webrtcTrack, err := pc.NewTrack(selectedCodec.PayloadType, rand.Uint32(), track.ID(), selectedCodec.MimeType)
if err != nil {
return nil, err
}
sample := sampler(webrtcTrack)
signalCh := make(chan chan<- struct{})
track.activePeerConnections[pc] = signalCh
fmt.Println("Binding")
go func() {
var doneCh chan<- struct{}
defer func() {
encodedReader.Close()
// When there's another call to unbind, it won't block since we mark the signalCh to be closed
close(signalCh)
if doneCh != nil {
close(doneCh)
}
}()
for {
select {
case doneCh = <-signalCh:
return
default:
}
buff, _, err := encodedReader.Read()
if err != nil {
track.onError(err)
return
}
if err := sample(buff); err != nil {
track.onError(err)
return
}
}
}()
return webrtcTrack, nil
}
func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error {
track.mu.Lock()
defer track.mu.Unlock()
ch, ok := track.activePeerConnections[pc]
if !ok {
return errNotFoundPeerConnection
}
doneCh := make(chan struct{})
ch <- doneCh
<-doneCh
delete(track.activePeerConnections, pc)
return nil
}
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
if err := d.Open(); err != nil {
return nil, err
}
switch recorder := d.(type) {
case driver.VideoRecorder:
return newVideoTrackFromDriver(d, recorder, constraints, selector)
case driver.AudioRecorder:
return newAudioTrackFromDriver(d, recorder, constraints, selector)
default:
panic(errInvalidDriverType)
}
}
// VideoTrack is a specific track type that contains video source which allows multiple readers to access, and manipulate.
type VideoTrack struct {
*baseTrack
*video.Broadcaster
}
// NewVideoTrack constructs a new VideoTrack
func NewVideoTrack(source VideoSource, selector *CodecSelector) Track {
return newVideoTrackFromReader(source, source, selector)
}
func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, VideoInput, selector)
wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) {
img, _, err = reader.Read()
if err != nil {
t.onError(err)
return
base.onError(err)
}
return img, func() {}, err
})
if err := t.sample(buff); err != nil {
t.onError(err)
return
}
// TODO: Allow users to configure broadcaster
broadcaster := video.NewBroadcaster(wrappedReader, nil)
return &VideoTrack{
baseTrack: base,
Broadcaster: broadcaster,
}
}
// Stop stops the underlying driver and encoder
func (t *track) Stop() {
t.d.Close()
t.encoder.Close()
}
func (t *track) Track() *webrtc.Track {
return t.localTrack.(*webrtc.Track)
}
func (t *track) LocalTrack() LocalTrack {
return t.localTrack
}
// encoderBuilder is a generic encoder builder that acts as a delegator for codec.VideoEncoderBuilder and
// codec.AudioEncoderBuilder. The idea of having a delegator is to reduce redundant codes that are being
// duplicated for managing video and audio.
type encoderBuilder struct {
name string
build func() (codec.ReadCloser, error)
}
// newVideoEncoderBuilders transforms video given by VideoRecorder with the video transformer that is passed through
// constraints and create a list of generic encoder builders
func newVideoEncoderBuilders(vr driver.VideoRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := vr.VideoRecord(constraints.selectedMedia)
// newVideoTrackFromDriver is an internal video track creation from driver
func newVideoTrackFromDriver(d driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
reader, err := recorder.VideoRecord(constraints.selectedMedia)
if err != nil {
return nil, err
}
if constraints.VideoTransform != nil {
r = constraints.VideoTransform(r)
}
encoderBuilders := make([]encoderBuilder, len(constraints.VideoEncoderBuilders))
for i, b := range constraints.VideoEncoderBuilders {
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildVideoEncoder(r, constraints.selectedMedia)
}
}
return encoderBuilders, nil
return newVideoTrackFromReader(d, reader, selector), nil
}
// newAudioEncoderBuilders transforms audio given by AudioRecorder with the audio transformer that is passed through
// constraints and create a list of generic encoder builders
func newAudioEncoderBuilders(ar driver.AudioRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := ar.AudioRecord(constraints.selectedMedia)
// Transform transforms the underlying source by applying the given fns in serial order
func (track *VideoTrack) Transform(fns ...video.TransformFunc) {
src := track.Broadcaster.Source()
track.Broadcaster.ReplaceSource(video.Merge(fns...)(src))
}
func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
}
if constraints.AudioTransform != nil {
r = constraints.AudioTransform(r)
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo)
fmt.Println(wantCodecs)
fmt.Println(&inputProp)
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
}
encoderBuilders := make([]encoderBuilder, len(constraints.AudioEncoderBuilders))
for i, b := range constraints.AudioEncoderBuilders {
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildAudioEncoder(r, constraints.selectedMedia)
}
}
return encoderBuilders, nil
return track.bind(pc, encodedReader, selectedCodec, newVideoSampler)
}
func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
// manipulate.
type AudioTrack struct {
*baseTrack
*audio.Broadcaster
}
// NewAudioTrack constructs a new VideoTrack
func NewAudioTrack(source AudioSource, selector *CodecSelector) Track {
return newAudioTrackFromReader(source, source, selector)
}
func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, AudioInput, selector)
wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) {
chunk, _, err = reader.Read()
if err != nil {
base.onError(err)
}
return chunk, func() {}, err
})
// TODO: Allow users to configure broadcaster
broadcaster := audio.NewBroadcaster(wrappedReader, nil)
return &AudioTrack{
baseTrack: base,
Broadcaster: broadcaster,
}
}
// newAudioTrackFromDriver is an internal audio track creation from driver
func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
reader, err := recorder.AudioRecord(constraints.selectedMedia)
if err != nil {
return nil, err
}
return newAudioTrackFromReader(d, reader, selector), nil
}
// Transform transforms the underlying source by applying the given fns in serial order
func (track *AudioTrack) Transform(fns ...audio.TransformFunc) {
src := track.Broadcaster.Source()
track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src))
}
func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio)
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
}
return track.bind(pc, encodedReader, selectedCodec, func(t *webrtc.Track) samplerFunc { return newAudioSampler(t, inputProp.Latency) })
}
func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}

View File

@@ -10,7 +10,7 @@ func TestOnEnded(t *testing.T) {
errExpected := errors.New("an error")
t.Run("ErrorAfterRegister", func(t *testing.T) {
tr := &track{}
tr := &baseTrack{}
called := make(chan error, 1)
tr.OnEnded(func(error) {
@@ -35,7 +35,7 @@ func TestOnEnded(t *testing.T) {
})
t.Run("ErrorBeforeRegister", func(t *testing.T) {
tr := &track{}
tr := &baseTrack{}
tr.onError(errExpected)