mirror of
https://github.com/pion/mediadevices.git
synced 2025-09-27 04:46:10 +08:00
367 lines
10 KiB
Go
367 lines
10 KiB
Go
package mediadevices
|
|
|
|
import (
|
|
"errors"
|
|
"image"
|
|
"math/rand"
|
|
"sync"
|
|
|
|
"github.com/pion/mediadevices/pkg/codec"
|
|
"github.com/pion/mediadevices/pkg/driver"
|
|
"github.com/pion/mediadevices/pkg/io/audio"
|
|
"github.com/pion/mediadevices/pkg/io/video"
|
|
"github.com/pion/mediadevices/pkg/wave"
|
|
"github.com/pion/rtp"
|
|
"github.com/pion/webrtc/v2"
|
|
"github.com/pion/webrtc/v2/pkg/media"
|
|
)
|
|
|
|
// Sentinel errors shared by the track helpers in this file.
var (
	// errNotFoundPeerConnection reports that a peer connection is not
	// registered as an active binding of a track.
	errNotFoundPeerConnection = errors.New("failed to find given peer connection")

	// errInvalidDriverType reports a driver that is neither a video nor an
	// audio recorder.
	errInvalidDriverType = errors.New("invalid driver type")
)
|
|
|
|
// Source is the minimal contract shared by every media source: it can be
// identified and it can be closed.
type Source interface {
	// ID returns the identifier of the source.
	ID() string
	// Close closes the source.
	Close() error
}
|
|
|
|
// VideoSource is a specific type of media source that emits a series of video frames
|
|
type VideoSource interface {
|
|
video.Reader
|
|
Source
|
|
}
|
|
|
|
// AudioSource is a specific type of media source that emits a series of audio chunks
|
|
type AudioSource interface {
|
|
audio.Reader
|
|
Source
|
|
}
|
|
|
|
// Track is an interface that represent MediaStreamTrack
|
|
// Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack
|
|
type Track interface {
|
|
Source
|
|
// OnEnded registers a handler to receive an error from the media stream track.
|
|
// If the error is already occured before registering, the handler will be
|
|
// immediately called.
|
|
OnEnded(func(error))
|
|
Kind() MediaDeviceType
|
|
// Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind
|
|
// call will happen automatically after the SDP negotiation. Users won't need to call this manually.
|
|
Bind(*webrtc.PeerConnection) (*webrtc.Track, error)
|
|
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
|
|
// be called automatically in the future.
|
|
Unbind(*webrtc.PeerConnection) error
|
|
}
|
|
|
|
type baseTrack struct {
|
|
Source
|
|
err error
|
|
onErrorHandler func(error)
|
|
mu sync.Mutex
|
|
endOnce sync.Once
|
|
kind MediaDeviceType
|
|
selector *CodecSelector
|
|
activePeerConnections map[*webrtc.PeerConnection]chan<- chan<- struct{}
|
|
}
|
|
|
|
func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack {
|
|
return &baseTrack{
|
|
Source: source,
|
|
kind: kind,
|
|
selector: selector,
|
|
activePeerConnections: make(map[*webrtc.PeerConnection]chan<- chan<- struct{}),
|
|
}
|
|
}
|
|
|
|
// Kind returns track's kind
|
|
func (track *baseTrack) Kind() MediaDeviceType {
|
|
return track.kind
|
|
}
|
|
|
|
// OnEnded sets an error handler. When a track has been created and started, if an
|
|
// error occurs, handler will get called with the error given to the parameter.
|
|
func (track *baseTrack) OnEnded(handler func(error)) {
|
|
track.mu.Lock()
|
|
track.onErrorHandler = handler
|
|
err := track.err
|
|
track.mu.Unlock()
|
|
|
|
if err != nil && handler != nil {
|
|
// Already errored.
|
|
track.endOnce.Do(func() {
|
|
handler(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
// onError is a callback when an error occurs
|
|
func (track *baseTrack) onError(err error) {
|
|
track.mu.Lock()
|
|
track.err = err
|
|
handler := track.onErrorHandler
|
|
track.mu.Unlock()
|
|
|
|
if handler != nil {
|
|
track.endOnce.Do(func() {
|
|
handler(err)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sample samplerFunc) (*webrtc.Track, error) {
|
|
track.mu.Lock()
|
|
defer track.mu.Unlock()
|
|
|
|
webrtcTrack, err := pc.NewTrack(selectedCodec.PayloadType, rand.Uint32(), track.ID(), selectedCodec.MimeType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
signalCh := make(chan chan<- struct{})
|
|
track.activePeerConnections[pc] = signalCh
|
|
|
|
go func() {
|
|
var doneCh chan<- struct{}
|
|
defer func() {
|
|
encodedReader.Close()
|
|
|
|
// When there's another call to unbind, it won't block since we mark the signalCh to be closed
|
|
close(signalCh)
|
|
if doneCh != nil {
|
|
close(doneCh)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case doneCh = <-signalCh:
|
|
return
|
|
default:
|
|
}
|
|
|
|
buff, _, err := encodedReader.Read()
|
|
if err != nil {
|
|
track.onError(err)
|
|
return
|
|
}
|
|
|
|
sampleCount := sample()
|
|
err = webrtcTrack.WriteSample(media.Sample{
|
|
Data: buff,
|
|
Samples: sampleCount,
|
|
})
|
|
if err != nil {
|
|
track.onError(err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
return webrtcTrack, nil
|
|
}
|
|
|
|
func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error {
|
|
track.mu.Lock()
|
|
defer track.mu.Unlock()
|
|
|
|
ch, ok := track.activePeerConnections[pc]
|
|
if !ok {
|
|
return errNotFoundPeerConnection
|
|
}
|
|
|
|
doneCh := make(chan struct{})
|
|
ch <- doneCh
|
|
<-doneCh
|
|
delete(track.activePeerConnections, pc)
|
|
return nil
|
|
}
|
|
|
|
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
if err := d.Open(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
switch recorder := d.(type) {
|
|
case driver.VideoRecorder:
|
|
return newVideoTrackFromDriver(d, recorder, constraints, selector)
|
|
case driver.AudioRecorder:
|
|
return newAudioTrackFromDriver(d, recorder, constraints, selector)
|
|
default:
|
|
panic(errInvalidDriverType)
|
|
}
|
|
}
|
|
|
|
// VideoTrack is a specific track type that contains video source which allows multiple readers to access, and manipulate.
|
|
type VideoTrack struct {
|
|
*baseTrack
|
|
*video.Broadcaster
|
|
}
|
|
|
|
// NewVideoTrack constructs a new VideoTrack
|
|
func NewVideoTrack(source VideoSource, selector *CodecSelector) Track {
|
|
return newVideoTrackFromReader(source, source, selector)
|
|
}
|
|
|
|
func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track {
|
|
base := newBaseTrack(source, VideoInput, selector)
|
|
wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) {
|
|
img, _, err = reader.Read()
|
|
if err != nil {
|
|
base.onError(err)
|
|
}
|
|
return img, func() {}, err
|
|
})
|
|
|
|
// TODO: Allow users to configure broadcaster
|
|
broadcaster := video.NewBroadcaster(wrappedReader, nil)
|
|
|
|
return &VideoTrack{
|
|
baseTrack: base,
|
|
Broadcaster: broadcaster,
|
|
}
|
|
}
|
|
|
|
// newVideoTrackFromDriver is an internal video track creation from driver
|
|
func newVideoTrackFromDriver(d driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
reader, err := recorder.VideoRecord(constraints.selectedMedia)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return newVideoTrackFromReader(d, reader, selector), nil
|
|
}
|
|
|
|
// Transform transforms the underlying source by applying the given fns in serial order
|
|
func (track *VideoTrack) Transform(fns ...video.TransformFunc) {
|
|
src := track.Broadcaster.Source()
|
|
track.Broadcaster.ReplaceSource(video.Merge(fns...)(src))
|
|
}
|
|
|
|
func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
|
|
reader := track.NewReader(false)
|
|
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo)
|
|
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(reader, inputProp, wantCodecs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return track.bind(pc, encodedReader, selectedCodec, newVideoSampler(selectedCodec.ClockRate))
|
|
}
|
|
|
|
func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
|
|
return track.unbind(pc)
|
|
}
|
|
|
|
func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReader, error) {
|
|
reader := track.NewReader(false)
|
|
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sample := newVideoSampler(selectedCodec.ClockRate)
|
|
|
|
// FIXME: not sure the best way to get unique ssrc. We probably should have a global keeper that can generate a random ID and does book keeping?
|
|
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
|
|
|
|
return &rtpReaderImpl{
|
|
readFn: func() ([]*rtp.Packet, func(), error) {
|
|
encoded, release, err := encodedReader.Read()
|
|
if err != nil {
|
|
encodedReader.Close()
|
|
track.onError(err)
|
|
return nil, func() {}, err
|
|
}
|
|
defer release()
|
|
|
|
samples := sample()
|
|
pkts := packetizer.Packetize(encoded, samples)
|
|
return pkts, release, err
|
|
},
|
|
closeFn: encodedReader.Close,
|
|
}, nil
|
|
}
|
|
|
|
// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
|
|
// manipulate.
|
|
type AudioTrack struct {
|
|
*baseTrack
|
|
*audio.Broadcaster
|
|
}
|
|
|
|
// NewAudioTrack constructs a new VideoTrack
|
|
func NewAudioTrack(source AudioSource, selector *CodecSelector) Track {
|
|
return newAudioTrackFromReader(source, source, selector)
|
|
}
|
|
|
|
func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track {
|
|
base := newBaseTrack(source, AudioInput, selector)
|
|
wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) {
|
|
chunk, _, err = reader.Read()
|
|
if err != nil {
|
|
base.onError(err)
|
|
}
|
|
return chunk, func() {}, err
|
|
})
|
|
|
|
// TODO: Allow users to configure broadcaster
|
|
broadcaster := audio.NewBroadcaster(wrappedReader, nil)
|
|
|
|
return &AudioTrack{
|
|
baseTrack: base,
|
|
Broadcaster: broadcaster,
|
|
}
|
|
}
|
|
|
|
// newAudioTrackFromDriver is an internal audio track creation from driver
|
|
func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
|
|
reader, err := recorder.AudioRecord(constraints.selectedMedia)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// FIXME: The current audio detection and audio encoder can only work with a static latency. Since the latency from the driver
|
|
// can fluctuate, we need to stabilize it. Maybe there's a better way for doing this?
|
|
reader = audio.NewBuffer(int(constraints.selectedMedia.Latency.Seconds() * float64(constraints.selectedMedia.SampleRate)))(reader)
|
|
return newAudioTrackFromReader(d, reader, selector), nil
|
|
}
|
|
|
|
// Transform transforms the underlying source by applying the given fns in serial order
|
|
func (track *AudioTrack) Transform(fns ...audio.TransformFunc) {
|
|
src := track.Broadcaster.Source()
|
|
track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src))
|
|
}
|
|
|
|
func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
|
|
reader := track.NewReader(false)
|
|
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio)
|
|
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(reader, inputProp, wantCodecs...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return track.bind(pc, encodedReader, selectedCodec, newAudioSampler(selectedCodec.ClockRate, inputProp.Latency))
|
|
}
|
|
|
|
func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
|
|
return track.unbind(pc)
|
|
}
|