Compare commits

...

5 Commits

Author SHA1 Message Date
Lukas Herman
af6d31fde5 Revert go.mod 2020-10-30 00:37:38 -07:00
Lukas Herman
2f5e4ee914 New mediadevices design
Changelog:
  * Better support for non-webrtc use cases
  * Enable multiple readers
  * Enhance codec selectors
  * Update APIs to reflect on the new v3 webrtc design
  * Cleaner APIs
2020-10-30 00:33:55 -07:00
Lukas Herman
1720eee38c Fix unpropagated audio sampling rate from microphones 2020-10-29 22:41:10 -07:00
Lukas Herman
00877c74a0 Add audio latency detection 2020-10-29 22:37:29 -07:00
Lukas Herman
559c6a13a1 Update readers to be memory pool friendly 2020-10-29 00:04:12 -07:00
60 changed files with 840 additions and 784 deletions

117
codec.go Normal file
View File

@@ -0,0 +1,117 @@
package mediadevices
import (
"errors"
"fmt"
"strings"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
)
// CodecSelector is a container of video and audio encoder builders, which later will be used
// for codec matching.
type CodecSelector struct {
videoEncoders []codec.VideoEncoderBuilder
audioEncoders []codec.AudioEncoderBuilder
}
// CodecSelectorOption is a type for specifying CodecSelector options
type CodecSelectorOption func(*CodecSelector)
// WithVideoEncoders replace current video codecs with listed encoders
func WithVideoEncoders(encoders ...codec.VideoEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.videoEncoders = encoders
}
}
// WithVideoEncoders replace current audio codecs with listed encoders
func WithAudioEncoders(encoders ...codec.AudioEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.audioEncoders = encoders
}
}
// NewCodecSelector constructs CodecSelector with given variadic options
func NewCodecSelector(opts ...CodecSelectorOption) *CodecSelector {
var track CodecSelector
for _, opt := range opts {
opt(&track)
}
return &track
}
// Populate lets the webrtc engine be aware of supported codecs that are contained in CodecSelector
func (selector *CodecSelector) Populate(setting *webrtc.MediaEngine) {
for _, encoder := range selector.videoEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
for _, encoder := range selector.audioEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
}
func (selector *CodecSelector) selectVideoCodec(wantCodecs []*webrtc.RTPCodec, reader video.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.VideoEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.videoEncoders {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildVideoEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectAudioCodec(wantCodecs []*webrtc.RTPCodec, reader audio.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.AudioEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.audioEncoders {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildAudioEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}

77
examples/http/main.go Normal file
View File

@@ -0,0 +1,77 @@
// This is an example of using mediadevices to broadcast your camera through http.
// The example doesn't aim to be performant, but rather it strives to be simple.
package main
import (
"bytes"
"fmt"
"image/jpeg"
"io"
"log"
"mime/multipart"
"net/http"
"net/textproto"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/prop"
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
// _ "github.com/pion/mediadevices/pkg/driver/videotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
)
func must(err error) {
if err != nil {
panic(err)
}
}
func main() {
s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(constraint *mediadevices.MediaTrackConstraints) {
constraint.Width = prop.Int(600)
constraint.Height = prop.Int(400)
},
})
must(err)
t := s.GetVideoTracks()[0]
videoTrack := t.(*mediadevices.VideoTrack)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf bytes.Buffer
videoReader := videoTrack.NewReader(false)
mimeWriter := multipart.NewWriter(w)
contentType := fmt.Sprintf("multipart/x-mixed-replace;boundary=%s", mimeWriter.Boundary())
w.Header().Add("Content-Type", contentType)
partHeader := make(textproto.MIMEHeader)
partHeader.Add("Content-Type", "image/jpeg")
for {
frame, release, err := videoReader.Read()
if err == io.EOF {
return
}
must(err)
err = jpeg.Encode(&buf, frame, nil)
// Since we're done with img, we need to release img so that that the original owner can reuse
// this memory.
release()
must(err)
partWriter, err := mimeWriter.CreatePart(partHeader)
must(err)
_, err = partWriter.Write(buf.Bytes())
buf.Reset()
must(err)
}
})
fmt.Println("listening on http://localhost:1313")
log.Println(http.ListenAndServe("localhost:1313", nil))
}

View File

@@ -5,26 +5,23 @@ import (
"github.com/pion/mediadevices" "github.com/pion/mediadevices"
"github.com/pion/mediadevices/examples/internal/signal" "github.com/pion/mediadevices/examples/internal/signal"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
// This is required to use opus audio encoder
"github.com/pion/mediadevices/pkg/codec/opus"
// If you don't like vpx, you can also use x264 by importing as below // If you don't like vpx, you can also use x264 by importing as below
// "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder // "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
// or you can also use openh264 for alternative h264 implementation // or you can also use openh264 for alternative h264 implementation
// "github.com/pion/mediadevices/pkg/codec/openh264" // "github.com/pion/mediadevices/pkg/codec/openh264"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder "github.com/pion/mediadevices/pkg/codec/openh264" // This is required to use VP8/VP9 video encoder
"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use VP8/VP9 video encoder
// Note: If you don't have a camera or microphone or your adapters are not supported, // Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below. // you can always swap your adapters with our dummy adapters below.
// _ "github.com/pion/mediadevices/pkg/driver/videotest" // _ "github.com/pion/mediadevices/pkg/driver/videotest"
// _ "github.com/pion/mediadevices/pkg/driver/audiotest" // _ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter _ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/microphone" // This is required to register microphone adapter _ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
) )
const ( const (
@@ -61,44 +58,48 @@ func main() {
fmt.Printf("Connection State has changed %s \n", connectionState.String()) fmt.Printf("Connection State has changed %s \n", connectionState.String())
}) })
md := mediadevices.NewMediaDevices(peerConnection) vp8Params, err := openh264.NewParams()
if err != nil {
panic(err)
}
vp8Params.BitRate = 300_000 // 300kbps
opusParams, err := opus.NewParams() opusParams, err := opus.NewParams()
if err != nil { if err != nil {
panic(err) panic(err)
} }
opusParams.BitRate = 32000 // 32kbps codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
mediadevices.WithAudioEncoders(&opusParams),
)
vp8Params, err := vpx.NewVP8Params() s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
if err != nil {
panic(err)
}
vp8Params.BitRate = 100000 // 100kbps
s, err := md.GetUserMedia(mediadevices.MediaStreamConstraints{
Audio: func(c *mediadevices.MediaTrackConstraints) {
c.Enabled = true
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&opusParams}
},
Video: func(c *mediadevices.MediaTrackConstraints) { Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormat(frame.FormatYUY2) c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
c.Enabled = true
c.Width = prop.Int(640) c.Width = prop.Int(640)
c.Height = prop.Int(480) c.Height = prop.Int(480)
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&vp8Params}
}, },
Audio: func(c *mediadevices.MediaTrackConstraints) {
},
Codec: codecSelector,
}) })
if err != nil { if err != nil {
panic(err) panic(err)
} }
for _, tracker := range s.GetTracks() { for _, tracker := range s.GetTracks() {
t := tracker.Track()
tracker.OnEnded(func(err error) { tracker.OnEnded(func(err error) {
fmt.Printf("Track (ID: %s, Label: %s) ended with error: %v\n", fmt.Printf("Track (ID: %s) ended with error: %v\n",
t.ID(), t.Label(), err) tracker.ID(), err)
}) })
_, err = peerConnection.AddTransceiverFromTrack(t,
// In Pion/webrtc v3, bind will be called automatically after SDP negotiation
webrtcTrack, err := tracker.Bind(peerConnection)
if err != nil {
panic(err)
}
_, err = peerConnection.AddTransceiverFromTrack(webrtcTrack,
webrtc.RtpTransceiverInit{ webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly, Direction: webrtc.RTPTransceiverDirectionSendonly,
}, },

View File

@@ -7,95 +7,26 @@ import (
"github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
) )
var errNotFound = fmt.Errorf("failed to find the best driver that fits the constraints") var errNotFound = fmt.Errorf("failed to find the best driver that fits the constraints")
// MediaDevices is an interface that's defined on https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices
type MediaDevices interface {
GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error)
GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error)
EnumerateDevices() []MediaDeviceInfo
}
// NewMediaDevices creates MediaDevices interface that provides access to connected media input devices
// like cameras and microphones, as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevices(pc *webrtc.PeerConnection, opts ...MediaDevicesOption) MediaDevices {
codecs := make(map[webrtc.RTPCodecType][]*webrtc.RTPCodec)
for _, kind := range []webrtc.RTPCodecType{
webrtc.RTPCodecTypeAudio,
webrtc.RTPCodecTypeVideo,
} {
codecs[kind] = pc.GetRegisteredRTPCodecs(kind)
}
return NewMediaDevicesFromCodecs(codecs, opts...)
}
// NewMediaDevicesFromCodecs creates MediaDevices interface from lists of the available codecs
// that provides access to connected media input devices like cameras and microphones,
// as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevicesFromCodecs(codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec, opts ...MediaDevicesOption) MediaDevices {
mdo := MediaDevicesOptions{
codecs: codecs,
trackGenerator: defaultTrackGenerator,
}
for _, o := range opts {
o(&mdo)
}
return &mediaDevices{
MediaDevicesOptions: mdo,
}
}
// TrackGenerator is a function to create new track.
type TrackGenerator func(payloadType uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error)
var defaultTrackGenerator = TrackGenerator(func(pt uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error) {
return webrtc.NewTrack(pt, ssrc, id, label, codec)
})
type mediaDevices struct {
MediaDevicesOptions
}
// MediaDevicesOptions stores parameters used by MediaDevices.
type MediaDevicesOptions struct {
codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec
trackGenerator TrackGenerator
}
// MediaDevicesOption is a type of MediaDevices functional option.
type MediaDevicesOption func(*MediaDevicesOptions)
// WithTrackGenerator specifies a TrackGenerator to use customized track.
func WithTrackGenerator(gen TrackGenerator) MediaDevicesOption {
return func(o *MediaDevicesOptions) {
o.trackGenerator = gen
}
}
// GetDisplayMedia prompts the user to select and grant permission to capture the contents // GetDisplayMedia prompts the user to select and grant permission to capture the contents
// of a display or portion thereof (such as a window) as a MediaStream. // of a display or portion thereof (such as a window) as a MediaStream.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia // Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia
func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) { func GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) {
trackers := make([]Tracker, 0) trackers := make([]Track, 0)
cleanTrackers := func() { cleanTrackers := func() {
for _, t := range trackers { for _, t := range trackers {
t.Stop() t.Close()
} }
} }
var videoConstraints MediaTrackConstraints var videoConstraints MediaTrackConstraints
if constraints.Video != nil { if constraints.Video != nil {
constraints.Video(&videoConstraints) constraints.Video(&videoConstraints)
} tracker, err := selectScreen(videoConstraints, constraints.Codec)
if videoConstraints.Enabled {
tracker, err := m.selectScreen(videoConstraints)
if err != nil { if err != nil {
cleanTrackers() cleanTrackers()
return nil, err return nil, err
@@ -116,27 +47,20 @@ func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (Medi
// GetUserMedia prompts the user for permission to use a media input which produces a MediaStream // GetUserMedia prompts the user for permission to use a media input which produces a MediaStream
// with tracks containing the requested types of media. // with tracks containing the requested types of media.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia // Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) { func GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) {
// TODO: It should return media stream based on constraints // TODO: It should return media stream based on constraints
trackers := make([]Tracker, 0) trackers := make([]Track, 0)
cleanTrackers := func() { cleanTrackers := func() {
for _, t := range trackers { for _, t := range trackers {
t.Stop() t.Close()
} }
} }
var videoConstraints, audioConstraints MediaTrackConstraints var videoConstraints, audioConstraints MediaTrackConstraints
if constraints.Video != nil { if constraints.Video != nil {
constraints.Video(&videoConstraints) constraints.Video(&videoConstraints)
} tracker, err := selectVideo(videoConstraints, constraints.Codec)
if constraints.Audio != nil {
constraints.Audio(&audioConstraints)
}
if videoConstraints.Enabled {
tracker, err := m.selectVideo(videoConstraints)
if err != nil { if err != nil {
cleanTrackers() cleanTrackers()
return nil, err return nil, err
@@ -145,8 +69,9 @@ func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaSt
trackers = append(trackers, tracker) trackers = append(trackers, tracker)
} }
if audioConstraints.Enabled { if constraints.Audio != nil {
tracker, err := m.selectAudio(audioConstraints) constraints.Audio(&audioConstraints)
tracker, err := selectAudio(audioConstraints, constraints.Codec)
if err != nil { if err != nil {
cleanTrackers() cleanTrackers()
return nil, err return nil, err
@@ -240,7 +165,7 @@ func selectBestDriver(filter driver.FilterFn, constraints MediaTrackConstraints)
return bestDriver, constraints, nil return bestDriver, constraints, nil
} }
func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker, error) { func selectAudio(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterAudioRecorder() typeFilter := driver.FilterAudioRecorder()
d, c, err := selectBestDriver(typeFilter, constraints) d, c, err := selectBestDriver(typeFilter, constraints)
@@ -248,9 +173,9 @@ func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker,
return nil, err return nil, err
} }
return newTrack(&m.MediaDevicesOptions, d, c) return newTrackFromDriver(d, c, selector)
} }
func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, error) { func selectVideo(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder() typeFilter := driver.FilterVideoRecorder()
notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen)) notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen))
filter := driver.FilterAnd(typeFilter, notScreenFilter) filter := driver.FilterAnd(typeFilter, notScreenFilter)
@@ -260,10 +185,10 @@ func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker,
return nil, err return nil, err
} }
return newTrack(&m.MediaDevicesOptions, d, c) return newTrackFromDriver(d, c, selector)
} }
func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, error) { func selectScreen(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder() typeFilter := driver.FilterVideoRecorder()
screenFilter := driver.FilterDeviceType(driver.Screen) screenFilter := driver.FilterDeviceType(driver.Screen)
filter := driver.FilterAnd(typeFilter, screenFilter) filter := driver.FilterAnd(typeFilter, screenFilter)
@@ -273,10 +198,10 @@ func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker,
return nil, err return nil, err
} }
return newTrack(&m.MediaDevicesOptions, d, c) return newTrackFromDriver(d, c, selector)
} }
func (m *mediaDevices) EnumerateDevices() []MediaDeviceInfo { func EnumerateDevices() []MediaDeviceInfo {
drivers := driver.GetManager().Query( drivers := driver.GetManager().Query(
driver.FilterFn(func(driver.Driver) bool { return true })) driver.FilterFn(func(driver.Driver) bool { return true }))
info := make([]MediaDeviceInfo, 0, len(drivers)) info := make([]MediaDeviceInfo, 0, len(drivers))

View File

@@ -1,91 +1,42 @@
package mediadevices package mediadevices
import ( import (
"errors"
"io" "io"
"testing" "testing"
"time" "time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver"
_ "github.com/pion/mediadevices/pkg/driver/audiotest" _ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/videotest" _ "github.com/pion/mediadevices/pkg/driver/videotest"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
) )
func TestGetUserMedia(t *testing.T) { func TestGetUserMedia(t *testing.T) {
videoParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 100000,
},
name: "MockVideo",
}
audioParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 32000,
},
name: "MockAudio",
}
md := NewMediaDevicesFromCodecs(
map[webrtc.RTPCodecType][]*webrtc.RTPCodec{
webrtc.RTPCodecTypeVideo: {
{Type: webrtc.RTPCodecTypeVideo, Name: "MockVideo", PayloadType: 1},
},
webrtc.RTPCodecTypeAudio: {
{Type: webrtc.RTPCodecTypeAudio, Name: "MockAudio", PayloadType: 2},
},
},
WithTrackGenerator(
func(_ uint8, _ uint32, id, _ string, codec *webrtc.RTPCodec) (
LocalTrack, error,
) {
return newMockTrack(codec, id), nil
},
),
)
constraints := MediaStreamConstraints{ constraints := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) { Video: func(c *MediaTrackConstraints) {
c.Enabled = true
c.Width = prop.Int(640) c.Width = prop.Int(640)
c.Height = prop.Int(480) c.Height = prop.Int(480)
params := videoParams
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
}, },
Audio: func(c *MediaTrackConstraints) { Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
}, },
} }
constraintsWrong := MediaStreamConstraints{ constraintsWrong := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) { Video: func(c *MediaTrackConstraints) {
c.Enabled = true c.Width = prop.IntExact(10000)
c.Width = prop.Int(640)
c.Height = prop.Int(480) c.Height = prop.Int(480)
params := videoParams
params.BitRate = 0
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
}, },
Audio: func(c *MediaTrackConstraints) { Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
}, },
} }
// GetUserMedia with broken parameters // GetUserMedia with broken parameters
ms, err := md.GetUserMedia(constraintsWrong) ms, err := GetUserMedia(constraintsWrong)
if err == nil { if err == nil {
t.Fatal("Expected error, but got nil") t.Fatal("Expected error, but got nil")
} }
// GetUserMedia with correct parameters // GetUserMedia with correct parameters
ms, err = md.GetUserMedia(constraints) ms, err = GetUserMedia(constraints)
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@@ -103,11 +54,11 @@ func TestGetUserMedia(t *testing.T) {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
for _, track := range tracks { for _, track := range tracks {
track.Stop() track.Close()
} }
// Stop and retry GetUserMedia // Stop and retry GetUserMedia
ms, err = md.GetUserMedia(constraints) ms, err = GetUserMedia(constraints)
if err != nil { if err != nil {
t.Fatalf("Failed to GetUserMedia after the previsous tracks stopped: %v", err) t.Fatalf("Failed to GetUserMedia after the previsous tracks stopped: %v", err)
} }
@@ -124,106 +75,10 @@ func TestGetUserMedia(t *testing.T) {
} }
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
for _, track := range tracks { for _, track := range tracks {
track.Stop() track.Close()
} }
} }
type mockTrack struct {
codec *webrtc.RTPCodec
id string
}
func newMockTrack(codec *webrtc.RTPCodec, id string) *mockTrack {
return &mockTrack{
codec: codec,
id: id,
}
}
func (t *mockTrack) WriteSample(s media.Sample) error {
return nil
}
func (t *mockTrack) Codec() *webrtc.RTPCodec {
return t.codec
}
func (t *mockTrack) ID() string {
return t.id
}
func (t *mockTrack) Kind() webrtc.RTPCodecType {
return t.codec.Type
}
type mockParams struct {
codec.BaseParams
name string
}
func (params *mockParams) RTPCodec() *codec.RTPCodec {
rtpCodec := codec.NewRTPH264Codec(90000)
rtpCodec.Name = params.name
return rtpCodec
}
func (params *mockParams) BuildVideoEncoder(r video.Reader, p prop.Media) (codec.ReadCloser, error) {
if params.BitRate == 0 {
// This is a dummy error to test the failure condition.
return nil, errors.New("wrong codec parameter")
}
return &mockVideoCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
func (params *mockParams) BuildAudioEncoder(r audio.Reader, p prop.Media) (codec.ReadCloser, error) {
return &mockAudioCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
type mockCodec struct{}
func (e *mockCodec) SetBitRate(b int) error {
return nil
}
func (e *mockCodec) ForceKeyFrame() error {
return nil
}
type mockVideoCodec struct {
mockCodec
r video.Reader
closed chan struct{}
}
func (m *mockVideoCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockVideoCodec) Close() error { return nil }
type mockAudioCodec struct {
mockCodec
r audio.Reader
closed chan struct{}
}
func (m *mockAudioCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockAudioCodec) Close() error { return nil }
func TestSelectBestDriverConstraintsResultIsSetProperly(t *testing.T) { func TestSelectBestDriverConstraintsResultIsSetProperly(t *testing.T) {
filterFn := driver.FilterVideoRecorder() filterFn := driver.FilterVideoRecorder()
drivers := driver.GetManager().Query(filterFn) drivers := driver.GetManager().Query(filterFn)

View File

@@ -7,80 +7,80 @@ import (
// MediaStream is an interface that represents a collection of existing tracks. // MediaStream is an interface that represents a collection of existing tracks.
type MediaStream interface { type MediaStream interface {
// GetAudioTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getaudiotracks // GetAudioTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getaudiotracks
GetAudioTracks() []Tracker GetAudioTracks() []Track
// GetVideoTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getvideotracks // GetVideoTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getvideotracks
GetVideoTracks() []Tracker GetVideoTracks() []Track
// GetTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-gettracks // GetTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-gettracks
GetTracks() []Tracker GetTracks() []Track
// AddTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-addtrack // AddTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-addtrack
AddTrack(t Tracker) AddTrack(t Track)
// RemoveTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-removetrack // RemoveTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-removetrack
RemoveTrack(t Tracker) RemoveTrack(t Track)
} }
type mediaStream struct { type mediaStream struct {
trackers map[Tracker]struct{} tracks map[Track]struct{}
l sync.RWMutex l sync.RWMutex
} }
const trackTypeDefault MediaDeviceType = 0 const trackTypeDefault MediaDeviceType = 0
// NewMediaStream creates a MediaStream interface that's defined in // NewMediaStream creates a MediaStream interface that's defined in
// https://w3c.github.io/mediacapture-main/#dom-mediastream // https://w3c.github.io/mediacapture-main/#dom-mediastream
func NewMediaStream(trackers ...Tracker) (MediaStream, error) { func NewMediaStream(tracks ...Track) (MediaStream, error) {
m := mediaStream{trackers: make(map[Tracker]struct{})} m := mediaStream{tracks: make(map[Track]struct{})}
for _, tracker := range trackers { for _, track := range tracks {
if _, ok := m.trackers[tracker]; !ok { if _, ok := m.tracks[track]; !ok {
m.trackers[tracker] = struct{}{} m.tracks[track] = struct{}{}
} }
} }
return &m, nil return &m, nil
} }
func (m *mediaStream) GetAudioTracks() []Tracker { func (m *mediaStream) GetAudioTracks() []Track {
return m.queryTracks(AudioInput) return m.queryTracks(AudioInput)
} }
func (m *mediaStream) GetVideoTracks() []Tracker { func (m *mediaStream) GetVideoTracks() []Track {
return m.queryTracks(VideoInput) return m.queryTracks(VideoInput)
} }
func (m *mediaStream) GetTracks() []Tracker { func (m *mediaStream) GetTracks() []Track {
return m.queryTracks(trackTypeDefault) return m.queryTracks(trackTypeDefault)
} }
// queryTracks returns all tracks that are the same kind as t. // queryTracks returns all tracks that are the same kind as t.
// If t is 0, which is the default, queryTracks will return all the tracks. // If t is 0, which is the default, queryTracks will return all the tracks.
func (m *mediaStream) queryTracks(t MediaDeviceType) []Tracker { func (m *mediaStream) queryTracks(t MediaDeviceType) []Track {
m.l.RLock() m.l.RLock()
defer m.l.RUnlock() defer m.l.RUnlock()
result := make([]Tracker, 0) result := make([]Track, 0)
for tracker := range m.trackers { for track := range m.tracks {
if tracker.Kind() == t || t == trackTypeDefault { if track.Kind() == t || t == trackTypeDefault {
result = append(result, tracker) result = append(result, track)
} }
} }
return result return result
} }
func (m *mediaStream) AddTrack(t Tracker) { func (m *mediaStream) AddTrack(t Track) {
m.l.Lock() m.l.Lock()
defer m.l.Unlock() defer m.l.Unlock()
if _, ok := m.trackers[t]; ok { if _, ok := m.tracks[t]; ok {
return return
} }
m.trackers[t] = struct{}{} m.tracks[t] = struct{}{}
} }
func (m *mediaStream) RemoveTrack(t Tracker) { func (m *mediaStream) RemoveTrack(t Track) {
m.l.Lock() m.l.Lock()
defer m.l.Unlock() defer m.l.Unlock()
delete(m.trackers, t) delete(m.tracks, t)
} }

View File

@@ -10,17 +10,14 @@ type mockMediaStreamTrack struct {
kind MediaDeviceType kind MediaDeviceType
} }
func (track *mockMediaStreamTrack) Track() *webrtc.Track { func (track *mockMediaStreamTrack) ID() string {
return nil return ""
} }
func (track *mockMediaStreamTrack) LocalTrack() LocalTrack { func (track *mockMediaStreamTrack) Close() error {
return nil return nil
} }
func (track *mockMediaStreamTrack) Stop() {
}
func (track *mockMediaStreamTrack) Kind() MediaDeviceType { func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
return track.kind return track.kind
} }
@@ -28,8 +25,16 @@ func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
func (track *mockMediaStreamTrack) OnEnded(handler func(error)) { func (track *mockMediaStreamTrack) OnEnded(handler func(error)) {
} }
func (track *mockMediaStreamTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
return nil, nil
}
func (track *mockMediaStreamTrack) Unbind(pc *webrtc.PeerConnection) error {
return nil
}
func TestMediaStreamFilters(t *testing.T) { func TestMediaStreamFilters(t *testing.T) {
audioTracks := []Tracker{ audioTracks := []Track{
&mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput},
@@ -37,7 +42,7 @@ func TestMediaStreamFilters(t *testing.T) {
&mockMediaStreamTrack{AudioInput}, &mockMediaStreamTrack{AudioInput},
} }
videoTracks := []Tracker{ videoTracks := []Track{
&mockMediaStreamTrack{VideoInput}, &mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput}, &mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput}, &mockMediaStreamTrack{VideoInput},
@@ -49,7 +54,7 @@ func TestMediaStreamFilters(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
expect := func(t *testing.T, actual, expected []Tracker) { expect := func(t *testing.T, actual, expected []Track) {
if len(actual) != len(expected) { if len(actual) != len(expected) {
t.Fatalf("%s: Expected to get %d trackers, but got %d trackers", t.Name(), len(expected), len(actual)) t.Fatalf("%s: Expected to get %d trackers, but got %d trackers", t.Name(), len(expected), len(actual))
} }

View File

@@ -1,40 +1,18 @@
package mediadevices package mediadevices
import ( import (
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
) )
type MediaStreamConstraints struct { type MediaStreamConstraints struct {
Audio MediaOption Audio MediaOption
Video MediaOption Video MediaOption
Codec *CodecSelector
} }
// MediaTrackConstraints represents https://w3c.github.io/mediacapture-main/#dom-mediatrackconstraints // MediaTrackConstraints represents https://w3c.github.io/mediacapture-main/#dom-mediatrackconstraints
type MediaTrackConstraints struct { type MediaTrackConstraints struct {
prop.MediaConstraints prop.MediaConstraints
Enabled bool
// VideoEncoderBuilders are codec builders that are used for encoding the video
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
VideoEncoderBuilders []codec.VideoEncoderBuilder
// AudioEncoderBuilders are codec builders that are used for encoding the audio
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
AudioEncoderBuilders []codec.AudioEncoderBuilder
// VideoTransform will be used to transform the video that's coming from the driver.
// So, basically it'll look like following: driver -> VideoTransform -> codec
VideoTransform video.TransformFunc
// AudioTransform will be used to transform the audio that's coming from the driver.
// So, basically it'll look like following: driver -> AudioTransform -> code
AudioTransform audio.TransformFunc
selectedMedia prop.Media selectedMedia prop.Media
} }

View File

@@ -15,7 +15,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error)
// in any case. // in any case.
metaReader := broadcaster.NewReader(false) metaReader := broadcaster.NewReader(false)
metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader) metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read() _, _, err := metaReader.Read()
return currentProp, err return currentProp, err
} }
@@ -29,7 +29,7 @@ func detectCurrentAudioProp(broadcaster *audio.Broadcaster) (prop.Media, error)
// in any case. // in any case.
metaReader := broadcaster.NewReader(false) metaReader := broadcaster.NewReader(false)
metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader) metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read() _, _, err := metaReader.Read()
return currentProp, err return currentProp, err
} }

View File

@@ -17,12 +17,12 @@ func TestDetectCurrentVideoProp(t *testing.T) {
second.Pix[0] = 2 second.Pix[0] = 2
isFirst := true isFirst := true
source := video.ReaderFunc(func() (image.Image, error) { source := video.ReaderFunc(func() (image.Image, func(), error) {
if isFirst { if isFirst {
isFirst = true isFirst = true
return first, nil return first, func() {}, nil
} else { } else {
return second, nil return second, func() {}, nil
} }
}) })
@@ -42,7 +42,7 @@ func TestDetectCurrentVideoProp(t *testing.T) {
} }
reader := broadcaster.NewReader(false) reader := broadcaster.NewReader(false)
img, err := reader.Read() img, _, err := reader.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -65,12 +65,12 @@ func TestDetectCurrentAudioProp(t *testing.T) {
second.Data[0] = 2 second.Data[0] = 2
isFirst := true isFirst := true
source := audio.ReaderFunc(func() (wave.Audio, error) { source := audio.ReaderFunc(func() (wave.Audio, func(), error) {
if isFirst { if isFirst {
isFirst = true isFirst = true
return first, nil return first, func() {}, nil
} else { } else {
return second, nil return second, func() {}, nil
} }
}) })
@@ -86,7 +86,7 @@ func TestDetectCurrentAudioProp(t *testing.T) {
} }
reader := broadcaster.NewReader(false) reader := broadcaster.NewReader(false)
chunk, err := reader.Read() chunk, _, err := reader.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -110,12 +110,12 @@ func (rc *ReadCloser) dataCb(data []byte) {
// Read reads raw data, the format is determined by the media type and property: // Read reads raw data, the format is determined by the media type and property:
// - For video, each call will return a frame. // - For video, each call will return a frame.
// - For audio, each call will return a chunk which its size configured by Latency // - For audio, each call will return a chunk which its size configured by Latency
func (rc *ReadCloser) Read() ([]byte, error) { func (rc *ReadCloser) Read() ([]byte, func(), error) {
data, ok := <-rc.dataChan data, ok := <-rc.dataChan
if !ok { if !ok {
return nil, io.EOF return nil, func() {}, io.EOF
} }
return data, nil return data, func() {}, nil
} }
// Close closes the capturing session, and no data will flow anymore // Close closes the capturing session, and no data will flow anymore

View File

@@ -58,7 +58,7 @@ type VideoEncoderBuilder interface {
// ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame // ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame
type ReadCloser interface { type ReadCloser interface {
Read() ([]byte, error) Read() (b []byte, release func(), err error)
Close() error Close() error
// SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted // SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted
// but this also means that the quality will also be lower. // but this also means that the quality will also be lower.

View File

@@ -55,17 +55,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil return &e, nil
} }
func (e *encoder) Read() ([]byte, error) { func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
imgReal := img.(*image.YCbCr) imgReal := img.(*image.YCbCr)
var y, cb, cr C.Slice var y, cb, cr C.Slice
@@ -79,7 +79,7 @@ func (e *encoder) Read() ([]byte, error) {
var encodedBuffer *C.MMAL_BUFFER_HEADER_T var encodedBuffer *C.MMAL_BUFFER_HEADER_T
status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer) status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer)
if status.code != 0 { if status.code != 0 {
return nil, statusToErr(&status) return nil, func() {}, statusToErr(&status)
} }
// GoBytes copies the C array to Go slice. After this, it's safe to release the C array // GoBytes copies the C array to Go slice. After this, it's safe to release the C array
@@ -87,7 +87,7 @@ func (e *encoder) Read() ([]byte, error) {
// Release the buffer so that mmal can reuse this memory // Release the buffer so that mmal can reuse this memory
C.mmal_buffer_header_release(encodedBuffer) C.mmal_buffer_header_release(encodedBuffer)
return encoded, err return encoded, func() {}, err
} }
func (e *encoder) SetBitRate(b int) error { func (e *encoder) SetBitRate(b int) error {

View File

@@ -50,17 +50,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
}, nil }, nil
} }
func (e *encoder) Read() ([]byte, error) { func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
@@ -74,11 +74,11 @@ func (e *encoder) Read() ([]byte, error) {
width: C.int(bounds.Max.X - bounds.Min.X), width: C.int(bounds.Max.X - bounds.Min.X),
}, &rv) }, &rv)
if err := errResult(rv); err != nil { if err := errResult(rv); err != nil {
return nil, fmt.Errorf("failed in encoding: %v", err) return nil, func() {}, fmt.Errorf("failed in encoding: %v", err)
} }
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len) encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, nil return encoded, func() {}, nil
} }
func (e *encoder) SetBitRate(b int) error { func (e *encoder) SetBitRate(b int) error {

View File

@@ -72,22 +72,22 @@ func newEncoder(r audio.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil return &e, nil
} }
func (e *encoder) Read() ([]byte, error) { func (e *encoder) Read() ([]byte, func(), error) {
buff, err := e.reader.Read() buff, _, err := e.reader.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
encoded := make([]byte, 1024) encoded := make([]byte, 1024)
switch b := buff.(type) { switch b := buff.(type) {
case *wave.Int16Interleaved: case *wave.Int16Interleaved:
n, err := e.engine.Encode(b.Data, encoded) n, err := e.engine.Encode(b.Data, encoded)
return encoded[:n:n], err return encoded[:n:n], func() {}, err
case *wave.Float32Interleaved: case *wave.Float32Interleaved:
n, err := e.engine.EncodeFloat32(b.Data, encoded) n, err := e.engine.EncodeFloat32(b.Data, encoded)
return encoded[:n:n], err return encoded[:n:n], func() {}, err
default: default:
return nil, errors.New("unknown type of audio buffer") return nil, func() {}, errors.New("unknown type of audio buffer")
} }
} }

View File

@@ -64,7 +64,6 @@ import (
"unsafe" "unsafe"
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
) )
@@ -296,17 +295,17 @@ func newVP8Encoder(r video.Reader, p prop.Media, params ParamsVP8) (codec.ReadCl
return e, nil return e, nil
} }
func (e *encoderVP8) Read() ([]byte, error) { func (e *encoderVP8) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
@@ -348,7 +347,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
} }
} }
if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE { if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE {
return nil, errors.New("no available surface") return nil, func() {}, errors.New("no available surface")
} }
C.setForceKFFlagVP8(&e.picParam, 0) C.setForceKFFlagVP8(&e.picParam, 0)
@@ -416,7 +415,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
C.size_t(uintptr(p.src)), C.size_t(uintptr(p.src)),
&id, &id,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
buffs = append(buffs, id) buffs = append(buffs, id)
} }
@@ -426,17 +425,17 @@ func (e *encoderVP8) Read() ([]byte, error) {
e.display, e.ctxID, e.display, e.ctxID,
e.surfs[surfaceVP8Input], e.surfs[surfaceVP8Input],
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
} }
// Upload image // Upload image
var vaImg C.VAImage var vaImg C.VAImage
var rawBuf unsafe.Pointer var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS { if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS { if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
// TODO: use vaImg.pitches to support padding // TODO: use vaImg.pitches to support padding
C.memcpy( C.memcpy(
@@ -452,10 +451,10 @@ func (e *encoderVP8) Read() ([]byte, error) {
unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)), unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)),
) )
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS { if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS { if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaRenderPicture( if s := C.vaRenderPicture(
@@ -463,38 +462,38 @@ func (e *encoderVP8) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput &buffs[1], // 0 is for ouput
C.int(len(buffs)-1), C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaEndPicture( if s := C.vaEndPicture(
e.display, e.ctxID, e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
} }
// Load encoded data // Load encoded data
for retry := 3; retry >= 0; retry-- { for retry := 3; retry >= 0; retry-- {
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS { if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
} }
var surfStat C.VASurfaceStatus var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus( if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat, e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
} }
if surfStat == C.VASurfaceReady { if surfStat == C.VASurfaceReady {
break break
} }
if retry == 0 { if retry == 0 {
return nil, fmt.Errorf("failed to sync surface: %d", surfStat) return nil, func() {}, fmt.Errorf("failed to sync surface: %d", surfStat)
} }
} }
var seg *C.VACodedBufferSegment var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS { if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 { if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 {
return nil, errors.New("buffer size too small") return nil, func() {}, errors.New("buffer size too small")
} }
if cap(e.frame) < int(seg.size) { if cap(e.frame) < int(seg.size) {
@@ -507,13 +506,13 @@ func (e *encoderVP8) Read() ([]byte, error) {
) )
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS { if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
// Destroy buffers // Destroy buffers
for _, b := range buffs { for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS { if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
} }
@@ -538,7 +537,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame)) encoded := make([]byte, len(e.frame))
copy(encoded, e.frame) copy(encoded, e.frame)
return encoded, err return encoded, func() {}, err
} }
func (e *encoderVP8) SetBitRate(b int) error { func (e *encoderVP8) SetBitRate(b int) error {

View File

@@ -47,7 +47,6 @@ import (
"unsafe" "unsafe"
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video" "github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop" "github.com/pion/mediadevices/pkg/prop"
) )
@@ -285,17 +284,17 @@ func newVP9Encoder(r video.Reader, p prop.Media, params ParamsVP9) (codec.ReadCl
return e, nil return e, nil
} }
func (e *encoderVP9) Read() ([]byte, error) { func (e *encoderVP9) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
@@ -379,7 +378,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.size_t(uintptr(p.src)), C.size_t(uintptr(p.src)),
&id, &id,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
buffs = append(buffs, id) buffs = append(buffs, id)
} }
@@ -389,17 +388,17 @@ func (e *encoderVP9) Read() ([]byte, error) {
e.display, e.ctxID, e.display, e.ctxID,
e.surfs[surfaceVP9Input], e.surfs[surfaceVP9Input],
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
} }
// Upload image // Upload image
var vaImg C.VAImage var vaImg C.VAImage
var rawBuf unsafe.Pointer var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP9Input], &vaImg); s != C.VA_STATUS_SUCCESS { if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP9Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS { if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
// TODO: use vaImg.pitches to support padding // TODO: use vaImg.pitches to support padding
C.copyI420toNV12( C.copyI420toNV12(
@@ -410,10 +409,10 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.uint(len(yuvImg.Y)), C.uint(len(yuvImg.Y)),
) )
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS { if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS { if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaRenderPicture( if s := C.vaRenderPicture(
@@ -421,27 +420,27 @@ func (e *encoderVP9) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput &buffs[1], // 0 is for ouput
C.int(len(buffs)-1), C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
} }
if s := C.vaEndPicture( if s := C.vaEndPicture(
e.display, e.ctxID, e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
} }
// Load encoded data // Load encoded data
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS { if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
} }
var surfStat C.VASurfaceStatus var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus( if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat, e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS { ); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
} }
var seg *C.VACodedBufferSegment var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS { if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
if cap(e.frame) < int(seg.size) { if cap(e.frame) < int(seg.size) {
e.frame = make([]byte, int(seg.size)) e.frame = make([]byte, int(seg.size))
@@ -453,13 +452,13 @@ func (e *encoderVP9) Read() ([]byte, error) {
) )
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS { if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
// Destroy buffers // Destroy buffers
for _, b := range buffs { for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS { if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s))) return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
} }
} }
@@ -473,7 +472,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame)) encoded := make([]byte, len(e.frame))
copy(encoded, e.frame) copy(encoded, e.frame)
return encoded, err return encoded, func() {}, err
} }
func (e *encoderVP9) SetBitRate(b int) error { func (e *encoderVP9) SetBitRate(b int) error {

View File

@@ -204,17 +204,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params, codecIface *C.vpx_c
}, nil }, nil
} }
func (e *encoder) Read() ([]byte, error) { func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
bounds := yuvImg.Bounds() bounds := yuvImg.Bounds()
@@ -230,7 +230,7 @@ func (e *encoder) Read() ([]byte, error) {
if e.cfg.g_w != C.uint(width) || e.cfg.g_h != C.uint(height) { if e.cfg.g_w != C.uint(width) || e.cfg.g_h != C.uint(height) {
e.cfg.g_w, e.cfg.g_h = C.uint(width), C.uint(height) e.cfg.g_w, e.cfg.g_h = C.uint(width), C.uint(height)
if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != C.VPX_CODEC_OK { if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec) return nil, func() {}, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
} }
e.raw.w, e.raw.h = C.uint(width), C.uint(height) e.raw.w, e.raw.h = C.uint(width), C.uint(height)
e.raw.r_w, e.raw.r_h = C.uint(width), C.uint(height) e.raw.r_w, e.raw.r_h = C.uint(width), C.uint(height)
@@ -243,7 +243,7 @@ func (e *encoder) Read() ([]byte, error) {
C.long(t-e.tStart), C.ulong(t-e.tLastFrame), C.long(flags), C.ulong(e.deadline), C.long(t-e.tStart), C.ulong(t-e.tLastFrame), C.long(flags), C.ulong(e.deadline),
(*C.uchar)(&yuvImg.Y[0]), (*C.uchar)(&yuvImg.Cb[0]), (*C.uchar)(&yuvImg.Cr[0]), (*C.uchar)(&yuvImg.Y[0]), (*C.uchar)(&yuvImg.Cb[0]), (*C.uchar)(&yuvImg.Cr[0]),
); ec != C.VPX_CODEC_OK { ); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_encode failed (%d)", ec) return nil, func() {}, fmt.Errorf("vpx_codec_encode failed (%d)", ec)
} }
e.frameIndex++ e.frameIndex++
@@ -264,7 +264,7 @@ func (e *encoder) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame)) encoded := make([]byte, len(e.frame))
copy(encoded, e.frame) copy(encoded, e.frame)
return encoded, err return encoded, func() {}, err
} }
func (e *encoder) SetBitRate(b int) error { func (e *encoder) SetBitRate(b int) error {

View File

@@ -94,17 +94,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil return &e, nil
} }
func (e *encoder) Read() ([]byte, error) { func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock() e.mu.Lock()
defer e.mu.Unlock() defer e.mu.Unlock()
if e.closed { if e.closed {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img, err := e.r.Read() img, _, err := e.r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
@@ -117,11 +117,11 @@ func (e *encoder) Read() ([]byte, error) {
&rc, &rc,
) )
if err := errFromC(rc); err != nil { if err := errFromC(rc); err != nil {
return nil, err return nil, func() {}, err
} }
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len) encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, err return encoded, func() {}, err
} }
func (e *encoder) SetBitRate(b int) error { func (e *encoder) SetBitRate(b int) error {

View File

@@ -52,10 +52,10 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
closed := d.closed closed := d.closed
reader := audio.ReaderFunc(func() (wave.Audio, error) { reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
select { select {
case <-closed: case <-closed:
return nil, io.EOF return nil, func() {}, io.EOF
default: default:
} }
@@ -78,7 +78,7 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
a.SetFloat32(i, ch, wave.Float32Sample(sin[phase])) a.SetFloat32(i, ch, wave.Float32Sample(sin[phase]))
} }
} }
return a, nil return a, func() {}, nil
}) })
return reader, nil return reader, nil
} }

View File

@@ -56,10 +56,10 @@ func (cam *camera) VideoRecord(property prop.Media) (video.Reader, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
r := video.ReaderFunc(func() (image.Image, error) { r := video.ReaderFunc(func() (image.Image, func(), error) {
frame, err := rc.Read() frame, _, err := rc.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
return decoder.Decode(frame, property.Width, property.Height) return decoder.Decode(frame, property.Width, property.Height)
}) })

View File

@@ -182,7 +182,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel c.cancel = cancel
var buf []byte var buf []byte
r := video.ReaderFunc(func() (img image.Image, err error) { r := video.ReaderFunc(func() (img image.Image, release func(), err error) {
// Lock to avoid accessing the buffer after StopStreaming() // Lock to avoid accessing the buffer after StopStreaming()
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
@@ -191,23 +191,23 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
for i := 0; i < maxEmptyFrameCount; i++ { for i := 0; i < maxEmptyFrameCount; i++ {
if ctx.Err() != nil { if ctx.Err() != nil {
// Return EOF if the camera is already closed. // Return EOF if the camera is already closed.
return nil, io.EOF return nil, func() {}, io.EOF
} }
err := cam.WaitForFrame(5) // 5 seconds err := cam.WaitForFrame(5) // 5 seconds
switch err.(type) { switch err.(type) {
case nil: case nil:
case *webcam.Timeout: case *webcam.Timeout:
return nil, errReadTimeout return nil, func() {}, errReadTimeout
default: default:
// Camera has been stopped. // Camera has been stopped.
return nil, err return nil, func() {}, err
} }
b, err := cam.ReadFrame() b, err := cam.ReadFrame()
if err != nil { if err != nil {
// Camera has been stopped. // Camera has been stopped.
return nil, err return nil, func() {}, err
} }
// Frame is empty. // Frame is empty.
@@ -227,7 +227,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
n := copy(buf, b) n := copy(buf, b)
return decoder.Decode(buf[:n], p.Width, p.Height) return decoder.Decode(buf[:n], p.Width, p.Height)
} }
return nil, errEmptyFrame return nil, func() {}, errEmptyFrame
}) })
return r, nil return r, nil

View File

@@ -116,10 +116,10 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img := &image.YCbCr{} img := &image.YCbCr{}
r := video.ReaderFunc(func() (image.Image, error) { r := video.ReaderFunc(func() (image.Image, func(), error) {
b, ok := <-c.ch b, ok := <-c.ch
if !ok { if !ok {
return nil, io.EOF return nil, func() {}, io.EOF
} }
img.Y = b[:nPix] img.Y = b[:nPix]
img.Cb = b[nPix : nPix+nPix/2] img.Cb = b[nPix : nPix+nPix/2]
@@ -128,7 +128,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img.CStride = p.Width / 2 img.CStride = p.Width / 2
img.SubsampleRatio = image.YCbCrSubsampleRatio422 img.SubsampleRatio = image.YCbCrSubsampleRatio422
img.Rect = image.Rect(0, 0, p.Width, p.Height) img.Rect = image.Rect(0, 0, p.Width, p.Height)
return img, nil return img, func() {}, nil
}) })
return r, nil return r, nil
} }

View File

@@ -97,22 +97,23 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
return nil, err return nil, err
} }
reader := audio.ReaderFunc(func() (wave.Audio, error) { reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
buff, ok := <-samplesChan buff, ok := <-samplesChan
if !ok { if !ok {
stream.Close() stream.Close()
return nil, io.EOF return nil, func() {}, io.EOF
} }
a := wave.NewInt16Interleaved( a := wave.NewInt16Interleaved(
wave.ChunkInfo{ wave.ChunkInfo{
Channels: p.ChannelCount, Channels: p.ChannelCount,
Len: len(buff) / p.ChannelCount, Len: len(buff) / p.ChannelCount,
SamplingRate: p.SampleRate,
}, },
) )
copy(a.Data, buff) copy(a.Data, buff)
return a, nil return a, func() {}, nil
}) })
stream.Start() stream.Start()

View File

@@ -194,10 +194,10 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
// TODO: detect microphone device disconnection and return EOF // TODO: detect microphone device disconnection and return EOF
reader := audio.ReaderFunc(func() (wave.Audio, error) { reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
b, ok := <-m.chBuf b, ok := <-m.chBuf
if !ok { if !ok {
return nil, io.EOF return nil, func() {}, io.EOF
} }
select { select {
@@ -210,14 +210,15 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
uintptr(unsafe.Sizeof(b.waveHdr)), uintptr(unsafe.Sizeof(b.waveHdr)),
) )
if err := errWinmm[ret]; err != nil { if err := errWinmm[ret]; err != nil {
return nil, err return nil, func() {}, err
} }
} }
a := wave.NewInt16Interleaved( a := wave.NewInt16Interleaved(
wave.ChunkInfo{ wave.ChunkInfo{
Channels: p.ChannelCount, Channels: p.ChannelCount,
Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount, Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount,
SamplingRate: p.SampleRate,
}, },
) )
@@ -229,7 +230,7 @@ func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
} }
} }
return a, nil return a, func() {}, nil
}) })
return reader, nil return reader, nil
} }

View File

@@ -68,9 +68,9 @@ func (s *screen) VideoRecord(p prop.Media) (video.Reader, error) {
var dst image.RGBA var dst image.RGBA
reader := s.reader reader := s.reader
r := video.ReaderFunc(func() (image.Image, error) { r := video.ReaderFunc(func() (image.Image, func(), error) {
<-s.tick.C <-s.tick.C
return reader.Read().ToRGBA(&dst), nil return reader.Read().ToRGBA(&dst), func() {}, nil
}) })
return r, nil return r, nil
} }

View File

@@ -103,10 +103,10 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
d.tick = tick d.tick = tick
closed := d.closed closed := d.closed
r := video.ReaderFunc(func() (image.Image, error) { r := video.ReaderFunc(func() (image.Image, func(), error) {
select { select {
case <-closed: case <-closed:
return nil, io.EOF return nil, func() {}, io.EOF
default: default:
} }
@@ -130,7 +130,7 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
CStride: p.Width / 2, CStride: p.Width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422, SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, p.Width, p.Height), Rect: image.Rect(0, 0, p.Width, p.Height),
}, nil }, func() {}, nil
}) })
return r, nil return r, nil

View File

@@ -6,6 +6,7 @@ import (
"image/jpeg" "image/jpeg"
) )
func decodeMJPEG(frame []byte, width, height int) (image.Image, error) { func decodeMJPEG(frame []byte, width, height int) (image.Image, func(), error) {
return jpeg.Decode(bytes.NewReader(frame)) img, err := jpeg.Decode(bytes.NewReader(frame))
return img, func() {}, err
} }

View File

@@ -3,12 +3,12 @@ package frame
import "image" import "image"
type Decoder interface { type Decoder interface {
Decode(frame []byte, width, height int) (image.Image, error) Decode(frame []byte, width, height int) (image.Image, func(), error)
} }
// DecoderFunc is a proxy type for Decoder // DecoderFunc is a proxy type for Decoder
type decoderFunc func(frame []byte, width, height int) (image.Image, error) type decoderFunc func(frame []byte, width, height int) (image.Image, func(), error)
func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, error) { func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, func(), error) {
return f(frame, width, height) return f(frame, width, height)
} }

View File

@@ -5,13 +5,13 @@ import (
"image" "image"
) )
func decodeI420(frame []byte, width, height int) (image.Image, error) { func decodeI420(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
cbi := yi + width*height/4 cbi := yi + width*height/4
cri := cbi + width*height/4 cri := cbi + width*height/4
if cri > len(frame) { if cri > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
} }
return &image.YCbCr{ return &image.YCbCr{
@@ -22,15 +22,15 @@ func decodeI420(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420, SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }
func decodeNV21(frame []byte, width, height int) (image.Image, error) { func decodeNV21(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
ci := yi + width*height/2 ci := yi + width*height/2
if ci > len(frame) { if ci > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
} }
var cb, cr []byte var cb, cr []byte
@@ -47,5 +47,5 @@ func decodeNV21(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420, SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }

View File

@@ -12,13 +12,13 @@ import (
// void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height); // void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height);
import "C" import "C"
func decodeYUY2(frame []byte, width, height int) (image.Image, error) { func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
ci := yi / 2 ci := yi / 2
fi := yi + 2*ci fi := yi + 2*ci
if len(frame) != fi { if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
} }
y := make([]byte, yi) y := make([]byte, yi)
@@ -41,16 +41,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422, SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }
func decodeUYVY(frame []byte, width, height int) (image.Image, error) { func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
ci := yi / 2 ci := yi / 2
fi := yi + 2*ci fi := yi + 2*ci
if len(frame) != fi { if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
} }
y := make([]byte, yi) y := make([]byte, yi)
@@ -73,5 +73,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422, SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }

View File

@@ -7,13 +7,13 @@ import (
"image" "image"
) )
func decodeYUY2(frame []byte, width, height int) (image.Image, error) { func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
ci := yi / 2 ci := yi / 2
fi := yi + 2*ci fi := yi + 2*ci
if len(frame) != fi { if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
} }
y := make([]byte, yi) y := make([]byte, yi)
@@ -39,16 +39,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422, SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }
func decodeUYVY(frame []byte, width, height int) (image.Image, error) { func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height yi := width * height
ci := yi / 2 ci := yi / 2
fi := yi + 2*ci fi := yi + 2*ci
if len(frame) != fi { if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi) return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
} }
y := make([]byte, yi) y := make([]byte, yi)
@@ -74,5 +74,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2, CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422, SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
}, nil }, func() {}, nil
} }

View File

@@ -27,7 +27,7 @@ func TestDecodeYUY2(t *testing.T) {
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
} }
img, err := decodeYUY2(input, width, height) img, _, err := decodeYUY2(input, width, height)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -56,7 +56,7 @@ func TestDecodeUYVY(t *testing.T) {
Rect: image.Rect(0, 0, width, height), Rect: image.Rect(0, 0, width, height),
} }
img, err := decodeUYVY(input, width, height) img, _, err := decodeUYVY(input, width, height)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -77,7 +77,7 @@ func BenchmarkDecodeYUY2(b *testing.B) {
b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) { b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) {
input := make([]byte, sz.width*sz.height*2) input := make([]byte, sz.width*sz.height*2)
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := decodeYUY2(input, sz.width, sz.height) _, _, err := decodeYUY2(input, sz.width, sz.height)
if err != nil { if err != nil {
b.Fatal(err) b.Fatal(err)
} }

View File

@@ -5,13 +5,14 @@ import (
) )
type Reader interface { type Reader interface {
Read() (wave.Audio, error) Read() (chunk wave.Audio, release func(), err error)
} }
type ReaderFunc func() (wave.Audio, error) type ReaderFunc func() (chunk wave.Audio, release func(), err error)
func (rf ReaderFunc) Read() (wave.Audio, error) { func (rf ReaderFunc) Read() (chunk wave.Audio, release func(), err error) {
return rf() chunk, release, err = rf()
return
} }
// TransformFunc produces a new Reader that will produces a transformed audio // TransformFunc produces a new Reader that will produces a transformed audio

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core coreConfig = config.Core
} }
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) { broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read() return source.Read()
}), coreConfig) }), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader {
} }
reader := broadcaster.ioBroadcaster.NewReader(copyFn) reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
data, err := reader.Read() data, _, err := reader.Read()
chunk, _ := data.(wave.Audio) chunk, _ := data.(wave.Audio)
return chunk, err return chunk, func() {}, err
}) })
} }
// ReplaceSource replaces the underlying source. This operation is thread safe. // ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) { return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read() return source.Read()
})) }))
} }
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe. // Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader { func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source() source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
data, err := source.Read() data, _, err := source.Read()
img, _ := data.(wave.Audio) img, _ := data.(wave.Audio)
return img, err return img, func() {}, err
}) })
} }

View File

@@ -14,18 +14,18 @@ func TestBroadcast(t *testing.T) {
SamplingRate: 48000, SamplingRate: 48000,
}) })
source := ReaderFunc(func() (wave.Audio, error) { source := ReaderFunc(func() (wave.Audio, func(), error) {
return chunk, nil return chunk, func() {}, nil
}) })
broadcaster := NewBroadcaster(source, nil) broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false) readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false) readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read() actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
actualWithoutCopy2, err := readerWithoutCopy2.Read() actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -39,7 +39,7 @@ func TestBroadcast(t *testing.T) {
} }
readerWithCopy := broadcaster.NewReader(true) readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read() actualWithCopy, _, err := readerWithCopy.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -13,15 +13,15 @@ func NewBuffer(nSamples int) TransformFunc {
var inBuff wave.Audio var inBuff wave.Audio
return func(r Reader) Reader { return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
for { for {
if inBuff != nil && inBuff.ChunkInfo().Len >= nSamples { if inBuff != nil && inBuff.ChunkInfo().Len >= nSamples {
break break
} }
buff, err := r.Read() buff, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
switch b := buff.(type) { switch b := buff.(type) {
case *wave.Float32Interleaved: case *wave.Float32Interleaved:
@@ -59,7 +59,7 @@ func NewBuffer(nSamples int) TransformFunc {
ib.Size.Len += b.Size.Len ib.Size.Len += b.Size.Len
default: default:
return nil, errUnsupported return nil, func() {}, errUnsupported
} }
} }
switch ib := inBuff.(type) { switch ib := inBuff.(type) {
@@ -71,7 +71,7 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data) copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:] ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples ib.Size.Len -= nSamples
return &ibCopy, nil return &ibCopy, func() {}, nil
case *wave.Float32Interleaved: case *wave.Float32Interleaved:
ibCopy := *ib ibCopy := *ib
@@ -81,9 +81,9 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data) copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:] ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples ib.Size.Len -= nSamples
return &ibCopy, nil return &ibCopy, func() {}, nil
} }
return nil, errUnsupported return nil, func() {}, errUnsupported
}) })
} }
} }

View File

@@ -49,16 +49,16 @@ func TestBuffer(t *testing.T) {
trans := NewBuffer(3) trans := NewBuffer(3)
var iSent int var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) { r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) { if iSent < len(input) {
iSent++ iSent++
return input[iSent-1], nil return input[iSent-1], func() {}, nil
} }
return nil, io.EOF return nil, func() {}, io.EOF
})) }))
for i := 0; ; i++ { for i := 0; ; i++ {
a, err := r.Read() a, _, err := r.Read()
if err != nil { if err != nil {
if err == io.EOF && i >= len(expected) { if err == io.EOF && i >= len(expected) {
break break

View File

@@ -13,12 +13,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
return func(r Reader) Reader { return func(r Reader) Reader {
var currentProp prop.Media var currentProp prop.Media
var chunkCount uint var chunkCount uint
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
var dirty bool var dirty bool
chunk, err := r.Read() chunk, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
info := chunk.ChunkInfo() info := chunk.ChunkInfo()
@@ -32,6 +32,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
dirty = true dirty = true
} }
latency := time.Duration(chunk.ChunkInfo().Len) * time.Second / time.Nanosecond / time.Duration(currentProp.SampleRate)
if currentProp.Latency != latency {
currentProp.Latency = latency
dirty = true
}
// TODO: Also detect sample format changes? // TODO: Also detect sample format changes?
// TODO: Add audio detect changes. As of now, there's no useful property to track. // TODO: Add audio detect changes. As of now, there's no useful property to track.
@@ -40,7 +46,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
} }
chunkCount++ chunkCount++
return chunk, nil return chunk, func() {}, nil
}) })
} }
} }

View File

@@ -11,12 +11,12 @@ import (
func TestDetectChanges(t *testing.T) { func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) { buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
return wave.NewFloat32Interleaved(wave.ChunkInfo{ return wave.NewFloat32Interleaved(wave.ChunkInfo{
Len: 0, Len: 960,
Channels: p.ChannelCount, Channels: p.ChannelCount,
SamplingRate: p.SampleRate, SamplingRate: p.SampleRate,
}), nil }), func() {}, nil
}), func(newProp prop.Media) { }), func(newProp prop.Media) {
p = newProp p = newProp
} }
@@ -28,13 +28,14 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media var actual prop.Media
expected.ChannelCount = 2 expected.ChannelCount = 2
expected.SampleRate = 48000 expected.SampleRate = 48000
expected.Latency = time.Millisecond * 20
src, _ := buildSource(expected) src, _ := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) { src = DetectChanges(time.Second, func(p prop.Media) {
actual = p actual = p
detectBeforeFirstChunk = true detectBeforeFirstChunk = true
})(src) })(src)
_, err := src.Read() _, _, err := src.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -53,24 +54,22 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media var actual prop.Media
expected.ChannelCount = 2 expected.ChannelCount = 2
expected.SampleRate = 48000 expected.SampleRate = 48000
expected.Latency = 20 * time.Millisecond
src, update := buildSource(expected) src, update := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) { src = DetectChanges(time.Second, func(p prop.Media) {
actual = p actual = p
})(src) })(src)
for channelCount := 1; channelCount < 8; channelCount++ { for channelCount := 1; channelCount < 8; channelCount++ {
for sampleRate := 12000; sampleRate <= 48000; sampleRate += 4000 { expected.ChannelCount = channelCount
expected.ChannelCount = channelCount update(expected)
expected.SampleRate = sampleRate _, _, err := src.Read()
update(expected) if err != nil {
_, err := src.Read() t.Fatal(err)
if err != nil { }
t.Fatal(err)
}
if !reflect.DeepEqual(actual, expected) { if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual) t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual)
}
} }
} }
}) })

View File

@@ -8,14 +8,14 @@ import (
// NewChannelMixer creates audio transform to mix audio channels. // NewChannelMixer creates audio transform to mix audio channels.
func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc { func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
return func(r Reader) Reader { return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) { return ReaderFunc(func() (wave.Audio, func(), error) {
buff, err := r.Read() buff, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
ci := buff.ChunkInfo() ci := buff.ChunkInfo()
if ci.Channels == channels { if ci.Channels == channels {
return buff, nil return buff, func() {}, nil
} }
ci.Channels = channels ci.Channels = channels
@@ -32,9 +32,9 @@ func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
mixed = wave.NewFloat32NonInterleaved(ci) mixed = wave.NewFloat32NonInterleaved(ci)
} }
if err := mixer.Mix(mixed, buff); err != nil { if err := mixer.Mix(mixed, buff); err != nil {
return nil, err return nil, func() {}, err
} }
return mixed, nil return mixed, func() {}, nil
}) })
} }
} }

View File

@@ -34,16 +34,16 @@ func TestMixer(t *testing.T) {
trans := NewChannelMixer(1, &mixer.MonoMixer{}) trans := NewChannelMixer(1, &mixer.MonoMixer{})
var iSent int var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) { r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) { if iSent < len(input) {
iSent++ iSent++
return input[iSent-1], nil return input[iSent-1], func() {}, nil
} }
return nil, io.EOF return nil, func() {}, io.EOF
})) }))
for i := 0; ; i++ { for i := 0; ; i++ {
a, err := r.Read() a, _, err := r.Read()
if err != nil { if err != nil {
if err == io.EOF && i >= len(expected) { if err == io.EOF && i >= len(expected) {
break break

View File

@@ -127,10 +127,10 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader { func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader {
currentCount := broadcaster.buffer.lastCount() currentCount := broadcaster.buffer.lastCount()
return ReaderFunc(func() (data interface{}, err error) { return ReaderFunc(func() (data interface{}, release func(), err error) {
currentCount++ currentCount++
if push := broadcaster.buffer.acquire(currentCount); push != nil { if push := broadcaster.buffer.acquire(currentCount); push != nil {
data, err = broadcaster.source.Load().(Reader).Read() data, _, err = broadcaster.source.Load().(Reader).Read()
push(&broadcasterData{ push(&broadcasterData{
data: data, data: data,
err: err, err: err,

View File

@@ -57,7 +57,7 @@ func TestBroadcast(t *testing.T) {
frameCount := 0 frameCount := 0
frameSent := 0 frameSent := 0
lastSend := time.Now() lastSend := time.Now()
src = ReaderFunc(func() (interface{}, error) { src = ReaderFunc(func() (interface{}, func(), error) {
if pauseCond.src && frameSent == 30 { if pauseCond.src && frameSent == 30 {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
@@ -74,7 +74,7 @@ func TestBroadcast(t *testing.T) {
frame := frames[frameCount] frame := frames[frameCount]
frameCount++ frameCount++
frameSent++ frameSent++
return frame, nil return frame, func() {}, nil
}) })
broadcaster := NewBroadcaster(src, nil) broadcaster := NewBroadcaster(src, nil)
var done uint32 var done uint32
@@ -95,7 +95,7 @@ func TestBroadcast(t *testing.T) {
if pauseCond.dst && count == 30 { if pauseCond.dst && count == 30 {
time.Sleep(time.Second) time.Sleep(time.Second)
} }
frame, err := reader.Read() frame, _, err := reader.Read()
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@@ -3,12 +3,13 @@ package io
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type // Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
// to provide strong type. // to provide strong type.
type Reader interface { type Reader interface {
Read() (interface{}, error) Read() (data interface{}, release func(), err error)
} }
// ReaderFunc is a proxy type for Reader // ReaderFunc is a proxy type for Reader
type ReaderFunc func() (interface{}, error) type ReaderFunc func() (data interface{}, release func(), err error)
func (f ReaderFunc) Read() (interface{}, error) { func (f ReaderFunc) Read() (data interface{}, release func(), err error) {
return f() data, release, err = f()
return
} }

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core coreConfig = config.Core
} }
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) { broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read() return source.Read()
}), coreConfig) }), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
} }
reader := broadcaster.ioBroadcaster.NewReader(copyFn) reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
data, err := reader.Read() data, _, err := reader.Read()
img, _ := data.(image.Image) img, _ := data.(image.Image)
return img, err return img, func() {}, err
}) })
} }
// ReplaceSource replaces the underlying source. This operation is thread safe. // ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error { func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) { return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read() return source.Read()
})) }))
} }
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe. // Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader { func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source() source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
data, err := source.Read() data, _, err := source.Read()
img, _ := data.(image.Image) img, _ := data.(image.Image)
return img, err return img, func() {}, err
}) })
} }

View File

@@ -9,18 +9,18 @@ import (
func TestBroadcast(t *testing.T) { func TestBroadcast(t *testing.T) {
resolution := image.Rect(0, 0, 1920, 1080) resolution := image.Rect(0, 0, 1920, 1080)
img := image.NewGray(resolution) img := image.NewGray(resolution)
source := ReaderFunc(func() (image.Image, error) { source := ReaderFunc(func() (image.Image, func(), error) {
return img, nil return img, func() {}, nil
}) })
broadcaster := NewBroadcaster(source, nil) broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false) readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false) readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read() actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
actualWithoutCopy2, err := readerWithoutCopy2.Read() actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -34,7 +34,7 @@ func TestBroadcast(t *testing.T) {
} }
readerWithCopy := broadcaster.NewReader(true) readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read() actualWithCopy, _, err := readerWithCopy.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -63,10 +63,10 @@ func imageToYCbCr(dst *image.YCbCr, src image.Image) {
// ToI420 converts r to a new reader that will output images in I420 format // ToI420 converts r to a new reader that will output images in I420 format
func ToI420(r Reader) Reader { func ToI420(r Reader) Reader {
var yuvImg image.YCbCr var yuvImg image.YCbCr
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
img, err := r.Read() img, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
imageToYCbCr(&yuvImg, img) imageToYCbCr(&yuvImg, img)
@@ -79,11 +79,11 @@ func ToI420(r Reader) Reader {
i422ToI420(&yuvImg) i422ToI420(&yuvImg)
case image.YCbCrSubsampleRatio420: case image.YCbCrSubsampleRatio420:
default: default:
return nil, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio) return nil, func() {}, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio)
} }
yuvImg.SubsampleRatio = image.YCbCrSubsampleRatio420 yuvImg.SubsampleRatio = image.YCbCrSubsampleRatio420
return &yuvImg, nil return &yuvImg, func() {}, nil
}) })
} }
@@ -130,13 +130,13 @@ func imageToRGBA(dst *image.RGBA, src image.Image) {
// ToRGBA converts r to a new reader that will output images in RGBA format // ToRGBA converts r to a new reader that will output images in RGBA format
func ToRGBA(r Reader) Reader { func ToRGBA(r Reader) Reader {
var dst image.RGBA var dst image.RGBA
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
img, err := r.Read() img, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
imageToRGBA(&dst, img) imageToRGBA(&dst, img)
return &dst, nil return &dst, func() {}, nil
}) })
} }

View File

@@ -144,10 +144,10 @@ func TestToI420(t *testing.T) {
for name, c := range cases { for name, c := range cases {
c := c c := c
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
r := ToI420(ReaderFunc(func() (image.Image, error) { r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return c.src, nil return c.src, func() {}, nil
})) }))
out, err := r.Read() out, _, err := r.Read()
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@@ -199,10 +199,10 @@ func TestToRGBA(t *testing.T) {
for name, c := range cases { for name, c := range cases {
c := c c := c
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) { r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return c.src, nil return c.src, func() {}, nil
})) }))
out, err := r.Read() out, _, err := r.Read()
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@@ -225,12 +225,12 @@ func BenchmarkToI420(b *testing.B) {
for name, img := range cases { for name, img := range cases {
img := img img := img
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
r := ToI420(ReaderFunc(func() (image.Image, error) { r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return img, nil return img, func() {}, nil
})) }))
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := r.Read() _, _, err := r.Read()
if err != nil { if err != nil {
b.Fatalf("Unexpected error: %v", err) b.Fatalf("Unexpected error: %v", err)
} }
@@ -253,12 +253,12 @@ func BenchmarkToRGBA(b *testing.B) {
for name, img := range cases { for name, img := range cases {
img := img img := img
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) { r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return img, nil return img, func() {}, nil
})) }))
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := r.Read() _, _, err := r.Read()
if err != nil { if err != nil {
b.Fatalf("Unexpected error: %v", err) b.Fatalf("Unexpected error: %v", err)
} }

View File

@@ -14,12 +14,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
var currentProp prop.Media var currentProp prop.Media
var lastTaken time.Time var lastTaken time.Time
var frames uint var frames uint
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
var dirty bool var dirty bool
img, err := r.Read() img, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
bounds := img.Bounds() bounds := img.Bounds()
@@ -52,7 +52,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
} }
frames++ frames++
return img, nil return img, func() {}, nil
}) })
} }
} }

View File

@@ -12,8 +12,8 @@ import (
func BenchmarkDetectChanges(b *testing.B) { func BenchmarkDetectChanges(b *testing.B) {
var src Reader var src Reader
src = ReaderFunc(func() (image.Image, error) { src = ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), nil return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), func() {}, nil
}) })
b.Run("WithoutDetectChanges", func(b *testing.B) { b.Run("WithoutDetectChanges", func(b *testing.B) {
@@ -40,8 +40,8 @@ func BenchmarkDetectChanges(b *testing.B) {
func TestDetectChanges(t *testing.T) { func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) { buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), nil return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), func() {}, nil
}), func(newProp prop.Media) { }), func(newProp prop.Media) {
p = newProp p = newProp
} }
@@ -86,7 +86,7 @@ func TestDetectChanges(t *testing.T) {
detectBeforeFirstFrame = true detectBeforeFirstFrame = true
})(src) })(src)
frame, err := src.Read() frame, _, err := src.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -113,7 +113,7 @@ func TestDetectChanges(t *testing.T) {
expected.Width = width expected.Width = width
expected.Height = height expected.Height = height
update(expected) update(expected)
frame, err := src.Read() frame, _, err := src.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@@ -143,7 +143,7 @@ func TestDetectChanges(t *testing.T) {
})(src) })(src)
for count < 3 { for count < 3 {
frame, err := src.Read() frame, _, err := src.Read()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@@ -156,10 +156,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
} }
} }
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
img, err := r.Read() img, _, err := r.Read()
if err != nil { if err != nil {
return nil, err return nil, func() {}, err
} }
switch v := img.(type) { switch v := img.(type) {
@@ -169,7 +169,7 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, rect, v, v.Rect, draw.Src, nil) scalerCached.Scale(dst, rect, v, v.Rect, draw.Src, nil)
cloned := *dst // clone metadata cloned := *dst // clone metadata
return &cloned, nil return &cloned, func() {}, nil
case *image.YCbCr: case *image.YCbCr:
ycbcrRealloc(v) ycbcrRealloc(v)
@@ -184,10 +184,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, dst.Bounds(), src, src.Bounds(), draw.Src, nil) scalerCached.Scale(dst, dst.Bounds(), src, src.Bounds(), draw.Src, nil)
cloned := *(imgScaled.(*image.YCbCr)) // clone metadata cloned := *(imgScaled.(*image.YCbCr)) // clone metadata
return &cloned, nil return &cloned, func() {}, nil
default: default:
return nil, errUnsupportedImageType return nil, func() {}, errUnsupportedImageType
} }
}) })
} }

View File

@@ -215,11 +215,11 @@ func TestScale(t *testing.T) {
c := c c := c
t.Run(name, func(t *testing.T) { t.Run(name, func(t *testing.T) {
trans := Scale(c.width, c.height, algo) trans := Scale(c.width, c.height, algo)
r := trans(ReaderFunc(func() (image.Image, error) { r := trans(ReaderFunc(func() (image.Image, func(), error) {
return c.src, nil return c.src, func() {}, nil
})) }))
for i := 0; i < 4; i++ { for i := 0; i < 4; i++ {
out, err := r.Read() out, _, err := r.Read()
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }
@@ -261,12 +261,12 @@ func BenchmarkScale(b *testing.B) {
img := img img := img
b.Run(name, func(b *testing.B) { b.Run(name, func(b *testing.B) {
trans := Scale(640, 360, algo) trans := Scale(640, 360, algo)
r := trans(ReaderFunc(func() (image.Image, error) { r := trans(ReaderFunc(func() (image.Image, func(), error) {
return img, nil return img, func() {}, nil
})) }))
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, err := r.Read() _, _, err := r.Read()
if err != nil { if err != nil {
b.Fatalf("Unexpected error: %v", err) b.Fatalf("Unexpected error: %v", err)
} }

View File

@@ -10,16 +10,16 @@ import (
func Throttle(rate float32) TransformFunc { func Throttle(rate float32) TransformFunc {
return func(r Reader) Reader { return func(r Reader) Reader {
ticker := time.NewTicker(time.Duration(int64(float64(time.Second) / float64(rate)))) ticker := time.NewTicker(time.Duration(int64(float64(time.Second) / float64(rate))))
return ReaderFunc(func() (image.Image, error) { return ReaderFunc(func() (image.Image, func(), error) {
for { for {
img, err := r.Read() img, _, err := r.Read()
if err != nil { if err != nil {
ticker.Stop() ticker.Stop()
return nil, err return nil, func() {}, err
} }
select { select {
case <-ticker.C: case <-ticker.C:
return img, nil return img, func() {}, nil
default: default:
} }
} }

View File

@@ -19,14 +19,14 @@ func TestThrottle(t *testing.T) {
var cntPush int var cntPush int
trans := Throttle(50) trans := Throttle(50)
r := trans(ReaderFunc(func() (image.Image, error) { r := trans(ReaderFunc(func() (image.Image, func(), error) {
<-ticker.C <-ticker.C
cntPush++ cntPush++
return img, nil return img, func() {}, nil
})) }))
for i := 0; i < 20; i++ { for i := 0; i < 20; i++ {
_, err := r.Read() _, _, err := r.Read()
if err != nil { if err != nil {
t.Fatalf("Unexpected error: %v", err) t.Fatalf("Unexpected error: %v", err)
} }

View File

@@ -5,13 +5,14 @@ import (
) )
type Reader interface { type Reader interface {
Read() (img image.Image, err error) Read() (img image.Image, release func(), err error)
} }
type ReaderFunc func() (img image.Image, err error) type ReaderFunc func() (img image.Image, release func(), err error)
func (rf ReaderFunc) Read() (img image.Image, err error) { func (rf ReaderFunc) Read() (img image.Image, release func(), err error) {
return rf() img, release, err = rf()
return
} }
// TransformFunc produces a new Reader that will produces a transformed video // TransformFunc produces a new Reader that will produces a transformed video

View File

@@ -4,6 +4,7 @@ import (
"math" "math"
"time" "time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media" "github.com/pion/webrtc/v2/pkg/media"
) )
@@ -11,7 +12,7 @@ type samplerFunc func(b []byte) error
// newVideoSampler creates a video sampler that uses the actual video frame rate and // newVideoSampler creates a video sampler that uses the actual video frame rate and
// the codec's clock rate to come up with a duration for each sample. // the codec's clock rate to come up with a duration for each sample.
func newVideoSampler(t LocalTrack) samplerFunc { func newVideoSampler(t *webrtc.Track) samplerFunc {
clockRate := float64(t.Codec().ClockRate) clockRate := float64(t.Codec().ClockRate)
lastTimestamp := time.Now() lastTimestamp := time.Now()
@@ -27,7 +28,7 @@ func newVideoSampler(t LocalTrack) samplerFunc {
// newAudioSampler creates a audio sampler that uses a fixed latency and // newAudioSampler creates a audio sampler that uses a fixed latency and
// the codec's clock rate to come up with a duration for each sample. // the codec's clock rate to come up with a duration for each sample.
func newAudioSampler(t LocalTrack, latency time.Duration) samplerFunc { func newAudioSampler(t *webrtc.Track, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds())) samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds()))
return samplerFunc(func(b []byte) error { return samplerFunc(func(b []byte) error {
return t.WriteSample(media.Sample{Data: b, Samples: samples}) return t.WriteSample(media.Sample{Data: b, Samples: samples})

1
source.go Normal file
View File

@@ -0,0 +1 @@
package mediadevices

439
track.go
View File

@@ -2,239 +2,326 @@ package mediadevices
import ( import (
"errors" "errors"
"fmt"
"image"
"math/rand" "math/rand"
"sync" "sync"
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/wave"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
) )
// Tracker is an interface that represent MediaStreamTrack var (
errInvalidDriverType = errors.New("invalid driver type")
errNotFoundPeerConnection = errors.New("failed to find given peer connection")
)
// Source is a generic representation of a media source
type Source interface {
ID() string
Close() error
}
// VideoSource is a specific type of media source that emits a series of video frames
type VideoSource interface {
video.Reader
Source
}
// AudioSource is a specific type of media source that emits a series of audio chunks
type AudioSource interface {
audio.Reader
Source
}
// Track is an interface that represent MediaStreamTrack
// Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack // Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack
type Tracker interface { type Track interface {
Track() *webrtc.Track Source
LocalTrack() LocalTrack
Stop()
Kind() MediaDeviceType
// OnEnded registers a handler to receive an error from the media stream track. // OnEnded registers a handler to receive an error from the media stream track.
// If the error is already occured before registering, the handler will be // If the error is already occured before registering, the handler will be
// immediately called. // immediately called.
OnEnded(func(error)) OnEnded(func(error))
Kind() MediaDeviceType
// Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind
// call will happen automatically after the SDP negotiation. Users won't need to call this manually.
Bind(*webrtc.PeerConnection) (*webrtc.Track, error)
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
// be called automatically in the future.
Unbind(*webrtc.PeerConnection) error
} }
type LocalTrack interface { type baseTrack struct {
WriteSample(s media.Sample) error Source
Codec() *webrtc.RTPCodec err error
ID() string onErrorHandler func(error)
Kind() webrtc.RTPCodecType mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
selector *CodecSelector
activePeerConnections map[*webrtc.PeerConnection]chan<- chan<- struct{}
} }
type track struct { func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack {
localTrack LocalTrack return &baseTrack{
d driver.Driver Source: source,
sample samplerFunc kind: kind,
encoder codec.ReadCloser selector: selector,
activePeerConnections: make(map[*webrtc.PeerConnection]chan<- chan<- struct{}),
onErrorHandler func(error)
err error
mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
}
func newTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*track, error) {
var encoderBuilders []encoderBuilder
var rtpCodecs []*webrtc.RTPCodec
var buildSampler func(t LocalTrack) samplerFunc
var kind MediaDeviceType
var err error
err = d.Open()
if err != nil {
return nil, err
} }
switch r := d.(type) {
case driver.VideoRecorder:
kind = VideoInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeVideo]
buildSampler = newVideoSampler
encoderBuilders, err = newVideoEncoderBuilders(r, constraints)
case driver.AudioRecorder:
kind = AudioInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeAudio]
buildSampler = func(t LocalTrack) samplerFunc {
return newAudioSampler(t, constraints.selectedMedia.Latency)
}
encoderBuilders, err = newAudioEncoderBuilders(r, constraints)
default:
err = errors.New("newTrack: invalid driver type")
}
if err != nil {
d.Close()
return nil, err
}
for _, builder := range encoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == builder.name {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
localTrack, err := opts.trackGenerator(
matchedRTPCodec.PayloadType,
rand.Uint32(),
d.ID(),
matchedRTPCodec.Type.String(),
matchedRTPCodec,
)
if err != nil {
continue
}
encoder, err := builder.build()
if err != nil {
continue
}
t := track{
localTrack: localTrack,
sample: buildSampler(localTrack),
d: d,
encoder: encoder,
kind: kind,
}
go t.start()
return &t, nil
}
d.Close()
return nil, errors.New("newTrack: failed to find a matching codec")
} }
// Kind returns track's kind // Kind returns track's kind
func (t *track) Kind() MediaDeviceType { func (track *baseTrack) Kind() MediaDeviceType {
return t.kind return track.kind
} }
// OnEnded sets an error handler. When a track has been created and started, if an // OnEnded sets an error handler. When a track has been created and started, if an
// error occurs, handler will get called with the error given to the parameter. // error occurs, handler will get called with the error given to the parameter.
func (t *track) OnEnded(handler func(error)) { func (track *baseTrack) OnEnded(handler func(error)) {
t.mu.Lock() track.mu.Lock()
t.onErrorHandler = handler track.onErrorHandler = handler
err := t.err err := track.err
t.mu.Unlock() track.mu.Unlock()
if err != nil && handler != nil { if err != nil && handler != nil {
// Already errored. // Already errored.
t.endOnce.Do(func() { track.endOnce.Do(func() {
handler(err) handler(err)
}) })
} }
} }
// onError is a callback when an error occurs // onError is a callback when an error occurs
func (t *track) onError(err error) { func (track *baseTrack) onError(err error) {
t.mu.Lock() track.mu.Lock()
t.err = err track.err = err
handler := t.onErrorHandler handler := track.onErrorHandler
t.mu.Unlock() track.mu.Unlock()
if handler != nil { if handler != nil {
t.endOnce.Do(func() { track.endOnce.Do(func() {
handler(err) handler(err)
}) })
} }
} }
// start starts the data flow from the driver all the way to the localTrack func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sampler func(*webrtc.Track) samplerFunc) (*webrtc.Track, error) {
func (t *track) start() { track.mu.Lock()
for { defer track.mu.Unlock()
buff, err := t.encoder.Read()
webrtcTrack, err := pc.NewTrack(selectedCodec.PayloadType, rand.Uint32(), track.ID(), selectedCodec.MimeType)
if err != nil {
return nil, err
}
sample := sampler(webrtcTrack)
signalCh := make(chan chan<- struct{})
track.activePeerConnections[pc] = signalCh
fmt.Println("Binding")
go func() {
var doneCh chan<- struct{}
defer func() {
encodedReader.Close()
// When there's another call to unbind, it won't block since we mark the signalCh to be closed
close(signalCh)
if doneCh != nil {
close(doneCh)
}
}()
for {
select {
case doneCh = <-signalCh:
return
default:
}
buff, _, err := encodedReader.Read()
if err != nil {
track.onError(err)
return
}
if err := sample(buff); err != nil {
track.onError(err)
return
}
}
}()
return webrtcTrack, nil
}
func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error {
track.mu.Lock()
defer track.mu.Unlock()
ch, ok := track.activePeerConnections[pc]
if !ok {
return errNotFoundPeerConnection
}
doneCh := make(chan struct{})
ch <- doneCh
<-doneCh
delete(track.activePeerConnections, pc)
return nil
}
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
if err := d.Open(); err != nil {
return nil, err
}
switch recorder := d.(type) {
case driver.VideoRecorder:
return newVideoTrackFromDriver(d, recorder, constraints, selector)
case driver.AudioRecorder:
return newAudioTrackFromDriver(d, recorder, constraints, selector)
default:
panic(errInvalidDriverType)
}
}
// VideoTrack is a specific track type that contains video source which allows multiple readers to access, and manipulate.
type VideoTrack struct {
*baseTrack
*video.Broadcaster
}
// NewVideoTrack constructs a new VideoTrack
func NewVideoTrack(source VideoSource, selector *CodecSelector) Track {
return newVideoTrackFromReader(source, source, selector)
}
func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, VideoInput, selector)
wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) {
img, _, err = reader.Read()
if err != nil { if err != nil {
t.onError(err) base.onError(err)
return
} }
return img, func() {}, err
})
if err := t.sample(buff); err != nil { // TODO: Allow users to configure broadcaster
t.onError(err) broadcaster := video.NewBroadcaster(wrappedReader, nil)
return
} return &VideoTrack{
baseTrack: base,
Broadcaster: broadcaster,
} }
} }
// Stop stops the underlying driver and encoder // newVideoTrackFromDriver is an internal video track creation from driver
func (t *track) Stop() { func newVideoTrackFromDriver(d driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
t.d.Close() reader, err := recorder.VideoRecord(constraints.selectedMedia)
t.encoder.Close()
}
func (t *track) Track() *webrtc.Track {
return t.localTrack.(*webrtc.Track)
}
func (t *track) LocalTrack() LocalTrack {
return t.localTrack
}
// encoderBuilder is a generic encoder builder that acts as a delegator for codec.VideoEncoderBuilder and
// codec.AudioEncoderBuilder. The idea of having a delegator is to reduce redundant codes that are being
// duplicated for managing video and audio.
type encoderBuilder struct {
name string
build func() (codec.ReadCloser, error)
}
// newVideoEncoderBuilders transforms video given by VideoRecorder with the video transformer that is passed through
// constraints and create a list of generic encoder builders
func newVideoEncoderBuilders(vr driver.VideoRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := vr.VideoRecord(constraints.selectedMedia)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if constraints.VideoTransform != nil { return newVideoTrackFromReader(d, reader, selector), nil
r = constraints.VideoTransform(r)
}
encoderBuilders := make([]encoderBuilder, len(constraints.VideoEncoderBuilders))
for i, b := range constraints.VideoEncoderBuilders {
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildVideoEncoder(r, constraints.selectedMedia)
}
}
return encoderBuilders, nil
} }
// newAudioEncoderBuilders transforms audio given by AudioRecorder with the audio transformer that is passed through // Transform transforms the underlying source by applying the given fns in serial order
// constraints and create a list of generic encoder builders func (track *VideoTrack) Transform(fns ...video.TransformFunc) {
func newAudioEncoderBuilders(ar driver.AudioRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) { src := track.Broadcaster.Source()
r, err := ar.AudioRecord(constraints.selectedMedia) track.Broadcaster.ReplaceSource(video.Merge(fns...)(src))
}
func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if constraints.AudioTransform != nil { wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo)
r = constraints.AudioTransform(r) fmt.Println(wantCodecs)
fmt.Println(&inputProp)
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
} }
encoderBuilders := make([]encoderBuilder, len(constraints.AudioEncoderBuilders)) return track.bind(pc, encodedReader, selectedCodec, newVideoSampler)
for i, b := range constraints.AudioEncoderBuilders { }
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) { func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
return b.BuildAudioEncoder(r, constraints.selectedMedia) return track.unbind(pc)
} }
}
return encoderBuilders, nil // AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
// manipulate.
type AudioTrack struct {
*baseTrack
*audio.Broadcaster
}
// NewAudioTrack constructs a new VideoTrack
func NewAudioTrack(source AudioSource, selector *CodecSelector) Track {
return newAudioTrackFromReader(source, source, selector)
}
func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, AudioInput, selector)
wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) {
chunk, _, err = reader.Read()
if err != nil {
base.onError(err)
}
return chunk, func() {}, err
})
// TODO: Allow users to configure broadcaster
broadcaster := audio.NewBroadcaster(wrappedReader, nil)
return &AudioTrack{
baseTrack: base,
Broadcaster: broadcaster,
}
}
// newAudioTrackFromDriver is an internal audio track creation from driver
func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
reader, err := recorder.AudioRecord(constraints.selectedMedia)
if err != nil {
return nil, err
}
return newAudioTrackFromReader(d, reader, selector), nil
}
// Transform transforms the underlying source by applying the given fns in serial order
func (track *AudioTrack) Transform(fns ...audio.TransformFunc) {
src := track.Broadcaster.Source()
track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src))
}
func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio)
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
}
return track.bind(pc, encodedReader, selectedCodec, func(t *webrtc.Track) samplerFunc { return newAudioSampler(t, inputProp.Latency) })
}
func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
} }

View File

@@ -10,7 +10,7 @@ func TestOnEnded(t *testing.T) {
errExpected := errors.New("an error") errExpected := errors.New("an error")
t.Run("ErrorAfterRegister", func(t *testing.T) { t.Run("ErrorAfterRegister", func(t *testing.T) {
tr := &track{} tr := &baseTrack{}
called := make(chan error, 1) called := make(chan error, 1)
tr.OnEnded(func(error) { tr.OnEnded(func(error) {
@@ -35,7 +35,7 @@ func TestOnEnded(t *testing.T) {
}) })
t.Run("ErrorBeforeRegister", func(t *testing.T) { t.Run("ErrorBeforeRegister", func(t *testing.T) {
tr := &track{} tr := &baseTrack{}
tr.onError(errExpected) tr.onError(errExpected)