Refractor track to be more DRY

Resolves https://github.com/pion/mediadevices/issues/141

 * Remove newVideoTrack and newAudioTrack
 * Add a generic encoderBuilder struct
 * Add newVideoEncoderBuilders and newAudioEncoderBuilders helpers
 * Update sampler to be functional and add support for audio sampler
 * Reduce boilerplates by using closure
This commit is contained in:
Lukas Herman
2020-04-18 15:07:41 -04:00
parent 38deddc4f0
commit 8127ce3be6
3 changed files with 157 additions and 202 deletions

View File

@@ -230,7 +230,7 @@ func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newAudioTrack(&m.MediaDevicesOptions, d, c)
return newTrack(&m.MediaDevicesOptions, d, c)
}
func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, error) {
typeFilter := driver.FilterVideoRecorder()
@@ -246,7 +246,7 @@ func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newVideoTrack(&m.MediaDevicesOptions, d, c)
return newTrack(&m.MediaDevicesOptions, d, c)
}
func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, error) {
@@ -263,7 +263,7 @@ func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newVideoTrack(&m.MediaDevicesOptions, d, c)
return newTrack(&m.MediaDevicesOptions, d, c)
}
func (m *mediaDevices) EnumerateDevices() []MediaDeviceInfo {

View File

@@ -1,31 +1,35 @@
package mediadevices
import (
"math"
"time"
"github.com/pion/webrtc/v2/pkg/media"
)
type sampler struct {
track LocalTrack
clockRate float64
lastTimestamp time.Time
type samplerFunc func(b []byte) error
// newVideoSampler creates a video sampler that uses the actual video frame rate and
// the codec's clock rate to come up with a duration for each sample.
func newVideoSampler(t LocalTrack) samplerFunc {
clockRate := float64(t.Codec().ClockRate)
lastTimestamp := time.Now()
return samplerFunc(func(b []byte) error {
now := time.Now()
duration := now.Sub(lastTimestamp).Seconds()
samples := uint32(math.Round(clockRate * duration))
lastTimestamp = now
return t.WriteSample(media.Sample{Data: b, Samples: samples})
})
}
func newSampler(track LocalTrack) *sampler {
return &sampler{
track: track,
clockRate: float64(track.Codec().ClockRate),
lastTimestamp: time.Now(),
}
}
func (s *sampler) sample(b []byte) error {
now := time.Now()
duration := now.Sub(s.lastTimestamp).Seconds()
samples := uint32(s.clockRate * duration)
s.lastTimestamp = now
sample := media.Sample{Data: b, Samples: samples}
return s.track.WriteSample(sample)
// newAudioSampler creates a audio sampler that uses a fixed latency and
// the codec's clock rate to come up with a duration for each sample.
func newAudioSampler(t LocalTrack, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds()))
return samplerFunc(func(b []byte) error {
return t.WriteSample(media.Sample{Data: b, Samples: samples})
})
}

309
track.go
View File

@@ -32,8 +32,10 @@ type LocalTrack interface {
}
type track struct {
t LocalTrack
s *sampler
localTrack LocalTrack
d driver.Driver
sample samplerFunc
encoder codec.ReadCloser
onErrorHandler func(error)
err error
@@ -41,28 +43,82 @@ type track struct {
endOnce sync.Once
}
func newTrack(selectedCodec *webrtc.RTPCodec, trackGenerator TrackGenerator, d driver.Driver) (*track, error) {
if selectedCodec == nil {
panic("codec is required")
}
func newTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*track, error) {
var encoderBuilders []encoderBuilder
var rtpCodecs []*webrtc.RTPCodec
var buildSampler func(t LocalTrack) samplerFunc
var err error
t, err := trackGenerator(
selectedCodec.PayloadType,
rand.Uint32(),
d.ID(),
selectedCodec.Type.String(),
selectedCodec,
)
err = d.Open()
if err != nil {
return nil, err
}
return &track{
t: t,
s: newSampler(t),
}, nil
switch r := d.(type) {
case driver.VideoRecorder:
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeVideo]
buildSampler = newVideoSampler
encoderBuilders, err = newVideoEncoderBuilders(r, constraints)
case driver.AudioRecorder:
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeAudio]
buildSampler = func(t LocalTrack) samplerFunc {
return newAudioSampler(t, constraints.Latency)
}
encoderBuilders, err = newAudioEncoderBuilders(r, constraints)
default:
err = fmt.Errorf("newTrack: invalid driver type")
}
if err != nil {
d.Close()
return nil, err
}
for _, builder := range encoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == builder.name {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
localTrack, err := opts.trackGenerator(
matchedRTPCodec.PayloadType,
rand.Uint32(),
d.ID(),
matchedRTPCodec.Type.String(),
matchedRTPCodec,
)
if err != nil {
continue
}
encoder, err := builder.build()
if err != nil {
continue
}
t := track{
localTrack: localTrack,
sample: buildSampler(localTrack),
d: d,
encoder: encoder,
}
go t.start()
return &t, nil
}
d.Close()
return nil, fmt.Errorf("newTrack: failed to find a matching codec")
}
// OnEnded sets an error handler. When a track has been created and started, if an
// error occurs, handler will get called with the error given to the parameter.
func (t *track) OnEnded(handler func(error)) {
t.mu.Lock()
t.onErrorHandler = handler
@@ -77,6 +133,7 @@ func (t *track) OnEnded(handler func(error)) {
}
}
// onError is a callback when an error occurs
func (t *track) onError(err error) {
t.mu.Lock()
t.err = err
@@ -90,30 +147,55 @@ func (t *track) onError(err error) {
}
}
// start starts the data flow from the driver all the way to the localTrack
func (t *track) start() {
var n int
var err error
buff := make([]byte, 1024)
for {
n, err = t.encoder.Read(buff)
if err != nil {
if e, ok := err.(*mio.InsufficientBufferError); ok {
buff = make([]byte, 2*e.RequiredSize)
continue
}
t.onError(err)
return
}
if err := t.sample(buff[:n]); err != nil {
t.onError(err)
return
}
}
}
// Stop stops the underlying driver and encoder
func (t *track) Stop() {
t.d.Close()
t.encoder.Close()
}
func (t *track) Track() *webrtc.Track {
return t.t.(*webrtc.Track)
return t.localTrack.(*webrtc.Track)
}
func (t *track) LocalTrack() LocalTrack {
return t.t
return t.localTrack
}
type videoTrack struct {
*track
d driver.Driver
constraints MediaTrackConstraints
encoder codec.ReadCloser
// encoderBuilder is a generic encoder builder that acts as a delegator for codec.VideoEncoderBuilder and
// codec.AudioEncoderBuilder. The idea of having a delegator is to reduce redundant codes that are being
// duplicated for managing video and audio.
type encoderBuilder struct {
name string
build func() (codec.ReadCloser, error)
}
var _ Tracker = &videoTrack{}
func newVideoTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*videoTrack, error) {
err := d.Open()
if err != nil {
return nil, err
}
vr := d.(driver.VideoRecorder)
// newVideoEncoderBuilders transforms video given by VideoRecorder with the video transformer that is passed through
// constraints and create a list of generic encoder builders
func newVideoEncoderBuilders(vr driver.VideoRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := vr.VideoRecord(constraints.Media)
if err != nil {
return nil, err
@@ -123,93 +205,19 @@ func newVideoTrack(opts *MediaDevicesOptions, d driver.Driver, constraints Media
r = constraints.VideoTransform(r)
}
var vt *videoTrack
rtpCodecs := opts.codecs[webrtc.RTPCodecTypeVideo]
for _, codecBuilder := range constraints.VideoEncoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == codecBuilder.Name() {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
t, err := newTrack(matchedRTPCodec, opts.trackGenerator, d)
if err != nil {
continue
}
encoder, err := codecBuilder.BuildVideoEncoder(r, constraints.Media)
if err != nil {
continue
}
vt = &videoTrack{
track: t,
d: d,
constraints: constraints,
encoder: encoder,
}
break
}
if vt == nil {
d.Close()
return nil, fmt.Errorf("failed to find a matching video codec")
}
go vt.start()
return vt, nil
}
func (vt *videoTrack) start() {
var n int
var err error
buff := make([]byte, 1024)
for {
n, err = vt.encoder.Read(buff)
if err != nil {
if e, ok := err.(*mio.InsufficientBufferError); ok {
buff = make([]byte, 2*e.RequiredSize)
continue
}
vt.track.onError(err)
return
}
if err := vt.s.sample(buff[:n]); err != nil {
vt.track.onError(err)
return
encoderBuilders := make([]encoderBuilder, len(constraints.VideoEncoderBuilders))
for i, b := range constraints.VideoEncoderBuilders {
encoderBuilders[i].name = b.Name()
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildVideoEncoder(r, constraints.Media)
}
}
return encoderBuilders, nil
}
func (vt *videoTrack) Stop() {
vt.d.Close()
vt.encoder.Close()
}
type audioTrack struct {
*track
d driver.Driver
constraints MediaTrackConstraints
encoder codec.ReadCloser
}
var _ Tracker = &audioTrack{}
func newAudioTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*audioTrack, error) {
err := d.Open()
if err != nil {
return nil, err
}
ar := d.(driver.AudioRecorder)
// newAudioEncoderBuilders transforms audio given by AudioRecorder with the audio transformer that is passed through
// constraints and create a list of generic encoder builders
func newAudioEncoderBuilders(ar driver.AudioRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := ar.AudioRecord(constraints.Media)
if err != nil {
return nil, err
@@ -219,69 +227,12 @@ func newAudioTrack(opts *MediaDevicesOptions, d driver.Driver, constraints Media
r = constraints.AudioTransform(r)
}
var at *audioTrack
rtpCodecs := opts.codecs[webrtc.RTPCodecTypeAudio]
for _, codecBuilder := range constraints.AudioEncoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == codecBuilder.Name() {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
t, err := newTrack(matchedRTPCodec, opts.trackGenerator, d)
if err != nil {
continue
}
encoder, err := codecBuilder.BuildAudioEncoder(r, constraints.Media)
if err != nil {
continue
}
at = &audioTrack{
track: t,
d: d,
constraints: constraints,
encoder: encoder,
encoderBuilders := make([]encoderBuilder, len(constraints.AudioEncoderBuilders))
for i, b := range constraints.AudioEncoderBuilders {
encoderBuilders[i].name = b.Name()
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildAudioEncoder(r, constraints.Media)
}
}
if at == nil {
d.Close()
return nil, fmt.Errorf("failed to find a matching audio codec")
}
go at.start()
return at, nil
}
func (t *audioTrack) start() {
buff := make([]byte, 1024)
sampleSize := uint32(float64(t.constraints.SampleRate) * t.constraints.Latency.Seconds())
for {
n, err := t.encoder.Read(buff)
if err != nil {
t.track.onError(err)
return
}
if err := t.t.WriteSample(media.Sample{
Data: buff[:n],
Samples: sampleSize,
}); err != nil {
t.track.onError(err)
return
}
}
}
func (t *audioTrack) Stop() {
t.d.Close()
t.encoder.Close()
return encoderBuilders, nil
}