Compare commits

...

31 Commits

Author SHA1 Message Date
Lukas Herman
dda8d2502f Use square instead 2020-11-05 21:03:22 -08:00
Lukas Herman
d593404e39 WIP 2020-11-04 22:34:17 -08:00
Lukas Herman
3ea35bebab Fix broken mediastream unit test 2020-11-02 23:05:59 -08:00
Lukas Herman
83c08e6c5f Recreate facedetection example with the new APIs 2020-11-02 23:04:58 -08:00
Lukas Herman
2f17017450 Rename rtp-send to rtp 2020-11-02 22:31:46 -08:00
Lukas Herman
7cbda134b0 Add NewRTPReader to Track interface 2020-11-02 22:28:01 -08:00
Lukas Herman
115be126ec Add documentation around Reader interfaces 2020-11-02 22:22:19 -08:00
Lukas Herman
79dcb4f1af Add video and audio RTP readers 2020-11-02 22:12:43 -08:00
Lukas Herman
5db4007e73 Enable non-webrtc sampler 2020-11-01 15:31:36 -08:00
Lukas Herman
77ebcecac6 Add codec selector by string names 2020-11-01 15:14:08 -08:00
Renovate Bot
a0d0949954 Update golang.org/x/sys commit hash to 201ba4d
Generated by Renovate Bot
2020-11-01 01:12:10 -07:00
Lukas Herman
f396092609 Default high profile for x264 when possible 2020-11-01 01:08:03 -07:00
Lukas Herman
ee6cf08c44 Use miniaudio in favor of implementing ourselves
miniaudio supports various backends. This will help reducing code
surface
2020-11-01 00:48:54 -07:00
Lukas Herman
6a211aa19f Use x264 as default in example 2020-11-01 00:38:42 -07:00
Lukas Herman
b089610c27 Fix incorrect audio latency 2020-11-01 00:32:20 -07:00
Lukas Herman
1d34ec9c5d Fix broken vpx example 2020-10-31 23:55:46 -07:00
Lukas Herman
7bd3efc8b7 Fix broken conditional build 2020-10-31 11:19:02 -07:00
Lukas Herman
8396fd7aac Add an end-to-end benchmark 2020-10-31 10:35:53 -07:00
Lukas Herman
3787158dba WIP 2020-10-31 01:12:14 -07:00
Lukas Herman
640eeb0cc0 Fix panic when printing non-reference types 2020-10-30 17:50:55 -07:00
Lukas Herman
16ceb45c25 Replace <nil> -> any in prop pretty format 2020-10-30 17:42:07 -07:00
Lukas Herman
c98b3b0909 Enhance driver discovery logging 2020-10-30 17:32:40 -07:00
Lukas Herman
e6c98a844f Remove unused fmt 2020-10-30 10:01:07 -07:00
Lukas Herman
2a70c031b8 Remove unwanted logging 2020-10-30 09:04:51 -07:00
Lukas Herman
047013be95 Fix division by 0 when sample rate is not filled 2020-10-30 08:58:46 -07:00
Lukas Herman
765318feb6 Fix webrtc example 2020-10-30 01:19:03 -07:00
Lukas Herman
af6d31fde5 Revert go.mod 2020-10-30 00:37:38 -07:00
Lukas Herman
2f5e4ee914 New mediadevices design
Changelog:
  * Better support for non-webrtc use cases
  * Enable multiple readers
  * Enhance codec selectors
  * Update APIs to reflect on the new v3 webrtc design
  * Cleaner APIs
2020-10-30 00:33:55 -07:00
Lukas Herman
1720eee38c Fix unpropagated audio sampling rate from microphones 2020-10-29 22:41:10 -07:00
Lukas Herman
00877c74a0 Add audio latency detection 2020-10-29 22:37:29 -07:00
Lukas Herman
559c6a13a1 Update readers to be memory pool friendly 2020-10-29 00:04:12 -07:00
74 changed files with 1626 additions and 1297 deletions

codec.go Normal file (135 changed lines)

@@ -0,0 +1,135 @@
package mediadevices
import (
"errors"
"fmt"
"strings"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
)
// CodecSelector is a container of video and audio encoder builders, which later will be used
// for codec matching.
type CodecSelector struct {
videoEncoders []codec.VideoEncoderBuilder
audioEncoders []codec.AudioEncoderBuilder
}
// CodecSelectorOption is a type for specifying CodecSelector options
type CodecSelectorOption func(*CodecSelector)
// WithVideoEncoders replaces the current video encoders with the listed encoders
func WithVideoEncoders(encoders ...codec.VideoEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.videoEncoders = encoders
}
}
// WithAudioEncoders replaces the current audio encoders with the listed encoders
func WithAudioEncoders(encoders ...codec.AudioEncoderBuilder) CodecSelectorOption {
return func(t *CodecSelector) {
t.audioEncoders = encoders
}
}
// NewCodecSelector constructs CodecSelector with given variadic options
func NewCodecSelector(opts ...CodecSelectorOption) *CodecSelector {
var track CodecSelector
for _, opt := range opts {
opt(&track)
}
return &track
}
// Populate lets the webrtc engine be aware of supported codecs that are contained in CodecSelector
func (selector *CodecSelector) Populate(setting *webrtc.MediaEngine) {
for _, encoder := range selector.videoEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
for _, encoder := range selector.audioEncoders {
setting.RegisterCodec(encoder.RTPCodec().RTPCodec)
}
}
func (selector *CodecSelector) selectVideoCodecByNames(reader video.Reader, inputProp prop.Media, codecNames ...string) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.VideoEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range codecNames {
for _, encoder := range selector.videoEncoders {
if encoder.RTPCodec().Name == wantCodec {
encodedReader, err = encoder.BuildVideoEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectVideoCodec(reader video.Reader, inputProp prop.Media, codecs ...*webrtc.RTPCodec) (codec.ReadCloser, *codec.RTPCodec, error) {
var codecNames []string
for _, codec := range codecs {
codecNames = append(codecNames, codec.Name)
}
return selector.selectVideoCodecByNames(reader, inputProp, codecNames...)
}
func (selector *CodecSelector) selectAudioCodecByNames(reader audio.Reader, inputProp prop.Media, codecNames ...string) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.AudioEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range codecNames {
for _, encoder := range selector.audioEncoders {
if encoder.RTPCodec().Name == wantCodec {
encodedReader, err = encoder.BuildAudioEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
break outer
}
}
errReasons = append(errReasons, fmt.Sprintf("%s: %s", encoder.RTPCodec().Name, err))
}
}
if selectedEncoder == nil {
return nil, nil, errors.New(strings.Join(errReasons, "\n\n"))
}
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectAudioCodec(reader audio.Reader, inputProp prop.Media, codecs ...*webrtc.RTPCodec) (codec.ReadCloser, *codec.RTPCodec, error) {
var codecNames []string
for _, codec := range codecs {
codecNames = append(codecNames, codec.Name)
}
return selector.selectAudioCodecByNames(reader, inputProp, codecNames...)
}
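
A minimal usage sketch of the new CodecSelector API, assembled from the rtp and webrtc examples further down in this compare view: encoder params are wrapped in a selector, the selector is passed through MediaStreamConstraints.Codec, and (for WebRTC use) Populate registers the same codecs with the MediaEngine.

```
package main

import (
	"github.com/pion/mediadevices"
	"github.com/pion/mediadevices/pkg/codec/vpx"       // VP8/VP9 encoders
	_ "github.com/pion/mediadevices/pkg/driver/camera" // registers the camera adapter
	"github.com/pion/mediadevices/pkg/prop"
	"github.com/pion/webrtc/v2"
)

func main() {
	// Build encoder parameters and hand them to a CodecSelector.
	vp8Params, err := vpx.NewVP8Params()
	if err != nil {
		panic(err)
	}
	vp8Params.BitRate = 100_000 // 100kbps

	codecSelector := mediadevices.NewCodecSelector(
		mediadevices.WithVideoEncoders(&vp8Params),
	)

	// For WebRTC, the selector registers its codecs with the MediaEngine.
	mediaEngine := webrtc.MediaEngine{}
	codecSelector.Populate(&mediaEngine)

	// The selector replaces the old per-track encoder builder fields and is
	// passed through the stream constraints instead.
	_, err = mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
		Video: func(c *mediadevices.MediaTrackConstraints) {
			c.Width = prop.Int(640)
			c.Height = prop.Int(480)
		},
		Codec: codecSelector,
	})
	if err != nil {
		panic(err)
	}
}
```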

Binary file not shown.


@@ -0,0 +1,191 @@
package main
import (
"fmt"
"image"
"io/ioutil"
"log"
"net"
"os"
pigo "github.com/esimov/pigo/core"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use h264 video encoder
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
const (
confidenceLevel = 9.5
mtu = 1000
thickness = 5
)
var (
cascade []byte
classifier *pigo.Pigo
)
func must(err error) {
if err != nil {
panic(err)
}
}
func detectFaces(frame *image.YCbCr) []pigo.Detection {
bounds := frame.Bounds()
cascadeParams := pigo.CascadeParams{
MinSize: 100,
MaxSize: 600,
ShiftFactor: 0.15,
ScaleFactor: 1.1,
ImageParams: pigo.ImageParams{
Pixels: frame.Y, // Y in YCbCr should be enough to detect faces
Rows: bounds.Dy(),
Cols: bounds.Dx(),
Dim: bounds.Dx(),
},
}
// Run the classifier over the obtained leaf nodes and return the detection results.
// The result contains quadruplets representing the row, column, scale and detection score.
dets := classifier.RunCascade(cascadeParams, 0.0)
// Calculate the intersection over union (IoU) of two clusters.
dets = classifier.ClusterDetections(dets, 0)
return dets
}
func drawRect(frame *image.YCbCr, x0, y0, size int) {
if x0 < 0 {
x0 = 0
}
if y0 < 0 {
y0 = 0
}
width := frame.Bounds().Dx()
height := frame.Bounds().Dy()
x1 := x0 + size
y1 := y0 + size
if x1 >= width {
x1 = width - 1
}
if y1 >= height {
y1 = height - 1
}
convert := func(x, y int) int {
return y*width + x
}
for x := x0; x < x1; x++ {
for t := 0; t < thickness; t++ {
frame.Y[convert(x, y0+t)] = 0
frame.Y[convert(x, y1-t)] = 0
}
}
for y := y0; y < y1; y++ {
for t := 0; t < thickness; t++ {
frame.Y[convert(x0+t, y)] = 0
frame.Y[convert(x1-t, y)] = 0
}
}
}
func detectFace(r video.Reader) video.Reader {
return video.ReaderFunc(func() (img image.Image, release func(), err error) {
img, release, err = r.Read()
if err != nil {
return
}
yuv := img.(*image.YCbCr)
dets := detectFaces(yuv)
for _, det := range dets {
if det.Q < confidenceLevel {
continue
}
drawRect(yuv, det.Col-det.Scale/2, det.Row-det.Scale/2, det.Scale)
}
return
})
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s host:port\n", os.Args[0])
return
}
dest := os.Args[1]
// prepare face detector
var err error
cascade, err = ioutil.ReadFile("facefinder")
if err != nil {
log.Fatalf("Error reading the cascade file: %s", err)
}
p := pigo.NewPigo()
// Unpack the binary file. This will return the number of cascade trees,
// the tree depth, the threshold and the prediction from tree's leaf nodes.
classifier, err = p.Unpack(cascade)
if err != nil {
log.Fatalf("Error unpacking the cascade file: %s", err)
}
vp8Params, err := vpx.NewVP8Params()
must(err)
vp8Params.BitRate = 1_000_000 // 1Mbps
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
)
mediaStream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormatExact(frame.FormatUYVY)
c.Width = prop.Int(640)
c.Height = prop.Int(480)
},
Codec: codecSelector,
})
must(err)
// since we're trying to access the raw data, we need to cast Track to its real type, *mediadevices.VideoTrack
videoTrack := mediaStream.GetVideoTracks()[0].(*mediadevices.VideoTrack)
defer videoTrack.Close()
videoTrack.Transform(detectFace)
rtpReader, err := videoTrack.NewRTPReader(vp8Params.RTPCodec().Name, mtu)
must(err)
addr, err := net.ResolveUDPAddr("udp", dest)
must(err)
conn, err := net.DialUDP("udp", nil, addr)
must(err)
buff := make([]byte, mtu)
for {
pkts, release, err := rtpReader.Read()
must(err)
for _, pkt := range pkts {
n, err := pkt.MarshalTo(buff)
must(err)
_, err = conn.Write(buff[:n])
must(err)
}
release()
}
}

examples/http/main.go Normal file (77 changed lines)

@@ -0,0 +1,77 @@
// This is an example of using mediadevices to broadcast your camera through http.
// The example doesn't aim to be performant, but rather it strives to be simple.
package main
import (
"bytes"
"fmt"
"image/jpeg"
"io"
"log"
"mime/multipart"
"net/http"
"net/textproto"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/prop"
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
// _ "github.com/pion/mediadevices/pkg/driver/videotest"
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
)
func must(err error) {
if err != nil {
panic(err)
}
}
func main() {
s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(constraint *mediadevices.MediaTrackConstraints) {
constraint.Width = prop.Int(600)
constraint.Height = prop.Int(400)
},
})
must(err)
t := s.GetVideoTracks()[0]
videoTrack := t.(*mediadevices.VideoTrack)
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
var buf bytes.Buffer
videoReader := videoTrack.NewReader(false)
mimeWriter := multipart.NewWriter(w)
contentType := fmt.Sprintf("multipart/x-mixed-replace;boundary=%s", mimeWriter.Boundary())
w.Header().Add("Content-Type", contentType)
partHeader := make(textproto.MIMEHeader)
partHeader.Add("Content-Type", "image/jpeg")
for {
frame, release, err := videoReader.Read()
if err == io.EOF {
return
}
must(err)
err = jpeg.Encode(&buf, frame, nil)
// Since we're done with the frame, we need to release it so that the original owner can reuse
// this memory.
release()
must(err)
partWriter, err := mimeWriter.CreatePart(partHeader)
must(err)
_, err = partWriter.Write(buf.Bytes())
buf.Reset()
must(err)
}
})
fmt.Println("listening on http://localhost:1313")
log.Println(http.ListenAndServe("localhost:1313", nil))
}

examples/rtp/README.md Normal file (30 changed lines)

@@ -0,0 +1,30 @@
## Instructions
### Download the rtp example
```
go get github.com/pion/mediadevices/examples/rtp
```
### Listen RTP
Install GStreamer and run:
```
gst-launch-1.0 udpsrc port=5000 caps=application/x-rtp,encoding-name=VP8 \
! rtpvp8depay ! vp8dec ! videoconvert ! autovideosink
```
Or run VLC media player:
```
vlc ./vp8.sdp
```
### Run rtp
Run `rtp localhost:5000`
A video should start playing in your GStreamer or VLC window.
It's not WebRTC, but pure RTP.
Congrats, you have used pion-MediaDevices! Now start building something cool

examples/rtp/main.go Normal file (76 changed lines)

@@ -0,0 +1,76 @@
package main
import (
"fmt"
"net"
"os"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/prop"
)
const (
mtu = 1000
)
func must(err error) {
if err != nil {
panic(err)
}
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s host:port\n", os.Args[0])
return
}
dest := os.Args[1]
vp8Params, err := vpx.NewVP8Params()
must(err)
vp8Params.BitRate = 100000 // 100kbps
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
)
mediaStream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
c.Width = prop.Int(640)
c.Height = prop.Int(480)
},
Codec: codecSelector,
})
must(err)
videoTrack := mediaStream.GetVideoTracks()[0]
defer videoTrack.Close()
rtpReader, err := videoTrack.NewRTPReader(vp8Params.RTPCodec().Name, mtu)
must(err)
addr, err := net.ResolveUDPAddr("udp", dest)
must(err)
conn, err := net.DialUDP("udp", nil, addr)
must(err)
buff := make([]byte, mtu)
for {
pkts, release, err := rtpReader.Read()
must(err)
for _, pkt := range pkts {
n, err := pkt.MarshalTo(buff)
must(err)
_, err = conn.Write(buff[:n])
must(err)
}
release()
}
}

examples/rtp/vp8.sdp Normal file (9 changed lines)

@@ -0,0 +1,9 @@
v=0
o=- 1234567890 1234567890 IN IP4 0.0.0.0
s=RTP-Send Example
i=Example
c=IN IP4 0.0.0.0
t=0 0
a=recvonly
m=video 5000 RTP/AVP 100
a=rtpmap:100 VP8/90000


@@ -5,19 +5,18 @@ import (
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/examples/internal/signal"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
// This is required to use opus audio encoder
"github.com/pion/mediadevices/pkg/codec/opus"
// If you don't like vpx, you can also use x264 by importing as below
// "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
// If you don't like x264, you can also use vpx by importing as below
// "github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
// or you can also use openh264 for alternative h264 implementation
// "github.com/pion/mediadevices/pkg/codec/openh264"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
// or if you use something like a Raspberry Pi, you can use mmal to take advantage of its hardware encoder
// "github.com/pion/mediadevices/pkg/codec/mmal"
"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder
"github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
@@ -45,7 +44,23 @@ func main() {
signal.Decode(signal.MustReadStdin(), &offer)
// Create a new RTCPeerConnection
x264Params, err := x264.NewParams()
if err != nil {
panic(err)
}
x264Params.BitRate = 500_000 // 500kbps
opusParams, err := opus.NewParams()
if err != nil {
panic(err)
}
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&x264Params),
mediadevices.WithAudioEncoders(&opusParams),
)
mediaEngine := webrtc.MediaEngine{}
codecSelector.Populate(&mediaEngine)
if err := mediaEngine.PopulateFromSDP(offer); err != nil {
panic(err)
}
@@ -61,44 +76,33 @@ func main() {
fmt.Printf("Connection State has changed %s \n", connectionState.String())
})
md := mediadevices.NewMediaDevices(peerConnection)
opusParams, err := opus.NewParams()
if err != nil {
panic(err)
}
opusParams.BitRate = 32000 // 32kbps
vp8Params, err := vpx.NewVP8Params()
if err != nil {
panic(err)
}
vp8Params.BitRate = 100000 // 100kbps
s, err := md.GetUserMedia(mediadevices.MediaStreamConstraints{
Audio: func(c *mediadevices.MediaTrackConstraints) {
c.Enabled = true
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&opusParams}
},
s, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
c.Enabled = true
c.Width = prop.Int(640)
c.Height = prop.Int(480)
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&vp8Params}
},
Audio: func(c *mediadevices.MediaTrackConstraints) {
},
Codec: codecSelector,
})
if err != nil {
panic(err)
}
for _, tracker := range s.GetTracks() {
t := tracker.Track()
tracker.OnEnded(func(err error) {
fmt.Printf("Track (ID: %s, Label: %s) ended with error: %v\n",
t.ID(), t.Label(), err)
fmt.Printf("Track (ID: %s) ended with error: %v\n",
tracker.ID(), err)
})
_, err = peerConnection.AddTransceiverFromTrack(t,
// In Pion/webrtc v3, bind will be called automatically after SDP negotiation
webrtcTrack, err := tracker.Bind(peerConnection)
if err != nil {
panic(err)
}
_, err = peerConnection.AddTransceiverFromTrack(webrtcTrack,
webrtc.RtpTransceiverInit{
Direction: webrtc.RTPTransceiverDirectionSendonly,
},

go.mod (4 changed lines)

@@ -6,8 +6,10 @@ require (
github.com/blackjack/webcam v0.0.0-20200313125108-10ed912a8539
github.com/jfreymuth/pulse v0.0.0-20201014123913-1e525c426c93
github.com/lherman-cs/opus v0.0.2
github.com/pion/logging v0.2.2
github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v2 v2.2.26
github.com/satori/go.uuid v1.2.0
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418
)

go.sum (4 changed lines)

@@ -109,8 +109,8 @@ golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200724161237-0e2f3a69832c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418 h1:HlFl4V6pEMziuLXyRkm5BIYq1y1GAbb02pRlWvI54OM=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=


@@ -0,0 +1,11 @@
package logging
import (
"github.com/pion/logging"
)
var loggerFactory = logging.NewDefaultLoggerFactory()
func NewLogger(scope string) logging.LeveledLogger {
return loggerFactory.NewLogger(scope)
}

logging.go Normal file (7 changed lines)

@@ -0,0 +1,7 @@
package mediadevices
import (
"github.com/pion/mediadevices/internal/logging"
)
var logger = logging.NewLogger("mediadevices")


@@ -7,95 +7,26 @@ import (
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
)
var errNotFound = fmt.Errorf("failed to find the best driver that fits the constraints")
// MediaDevices is an interface that's defined on https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices
type MediaDevices interface {
GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error)
GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error)
EnumerateDevices() []MediaDeviceInfo
}
// NewMediaDevices creates MediaDevices interface that provides access to connected media input devices
// like cameras and microphones, as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevices(pc *webrtc.PeerConnection, opts ...MediaDevicesOption) MediaDevices {
codecs := make(map[webrtc.RTPCodecType][]*webrtc.RTPCodec)
for _, kind := range []webrtc.RTPCodecType{
webrtc.RTPCodecTypeAudio,
webrtc.RTPCodecTypeVideo,
} {
codecs[kind] = pc.GetRegisteredRTPCodecs(kind)
}
return NewMediaDevicesFromCodecs(codecs, opts...)
}
// NewMediaDevicesFromCodecs creates MediaDevices interface from lists of the available codecs
// that provides access to connected media input devices like cameras and microphones,
// as well as screen sharing.
// In essence, it lets you obtain access to any hardware source of media data.
func NewMediaDevicesFromCodecs(codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec, opts ...MediaDevicesOption) MediaDevices {
mdo := MediaDevicesOptions{
codecs: codecs,
trackGenerator: defaultTrackGenerator,
}
for _, o := range opts {
o(&mdo)
}
return &mediaDevices{
MediaDevicesOptions: mdo,
}
}
// TrackGenerator is a function to create new track.
type TrackGenerator func(payloadType uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error)
var defaultTrackGenerator = TrackGenerator(func(pt uint8, ssrc uint32, id, label string, codec *webrtc.RTPCodec) (LocalTrack, error) {
return webrtc.NewTrack(pt, ssrc, id, label, codec)
})
type mediaDevices struct {
MediaDevicesOptions
}
// MediaDevicesOptions stores parameters used by MediaDevices.
type MediaDevicesOptions struct {
codecs map[webrtc.RTPCodecType][]*webrtc.RTPCodec
trackGenerator TrackGenerator
}
// MediaDevicesOption is a type of MediaDevices functional option.
type MediaDevicesOption func(*MediaDevicesOptions)
// WithTrackGenerator specifies a TrackGenerator to use customized track.
func WithTrackGenerator(gen TrackGenerator) MediaDevicesOption {
return func(o *MediaDevicesOptions) {
o.trackGenerator = gen
}
}
// GetDisplayMedia prompts the user to select and grant permission to capture the contents
// of a display or portion thereof (such as a window) as a MediaStream.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getDisplayMedia
func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) {
trackers := make([]Tracker, 0)
func GetDisplayMedia(constraints MediaStreamConstraints) (MediaStream, error) {
trackers := make([]Track, 0)
cleanTrackers := func() {
for _, t := range trackers {
t.Stop()
t.Close()
}
}
var videoConstraints MediaTrackConstraints
if constraints.Video != nil {
constraints.Video(&videoConstraints)
}
if videoConstraints.Enabled {
tracker, err := m.selectScreen(videoConstraints)
tracker, err := selectScreen(videoConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -116,27 +47,20 @@ func (m *mediaDevices) GetDisplayMedia(constraints MediaStreamConstraints) (Medi
// GetUserMedia prompts the user for permission to use a media input which produces a MediaStream
// with tracks containing the requested types of media.
// Reference: https://developer.mozilla.org/en-US/docs/Web/API/MediaDevices/getUserMedia
func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) {
func GetUserMedia(constraints MediaStreamConstraints) (MediaStream, error) {
// TODO: It should return media stream based on constraints
trackers := make([]Tracker, 0)
trackers := make([]Track, 0)
cleanTrackers := func() {
for _, t := range trackers {
t.Stop()
t.Close()
}
}
var videoConstraints, audioConstraints MediaTrackConstraints
if constraints.Video != nil {
constraints.Video(&videoConstraints)
}
if constraints.Audio != nil {
constraints.Audio(&audioConstraints)
}
if videoConstraints.Enabled {
tracker, err := m.selectVideo(videoConstraints)
tracker, err := selectVideo(videoConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -145,8 +69,9 @@ func (m *mediaDevices) GetUserMedia(constraints MediaStreamConstraints) (MediaSt
trackers = append(trackers, tracker)
}
if audioConstraints.Enabled {
tracker, err := m.selectAudio(audioConstraints)
if constraints.Audio != nil {
constraints.Audio(&audioConstraints)
tracker, err := selectAudio(audioConstraints, constraints.Codec)
if err != nil {
cleanTrackers()
return nil, err
@@ -195,12 +120,15 @@ func queryDriverProperties(filter driver.FilterFn) map[driver.Driver][]prop.Medi
func selectBestDriver(filter driver.FilterFn, constraints MediaTrackConstraints) (driver.Driver, MediaTrackConstraints, error) {
var bestDriver driver.Driver
var bestProp prop.Media
var foundPropertiesLog []string
minFitnessDist := math.Inf(1)
foundPropertiesLog = append(foundPropertiesLog, "\n============ Found Properties ============")
driverProperties := queryDriverProperties(filter)
for d, props := range driverProperties {
priority := float64(d.Info().Priority)
for _, p := range props {
foundPropertiesLog = append(foundPropertiesLog, p.String())
fitnessDist, ok := constraints.MediaConstraints.FitnessDistance(p)
if !ok {
continue
@@ -214,33 +142,25 @@ func selectBestDriver(filter driver.FilterFn, constraints MediaTrackConstraints)
}
}
foundPropertiesLog = append(foundPropertiesLog, "=============== Constraints ==============")
foundPropertiesLog = append(foundPropertiesLog, constraints.String())
foundPropertiesLog = append(foundPropertiesLog, "================ Best Fit ================")
if bestDriver == nil {
var foundProperties []string
for _, props := range driverProperties {
for _, p := range props {
foundProperties = append(foundProperties, fmt.Sprint(&p))
}
}
err := fmt.Errorf(`%w:
============ Found Properties ============
%s
=============== Constraints ==============
%s
`, errNotFound, strings.Join(foundProperties, "\n\n"), &constraints)
return nil, MediaTrackConstraints{}, err
foundPropertiesLog = append(foundPropertiesLog, "Not found")
logger.Debug(strings.Join(foundPropertiesLog, "\n\n"))
return nil, MediaTrackConstraints{}, errNotFound
}
foundPropertiesLog = append(foundPropertiesLog, bestProp.String())
logger.Debug(strings.Join(foundPropertiesLog, "\n\n"))
constraints.selectedMedia = prop.Media{}
constraints.selectedMedia.MergeConstraints(constraints.MediaConstraints)
constraints.selectedMedia.Merge(bestProp)
return bestDriver, constraints, nil
}
func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker, error) {
func selectAudio(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterAudioRecorder()
d, c, err := selectBestDriver(typeFilter, constraints)
@@ -248,9 +168,9 @@ func (m *mediaDevices) selectAudio(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker, error) {
func selectVideo(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder()
notScreenFilter := driver.FilterNot(driver.FilterDeviceType(driver.Screen))
filter := driver.FilterAnd(typeFilter, notScreenFilter)
@@ -260,10 +180,10 @@ func (m *mediaDevices) selectVideo(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker, error) {
func selectScreen(constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
typeFilter := driver.FilterVideoRecorder()
screenFilter := driver.FilterDeviceType(driver.Screen)
filter := driver.FilterAnd(typeFilter, screenFilter)
@@ -273,10 +193,10 @@ func (m *mediaDevices) selectScreen(constraints MediaTrackConstraints) (Tracker,
return nil, err
}
return newTrack(&m.MediaDevicesOptions, d, c)
return newTrackFromDriver(d, c, selector)
}
func (m *mediaDevices) EnumerateDevices() []MediaDeviceInfo {
func EnumerateDevices() []MediaDeviceInfo {
drivers := driver.GetManager().Query(
driver.FilterFn(func(driver.Driver) bool { return true }))
info := make([]MediaDeviceInfo, 0, len(drivers))


@@ -0,0 +1,82 @@
// +build e2e
package mediadevices
import (
"image"
"sync"
"testing"
"github.com/pion/mediadevices/pkg/codec/x264"
"github.com/pion/mediadevices/pkg/frame"
)
type mockVideoSource struct {
width, height int
pool sync.Pool
decoder frame.Decoder
}
func newMockVideoSource(width, height int) *mockVideoSource {
decoder, err := frame.NewDecoder(frame.FormatYUY2)
if err != nil {
panic(err)
}
return &mockVideoSource{
width: width,
height: height,
pool: sync.Pool{
New: func() interface{} {
resolution := width * height
return make([]byte, resolution*2)
},
},
decoder: decoder,
}
}
func (source *mockVideoSource) ID() string { return "" }
func (source *mockVideoSource) Close() error { return nil }
func (source *mockVideoSource) Read() (image.Image, func(), error) {
raw := source.pool.Get().([]byte)
decoded, release, err := source.decoder.Decode(raw, source.width, source.height)
source.pool.Put(raw)
if err != nil {
return nil, nil, err
}
return decoded, release, nil
}
func BenchmarkEndToEnd(b *testing.B) {
params, err := x264.NewParams()
if err != nil {
b.Fatal(err)
}
params.BitRate = 300_000
videoSource := newMockVideoSource(1920, 1080)
track := NewVideoTrack(videoSource, nil).(*VideoTrack)
defer track.Close()
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
b.Fatal(err)
}
encodedReader, err := params.BuildVideoEncoder(reader, inputProp)
if err != nil {
b.Fatal(err)
}
defer encodedReader.Close()
for i := 0; i < b.N; i++ {
_, release, err := encodedReader.Read()
if err != nil {
b.Fatal(err)
}
release()
}
}


@@ -1,91 +1,42 @@
package mediadevices
import (
"errors"
"io"
"testing"
"time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver"
_ "github.com/pion/mediadevices/pkg/driver/audiotest"
_ "github.com/pion/mediadevices/pkg/driver/videotest"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
func TestGetUserMedia(t *testing.T) {
videoParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 100000,
},
name: "MockVideo",
}
audioParams := mockParams{
BaseParams: codec.BaseParams{
BitRate: 32000,
},
name: "MockAudio",
}
md := NewMediaDevicesFromCodecs(
map[webrtc.RTPCodecType][]*webrtc.RTPCodec{
webrtc.RTPCodecTypeVideo: {
{Type: webrtc.RTPCodecTypeVideo, Name: "MockVideo", PayloadType: 1},
},
webrtc.RTPCodecTypeAudio: {
{Type: webrtc.RTPCodecTypeAudio, Name: "MockAudio", PayloadType: 2},
},
},
WithTrackGenerator(
func(_ uint8, _ uint32, id, _ string, codec *webrtc.RTPCodec) (
LocalTrack, error,
) {
return newMockTrack(codec, id), nil
},
),
)
constraints := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) {
c.Enabled = true
c.Width = prop.Int(640)
c.Height = prop.Int(480)
params := videoParams
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
},
Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
},
}
constraintsWrong := MediaStreamConstraints{
Video: func(c *MediaTrackConstraints) {
c.Enabled = true
c.Width = prop.Int(640)
c.Width = prop.IntExact(10000)
c.Height = prop.Int(480)
params := videoParams
params.BitRate = 0
c.VideoEncoderBuilders = []codec.VideoEncoderBuilder{&params}
},
Audio: func(c *MediaTrackConstraints) {
c.Enabled = true
params := audioParams
c.AudioEncoderBuilders = []codec.AudioEncoderBuilder{&params}
},
}
// GetUserMedia with broken parameters
ms, err := md.GetUserMedia(constraintsWrong)
ms, err := GetUserMedia(constraintsWrong)
if err == nil {
t.Fatal("Expected error, but got nil")
}
// GetUserMedia with correct parameters
ms, err = md.GetUserMedia(constraints)
ms, err = GetUserMedia(constraints)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -103,11 +54,11 @@ func TestGetUserMedia(t *testing.T) {
time.Sleep(50 * time.Millisecond)
for _, track := range tracks {
track.Stop()
track.Close()
}
// Stop and retry GetUserMedia
ms, err = md.GetUserMedia(constraints)
ms, err = GetUserMedia(constraints)
if err != nil {
t.Fatalf("Failed to GetUserMedia after the previsous tracks stopped: %v", err)
}
@@ -124,106 +75,10 @@ func TestGetUserMedia(t *testing.T) {
}
time.Sleep(50 * time.Millisecond)
for _, track := range tracks {
track.Stop()
track.Close()
}
}
type mockTrack struct {
codec *webrtc.RTPCodec
id string
}
func newMockTrack(codec *webrtc.RTPCodec, id string) *mockTrack {
return &mockTrack{
codec: codec,
id: id,
}
}
func (t *mockTrack) WriteSample(s media.Sample) error {
return nil
}
func (t *mockTrack) Codec() *webrtc.RTPCodec {
return t.codec
}
func (t *mockTrack) ID() string {
return t.id
}
func (t *mockTrack) Kind() webrtc.RTPCodecType {
return t.codec.Type
}
type mockParams struct {
codec.BaseParams
name string
}
func (params *mockParams) RTPCodec() *codec.RTPCodec {
rtpCodec := codec.NewRTPH264Codec(90000)
rtpCodec.Name = params.name
return rtpCodec
}
func (params *mockParams) BuildVideoEncoder(r video.Reader, p prop.Media) (codec.ReadCloser, error) {
if params.BitRate == 0 {
// This is a dummy error to test the failure condition.
return nil, errors.New("wrong codec parameter")
}
return &mockVideoCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
func (params *mockParams) BuildAudioEncoder(r audio.Reader, p prop.Media) (codec.ReadCloser, error) {
return &mockAudioCodec{
r: r,
closed: make(chan struct{}),
}, nil
}
type mockCodec struct{}
func (e *mockCodec) SetBitRate(b int) error {
return nil
}
func (e *mockCodec) ForceKeyFrame() error {
return nil
}
type mockVideoCodec struct {
mockCodec
r video.Reader
closed chan struct{}
}
func (m *mockVideoCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockVideoCodec) Close() error { return nil }
type mockAudioCodec struct {
mockCodec
r audio.Reader
closed chan struct{}
}
func (m *mockAudioCodec) Read() ([]byte, error) {
if _, err := m.r.Read(); err != nil {
return nil, err
}
return make([]byte, 20), nil
}
func (m *mockAudioCodec) Close() error { return nil }
func TestSelectBestDriverConstraintsResultIsSetProperly(t *testing.T) {
filterFn := driver.FilterVideoRecorder()
drivers := driver.GetManager().Query(filterFn)


@@ -7,80 +7,80 @@ import (
// MediaStream is an interface that represents a collection of existing tracks.
type MediaStream interface {
// GetAudioTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getaudiotracks
GetAudioTracks() []Tracker
GetAudioTracks() []Track
// GetVideoTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-getvideotracks
GetVideoTracks() []Tracker
GetVideoTracks() []Track
// GetTracks implements https://w3c.github.io/mediacapture-main/#dom-mediastream-gettracks
GetTracks() []Tracker
GetTracks() []Track
// AddTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-addtrack
AddTrack(t Tracker)
AddTrack(t Track)
// RemoveTrack implements https://w3c.github.io/mediacapture-main/#dom-mediastream-removetrack
RemoveTrack(t Tracker)
RemoveTrack(t Track)
}
type mediaStream struct {
trackers map[Tracker]struct{}
l sync.RWMutex
tracks map[Track]struct{}
l sync.RWMutex
}
const trackTypeDefault MediaDeviceType = 0
// NewMediaStream creates a MediaStream interface that's defined in
// https://w3c.github.io/mediacapture-main/#dom-mediastream
func NewMediaStream(trackers ...Tracker) (MediaStream, error) {
m := mediaStream{trackers: make(map[Tracker]struct{})}
func NewMediaStream(tracks ...Track) (MediaStream, error) {
m := mediaStream{tracks: make(map[Track]struct{})}
for _, tracker := range trackers {
if _, ok := m.trackers[tracker]; !ok {
m.trackers[tracker] = struct{}{}
for _, track := range tracks {
if _, ok := m.tracks[track]; !ok {
m.tracks[track] = struct{}{}
}
}
return &m, nil
}
func (m *mediaStream) GetAudioTracks() []Tracker {
func (m *mediaStream) GetAudioTracks() []Track {
return m.queryTracks(AudioInput)
}
func (m *mediaStream) GetVideoTracks() []Tracker {
func (m *mediaStream) GetVideoTracks() []Track {
return m.queryTracks(VideoInput)
}
func (m *mediaStream) GetTracks() []Tracker {
func (m *mediaStream) GetTracks() []Track {
return m.queryTracks(trackTypeDefault)
}
// queryTracks returns all tracks that are the same kind as t.
// If t is 0, which is the default, queryTracks will return all the tracks.
func (m *mediaStream) queryTracks(t MediaDeviceType) []Tracker {
func (m *mediaStream) queryTracks(t MediaDeviceType) []Track {
m.l.RLock()
defer m.l.RUnlock()
result := make([]Tracker, 0)
for tracker := range m.trackers {
if tracker.Kind() == t || t == trackTypeDefault {
result = append(result, tracker)
result := make([]Track, 0)
for track := range m.tracks {
if track.Kind() == t || t == trackTypeDefault {
result = append(result, track)
}
}
return result
}
func (m *mediaStream) AddTrack(t Tracker) {
func (m *mediaStream) AddTrack(t Track) {
m.l.Lock()
defer m.l.Unlock()
if _, ok := m.trackers[t]; ok {
if _, ok := m.tracks[t]; ok {
return
}
m.trackers[t] = struct{}{}
m.tracks[t] = struct{}{}
}
func (m *mediaStream) RemoveTrack(t Tracker) {
func (m *mediaStream) RemoveTrack(t Track) {
m.l.Lock()
defer m.l.Unlock()
delete(m.trackers, t)
delete(m.tracks, t)
}
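
A short fragment showing how a consumer works with the renamed interface, following the http and webrtc examples above; stream is assumed to come from mediadevices.GetUserMedia as sketched under codec.go.

```
// Sketch (fragment): consuming the renamed MediaStream / Track API.

// GetTracks/GetVideoTracks/GetAudioTracks now return []Track instead of []Tracker.
for _, track := range stream.GetVideoTracks() {
	fmt.Println("video track:", track.ID())
}

// Raw frame access still goes through the concrete *mediadevices.VideoTrack type,
// and Close replaces the old Stop for releasing the underlying driver.
videoTrack := stream.GetVideoTracks()[0].(*mediadevices.VideoTrack)
defer videoTrack.Close()
```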


@@ -10,17 +10,14 @@ type mockMediaStreamTrack struct {
kind MediaDeviceType
}
func (track *mockMediaStreamTrack) Track() *webrtc.Track {
return nil
func (track *mockMediaStreamTrack) ID() string {
return ""
}
func (track *mockMediaStreamTrack) LocalTrack() LocalTrack {
func (track *mockMediaStreamTrack) Close() error {
return nil
}
func (track *mockMediaStreamTrack) Stop() {
}
func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
return track.kind
}
@@ -28,8 +25,20 @@ func (track *mockMediaStreamTrack) Kind() MediaDeviceType {
func (track *mockMediaStreamTrack) OnEnded(handler func(error)) {
}
func (track *mockMediaStreamTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
return nil, nil
}
func (track *mockMediaStreamTrack) Unbind(pc *webrtc.PeerConnection) error {
return nil
}
func (track *mockMediaStreamTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
return nil, nil
}
func TestMediaStreamFilters(t *testing.T) {
audioTracks := []Tracker{
audioTracks := []Track{
&mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput},
&mockMediaStreamTrack{AudioInput},
@@ -37,7 +46,7 @@ func TestMediaStreamFilters(t *testing.T) {
&mockMediaStreamTrack{AudioInput},
}
videoTracks := []Tracker{
videoTracks := []Track{
&mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput},
&mockMediaStreamTrack{VideoInput},
@@ -49,7 +58,7 @@ func TestMediaStreamFilters(t *testing.T) {
t.Fatal(err)
}
expect := func(t *testing.T, actual, expected []Tracker) {
expect := func(t *testing.T, actual, expected []Track) {
if len(actual) != len(expected) {
t.Fatalf("%s: Expected to get %d trackers, but got %d trackers", t.Name(), len(expected), len(actual))
}


@@ -1,40 +1,18 @@
package mediadevices
import (
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
type MediaStreamConstraints struct {
Audio MediaOption
Video MediaOption
Codec *CodecSelector
}
// MediaTrackConstraints represents https://w3c.github.io/mediacapture-main/#dom-mediatrackconstraints
type MediaTrackConstraints struct {
prop.MediaConstraints
Enabled bool
// VideoEncoderBuilders are codec builders that are used for encoding the video
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
VideoEncoderBuilders []codec.VideoEncoderBuilder
// AudioEncoderBuilders are codec builders that are used for encoding the audio
// and later being used for sending the appropriate RTP payload type.
//
// If one encoder builder fails to build the codec, the next builder will be used,
// repeating until a codec builds. If no builders build successfully, an error is returned.
AudioEncoderBuilders []codec.AudioEncoderBuilder
// VideoTransform will be used to transform the video that's coming from the driver.
// So, basically it'll look like the following: driver -> VideoTransform -> codec
VideoTransform video.TransformFunc
// AudioTransform will be used to transform the audio that's coming from the driver.
// So, basically it'll look like the following: driver -> AudioTransform -> codec
AudioTransform audio.TransformFunc
selectedMedia prop.Media
}


@@ -15,7 +15,7 @@ func detectCurrentVideoProp(broadcaster *video.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = video.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()
return currentProp, err
}
@@ -29,7 +29,7 @@ func detectCurrentAudioProp(broadcaster *audio.Broadcaster) (prop.Media, error)
// in any case.
metaReader := broadcaster.NewReader(false)
metaReader = audio.DetectChanges(0, func(p prop.Media) { currentProp = p })(metaReader)
_, err := metaReader.Read()
_, _, err := metaReader.Read()
return currentProp, err
}


@@ -17,12 +17,12 @@ func TestDetectCurrentVideoProp(t *testing.T) {
second.Pix[0] = 2
isFirst := true
source := video.ReaderFunc(func() (image.Image, error) {
source := video.ReaderFunc(func() (image.Image, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})
@@ -42,7 +42,7 @@ func TestDetectCurrentVideoProp(t *testing.T) {
}
reader := broadcaster.NewReader(false)
img, err := reader.Read()
img, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}
@@ -65,12 +65,12 @@ func TestDetectCurrentAudioProp(t *testing.T) {
second.Data[0] = 2
isFirst := true
source := audio.ReaderFunc(func() (wave.Audio, error) {
source := audio.ReaderFunc(func() (wave.Audio, func(), error) {
if isFirst {
isFirst = true
return first, nil
return first, func() {}, nil
} else {
return second, nil
return second, func() {}, nil
}
})
@@ -86,7 +86,7 @@ func TestDetectCurrentAudioProp(t *testing.T) {
}
reader := broadcaster.NewReader(false)
chunk, err := reader.Read()
chunk, _, err := reader.Read()
if err != nil {
t.Fatal(err)
}


@@ -110,12 +110,12 @@ func (rc *ReadCloser) dataCb(data []byte) {
// Read reads raw data, the format is determined by the media type and property:
// - For video, each call will return a frame.
// - For audio, each call will return a chunk whose size is configured by Latency
func (rc *ReadCloser) Read() ([]byte, error) {
func (rc *ReadCloser) Read() ([]byte, func(), error) {
data, ok := <-rc.dataChan
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
return data, nil
return data, func() {}, nil
}
// Close closes the capturing session, and no data will flow anymore


@@ -58,7 +58,7 @@ type VideoEncoderBuilder interface {
// ReadCloser is an io.ReadCloser with methods for rate limiting: SetBitRate and ForceKeyFrame
type ReadCloser interface {
Read() ([]byte, error)
Read() (b []byte, release func(), err error)
Close() error
// SetBitRate sets current target bitrate, lower bitrate means smaller data will be transmitted
// but this also means that the quality will also be lower.
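
The new three-value signature pairs every encoded buffer with a release callback so pooled memory can be handed back to the encoder, matching the pattern used by the end-to-end benchmark and the RTP examples in this changeset. A fragment of a consumer loop under that signature follows; encodedReader is assumed to come from an encoder builder such as BuildVideoEncoder, and send is a placeholder for whatever consumes the bytes.

```
// Sketch (fragment): draining a codec.ReadCloser under the new Read signature.
for {
	buf, release, err := encodedReader.Read()
	if err != nil {
		break // io.EOF once the encoder has been closed
	}
	send(buf) // placeholder consumer of the encoded frame or chunk
	release() // return the buffer so the encoder can reuse the memory
}
```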


@@ -55,17 +55,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imgReal := img.(*image.YCbCr)
var y, cb, cr C.Slice
@@ -79,7 +79,7 @@ func (e *encoder) Read() ([]byte, error) {
var encodedBuffer *C.MMAL_BUFFER_HEADER_T
status := C.enc_encode(&e.engine, y, cb, cr, &encodedBuffer)
if status.code != 0 {
return nil, statusToErr(&status)
return nil, func() {}, statusToErr(&status)
}
// GoBytes copies the C array to Go slice. After this, it's safe to release the C array
@@ -87,7 +87,7 @@ func (e *encoder) Read() ([]byte, error) {
// Release the buffer so that mmal can reuse this memory
C.mmal_buffer_header_release(encodedBuffer)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {


@@ -50,17 +50,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
}, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -74,11 +74,11 @@ func (e *encoder) Read() ([]byte, error) {
width: C.int(bounds.Max.X - bounds.Min.X),
}, &rv)
if err := errResult(rv); err != nil {
return nil, fmt.Errorf("failed in encoding: %v", err)
return nil, func() {}, fmt.Errorf("failed in encoding: %v", err)
}
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, nil
return encoded, func() {}, nil
}
func (e *encoder) SetBitRate(b int) error {


@@ -72,22 +72,22 @@ func newEncoder(r audio.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
buff, err := e.reader.Read()
func (e *encoder) Read() ([]byte, func(), error) {
buff, _, err := e.reader.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
encoded := make([]byte, 1024)
switch b := buff.(type) {
case *wave.Int16Interleaved:
n, err := e.engine.Encode(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
case *wave.Float32Interleaved:
n, err := e.engine.EncodeFloat32(b.Data, encoded)
return encoded[:n:n], err
return encoded[:n:n], func() {}, err
default:
return nil, errors.New("unknown type of audio buffer")
return nil, func() {}, errors.New("unknown type of audio buffer")
}
}


@@ -64,7 +64,6 @@ import (
"unsafe"
"github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
@@ -296,17 +295,17 @@ func newVP8Encoder(r video.Reader, p prop.Media, params ParamsVP8) (codec.ReadCl
return e, nil
}
func (e *encoderVP8) Read() ([]byte, error) {
func (e *encoderVP8) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -348,7 +347,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
}
}
if e.picParam.reconstructed_frame == C.VA_INVALID_SURFACE {
return nil, errors.New("no available surface")
return nil, func() {}, errors.New("no available surface")
}
C.setForceKFFlagVP8(&e.picParam, 0)
@@ -416,7 +415,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
C.size_t(uintptr(p.src)),
&id,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
}
buffs = append(buffs, id)
}
@@ -426,17 +425,17 @@ func (e *encoderVP8) Read() ([]byte, error) {
e.display, e.ctxID,
e.surfs[surfaceVP8Input],
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Upload image
var vaImg C.VAImage
var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP8Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// TODO: use vaImg.pitches to support padding
C.memcpy(
@@ -452,10 +451,10 @@ func (e *encoderVP8) Read() ([]byte, error) {
unsafe.Pointer(&yuvImg.Cr[0]), C.size_t(len(yuvImg.Cr)),
)
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaRenderPicture(
@@ -463,38 +462,38 @@ func (e *encoderVP8) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput
C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaEndPicture(
e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Load encoded data
for retry := 3; retry >= 0; retry-- {
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
}
var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
}
if surfStat == C.VASurfaceReady {
break
}
if retry == 0 {
return nil, fmt.Errorf("failed to sync surface: %d", surfStat)
return nil, func() {}, fmt.Errorf("failed to sync surface: %d", surfStat)
}
}
var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if seg.status&C.VA_CODED_BUF_STATUS_SLICE_OVERFLOW_MASK != 0 {
return nil, errors.New("buffer size too small")
return nil, func() {}, errors.New("buffer size too small")
}
if cap(e.frame) < int(seg.size) {
@@ -507,13 +506,13 @@ func (e *encoderVP8) Read() ([]byte, error) {
)
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// Destroy buffers
for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
}
}
@@ -538,7 +537,7 @@ func (e *encoderVP8) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoderVP8) SetBitRate(b int) error {


@@ -47,7 +47,6 @@ import (
"unsafe"
"github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
@@ -285,17 +284,17 @@ func newVP9Encoder(r video.Reader, p prop.Media, params ParamsVP9) (codec.ReadCl
return e, nil
}
func (e *encoderVP9) Read() ([]byte, error) {
func (e *encoderVP9) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -379,7 +378,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.size_t(uintptr(p.src)),
&id,
); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to create buffer: %s", C.GoString(C.vaErrorStr(s)))
}
buffs = append(buffs, id)
}
@@ -389,17 +388,17 @@ func (e *encoderVP9) Read() ([]byte, error) {
e.display, e.ctxID,
e.surfs[surfaceVP9Input],
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to begin picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Upload image
var vaImg C.VAImage
var rawBuf unsafe.Pointer
if s := C.vaDeriveImage(e.display, e.surfs[surfaceVP9Input], &vaImg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to derive image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaMapBuffer(e.display, vaImg.buf, &rawBuf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// TODO: use vaImg.pitches to support padding
C.copyI420toNV12(
@@ -410,10 +409,10 @@ func (e *encoderVP9) Read() ([]byte, error) {
C.uint(len(yuvImg.Y)),
)
if s := C.vaUnmapBuffer(e.display, vaImg.buf); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaDestroyImage(e.display, vaImg.image_id); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy image: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaRenderPicture(
@@ -421,27 +420,27 @@ func (e *encoderVP9) Read() ([]byte, error) {
&buffs[1], // 0 is for ouput
C.int(len(buffs)-1),
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to render picture: %s", C.GoString(C.vaErrorStr(s)))
}
if s := C.vaEndPicture(
e.display, e.ctxID,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to end picture: %s", C.GoString(C.vaErrorStr(s)))
}
// Load encoded data
if s := C.vaSyncSurface(e.display, e.picParam.reconstructed_frame); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to sync surface: %s", C.GoString(C.vaErrorStr(s)))
}
var surfStat C.VASurfaceStatus
if s := C.vaQuerySurfaceStatus(
e.display, e.picParam.reconstructed_frame, &surfStat,
); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to query surface status: %s", C.GoString(C.vaErrorStr(s)))
}
var seg *C.VACodedBufferSegment
if s := C.vaMapBufferSeg(e.display, buffs[0], &seg); s != C.VA_STATUS_SUCCESS {
return nil, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to map buffer: %s", C.GoString(C.vaErrorStr(s)))
}
if cap(e.frame) < int(seg.size) {
e.frame = make([]byte, int(seg.size))
@@ -453,13 +452,13 @@ func (e *encoderVP9) Read() ([]byte, error) {
)
if s := C.vaUnmapBuffer(e.display, buffs[0]); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to unmap buffer: %s", C.GoString(C.vaErrorStr(s)))
}
// Destroy buffers
for _, b := range buffs {
if s := C.vaDestroyBuffer(e.display, b); s != C.VA_STATUS_SUCCESS {
return 0, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
return nil, func() {}, fmt.Errorf("failed to destroy buffer: %s", C.GoString(C.vaErrorStr(s)))
}
}
@@ -473,7 +472,7 @@ func (e *encoderVP9) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoderVP9) SetBitRate(b int) error {

View File

@@ -204,17 +204,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params, codecIface *C.vpx_c
}, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
bounds := yuvImg.Bounds()
@@ -230,7 +230,7 @@ func (e *encoder) Read() ([]byte, error) {
if e.cfg.g_w != C.uint(width) || e.cfg.g_h != C.uint(height) {
e.cfg.g_w, e.cfg.g_h = C.uint(width), C.uint(height)
if ec := C.vpx_codec_enc_config_set(e.codec, e.cfg); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
return nil, func() {}, fmt.Errorf("vpx_codec_enc_config_set failed (%d)", ec)
}
e.raw.w, e.raw.h = C.uint(width), C.uint(height)
e.raw.r_w, e.raw.r_h = C.uint(width), C.uint(height)
@@ -243,7 +243,7 @@ func (e *encoder) Read() ([]byte, error) {
C.long(t-e.tStart), C.ulong(t-e.tLastFrame), C.long(flags), C.ulong(e.deadline),
(*C.uchar)(&yuvImg.Y[0]), (*C.uchar)(&yuvImg.Cb[0]), (*C.uchar)(&yuvImg.Cr[0]),
); ec != C.VPX_CODEC_OK {
return nil, fmt.Errorf("vpx_codec_encode failed (%d)", ec)
return nil, func() {}, fmt.Errorf("vpx_codec_encode failed (%d)", ec)
}
e.frameIndex++
@@ -264,7 +264,7 @@ func (e *encoder) Read() ([]byte, error) {
encoded := make([]byte, len(e.frame))
copy(encoded, e.frame)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -47,7 +47,7 @@ Encoder *enc_new(x264_param_t param, char *preset, int *rc) {
e->param.b_repeat_headers = 1;
e->param.b_annexb = 1;
if (x264_param_apply_profile(&e->param, "baseline") < 0) {
if (x264_param_apply_profile(&e->param, "high") < 0) {
*rc = ERR_APPLY_PROFILE;
goto fail;
}
@@ -95,4 +95,4 @@ void enc_close(Encoder *e, int *rc) {
x264_encoder_close(e->h);
x264_picture_clean(&e->pic_in);
free(e);
}
}

View File

@@ -94,17 +94,17 @@ func newEncoder(r video.Reader, p prop.Media, params Params) (codec.ReadCloser,
return &e, nil
}
func (e *encoder) Read() ([]byte, error) {
func (e *encoder) Read() ([]byte, func(), error) {
e.mu.Lock()
defer e.mu.Unlock()
if e.closed {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img, err := e.r.Read()
img, _, err := e.r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
yuvImg := img.(*image.YCbCr)
@@ -117,11 +117,11 @@ func (e *encoder) Read() ([]byte, error) {
&rc,
)
if err := errFromC(rc); err != nil {
return nil, err
return nil, func() {}, err
}
encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
return encoded, err
return encoded, func() {}, err
}
func (e *encoder) SetBitRate(b int) error {

View File

@@ -52,10 +52,10 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
closed := d.closed
reader := audio.ReaderFunc(func() (wave.Audio, error) {
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
select {
case <-closed:
return nil, io.EOF
return nil, func() {}, io.EOF
default:
}
@@ -78,7 +78,7 @@ func (d *dummy) AudioRecord(p prop.Media) (audio.Reader, error) {
a.SetFloat32(i, ch, wave.Float32Sample(sin[phase]))
}
}
return a, nil
return a, func() {}, nil
})
return reader, nil
}

View File

@@ -56,10 +56,10 @@ func (cam *camera) VideoRecord(property prop.Media) (video.Reader, error) {
if err != nil {
return nil, err
}
r := video.ReaderFunc(func() (image.Image, error) {
frame, err := rc.Read()
r := video.ReaderFunc(func() (image.Image, func(), error) {
frame, _, err := rc.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
return decoder.Decode(frame, property.Width, property.Height)
})

View File

@@ -182,7 +182,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
ctx, cancel := context.WithCancel(context.Background())
c.cancel = cancel
var buf []byte
r := video.ReaderFunc(func() (img image.Image, err error) {
r := video.ReaderFunc(func() (img image.Image, release func(), err error) {
// Lock to avoid accessing the buffer after StopStreaming()
c.mutex.Lock()
defer c.mutex.Unlock()
@@ -191,23 +191,23 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
for i := 0; i < maxEmptyFrameCount; i++ {
if ctx.Err() != nil {
// Return EOF if the camera is already closed.
return nil, io.EOF
return nil, func() {}, io.EOF
}
err := cam.WaitForFrame(5) // 5 seconds
switch err.(type) {
case nil:
case *webcam.Timeout:
return nil, errReadTimeout
return nil, func() {}, errReadTimeout
default:
// Camera has been stopped.
return nil, err
return nil, func() {}, err
}
b, err := cam.ReadFrame()
if err != nil {
// Camera has been stopped.
return nil, err
return nil, func() {}, err
}
// Frame is empty.
@@ -227,7 +227,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
n := copy(buf, b)
return decoder.Decode(buf[:n], p.Width, p.Height)
}
return nil, errEmptyFrame
return nil, func() {}, errEmptyFrame
})
return r, nil

View File

@@ -116,10 +116,10 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img := &image.YCbCr{}
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
b, ok := <-c.ch
if !ok {
return nil, io.EOF
return nil, func() {}, io.EOF
}
img.Y = b[:nPix]
img.Cb = b[nPix : nPix+nPix/2]
@@ -128,7 +128,7 @@ func (c *camera) VideoRecord(p prop.Media) (video.Reader, error) {
img.CStride = p.Width / 2
img.SubsampleRatio = image.YCbCrSubsampleRatio422
img.Rect = image.Rect(0, 0, p.Width, p.Height)
return img, nil
return img, func() {}, nil
})
return r, nil
}

View File

@@ -1 +1,204 @@
package microphone
import (
"encoding/binary"
"errors"
"fmt"
"io"
"time"
"unsafe"
"github.com/gen2brain/malgo"
"github.com/pion/mediadevices/internal/logging"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
const (
maxDeviceIDLength = 20
// TODO: should replace this with a more flexible approach
sampleRateStep = 1000
initialBufferSize = 1024
)
var logger = logging.NewLogger("mediadevices/driver/microphone")
var ctx *malgo.AllocatedContext
var hostEndian binary.ByteOrder
var (
errUnsupportedFormat = errors.New("the provided audio format is not supported")
)
type microphone struct {
malgo.DeviceInfo
chunkChan chan []byte
}
func init() {
var err error
/*
backends := []malgo.Backend{
malgo.BackendPulseaudio,
malgo.BackendAlsa,
}
*/
ctx, err = malgo.InitContext(nil, malgo.ContextConfig{}, func(message string) {
logger.Debugf("%v\n", message)
})
if err != nil {
panic(err)
}
devices, err := ctx.Devices(malgo.Capture)
if err != nil {
panic(err)
}
for _, device := range devices {
// TODO: Detect default device and prioritize it
driver.GetManager().Register(newMicrophone(device), driver.Info{
Label: device.ID.String(),
DeviceType: driver.Microphone,
})
}
// Determine the host's endianness
switch v := *(*uint16)(unsafe.Pointer(&([]byte{0x12, 0x34}[0]))); v {
case 0x1234:
hostEndian = binary.BigEndian
case 0x3412:
hostEndian = binary.LittleEndian
default:
panic(fmt.Sprintf("failed to determine host endianness: %x", v))
}
}
func newMicrophone(info malgo.DeviceInfo) *microphone {
return &microphone{
DeviceInfo: info,
}
}
func (m *microphone) Open() error {
m.chunkChan = make(chan []byte, 1)
return nil
}
func (m *microphone) Close() error {
if m.chunkChan != nil {
close(m.chunkChan)
m.chunkChan = nil
}
return nil
}
func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) {
var config malgo.DeviceConfig
var callbacks malgo.DeviceCallbacks
decoder, err := wave.NewDecoder(&wave.RawFormat{
SampleSize: inputProp.SampleSize,
IsFloat: inputProp.IsFloat,
Interleaved: inputProp.IsInterleaved,
})
if err != nil {
return nil, err
}
config.DeviceType = malgo.Capture
config.PerformanceProfile = malgo.LowLatency
config.Capture.Channels = uint32(inputProp.ChannelCount)
config.SampleRate = uint32(inputProp.SampleRate)
if inputProp.SampleSize == 4 && inputProp.IsFloat {
config.Capture.Format = malgo.FormatF32
} else if inputProp.SampleSize == 2 && !inputProp.IsFloat {
config.Capture.Format = malgo.FormatS16
} else {
return nil, errUnsupportedFormat
}
onRecvChunk := func(_, chunk []byte, framecount uint32) {
m.chunkChan <- chunk
}
callbacks.Data = onRecvChunk
device, err := malgo.InitDevice(ctx.Context, config, callbacks)
if err != nil {
return nil, err
}
err = device.Start()
if err != nil {
return nil, err
}
return audio.ReaderFunc(func() (wave.Audio, func(), error) {
chunk, ok := <-m.chunkChan
if !ok {
device.Stop()
device.Uninit()
return nil, func() {}, io.EOF
}
decodedChunk, err := decoder.Decode(hostEndian, chunk, inputProp.ChannelCount)
// FIXME: the decoder should also fill this information
decodedChunk.(*wave.Float32Interleaved).Size.SamplingRate = inputProp.SampleRate
return decodedChunk, func() {}, err
}), nil
}
func (m *microphone) Properties() []prop.Media {
var supportedProps []prop.Media
logger.Debug("Querying properties")
var isBigEndian bool
// miniaudio only uses the host endian
if hostEndian == binary.BigEndian {
isBigEndian = true
}
for ch := m.MinChannels; ch <= m.MaxChannels; ch++ {
for sampleRate := m.MinSampleRate; sampleRate <= m.MaxSampleRate; sampleRate += sampleRateStep {
for i := 0; i < int(m.FormatCount); i++ {
format := m.Formats[i]
supportedProp := prop.Media{
Audio: prop.Audio{
ChannelCount: int(ch),
SampleRate: int(sampleRate),
IsBigEndian: isBigEndian,
// miniaudio only supports interleaved at the moment
IsInterleaved: true,
},
}
switch malgo.FormatType(format) {
case malgo.FormatF32:
supportedProp.SampleSize = 4
supportedProp.IsFloat = true
case malgo.FormatS16:
supportedProp.SampleSize = 2
supportedProp.IsFloat = false
}
supportedProps = append(supportedProps, supportedProp)
}
}
}
// FIXME: remove this hardcoded value. malgo doesn't support the "ma_context_get_device_info" API yet, so the iterations
// above will always return nothing for now.
supportedProps = append(supportedProps, prop.Media{
Audio: prop.Audio{
Latency: time.Millisecond * 20,
ChannelCount: 1,
SampleRate: 48000,
SampleSize: 4,
IsFloat: true,
IsBigEndian: isBigEndian,
IsInterleaved: true,
},
})
return supportedProps
}

View File

@@ -1,137 +0,0 @@
package microphone
import (
"io"
"time"
"github.com/jfreymuth/pulse"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
type microphone struct {
c *pulse.Client
id string
samplesChan chan<- []int16
}
func init() {
pa, err := pulse.NewClient()
if err != nil {
// No pulseaudio
return
}
defer pa.Close()
sources, err := pa.ListSources()
if err != nil {
panic(err)
}
defaultSource, err := pa.DefaultSource()
if err != nil {
panic(err)
}
for _, source := range sources {
priority := driver.PriorityNormal
if defaultSource.ID() == source.ID() {
priority = driver.PriorityHigh
}
driver.GetManager().Register(&microphone{id: source.ID()}, driver.Info{
Label: source.ID(),
DeviceType: driver.Microphone,
Priority: priority,
})
}
}
func (m *microphone) Open() error {
var err error
m.c, err = pulse.NewClient()
if err != nil {
return err
}
return nil
}
func (m *microphone) Close() error {
if m.samplesChan != nil {
close(m.samplesChan)
m.samplesChan = nil
}
m.c.Close()
return nil
}
func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
var options []pulse.RecordOption
if p.ChannelCount == 1 {
options = append(options, pulse.RecordMono)
} else {
options = append(options, pulse.RecordStereo)
}
latency := p.Latency.Seconds()
src, err := m.c.SourceByID(m.id)
if err != nil {
return nil, err
}
options = append(options,
pulse.RecordSampleRate(p.SampleRate),
pulse.RecordLatency(latency),
pulse.RecordSource(src),
)
samplesChan := make(chan []int16, 1)
handler := func(b []int16) (int, error) {
samplesChan <- b
return len(b), nil
}
stream, err := m.c.NewRecord(pulse.Int16Writer(handler), options...)
if err != nil {
return nil, err
}
reader := audio.ReaderFunc(func() (wave.Audio, error) {
buff, ok := <-samplesChan
if !ok {
stream.Close()
return nil, io.EOF
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: len(buff) / p.ChannelCount,
},
)
copy(a.Data, buff)
return a, nil
})
stream.Start()
m.samplesChan = samplesChan
return reader, nil
}
func (m *microphone) Properties() []prop.Media {
// TODO: Get actual properties
monoProp := prop.Media{
Audio: prop.Audio{
SampleRate: 48000,
Latency: time.Millisecond * 20,
ChannelCount: 1,
},
}
stereoProp := monoProp
stereoProp.ChannelCount = 2
return []prop.Media{monoProp, stereoProp}
}

View File

@@ -1,347 +0,0 @@
package microphone
import (
"errors"
"golang.org/x/sys/windows"
"io"
"time"
"unsafe"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
const (
// bufferNumber * prop.Audio.Latency is the maximum blockable duration
// to get data without dropping chunks.
bufferNumber = 5
)
// Windows APIs
var (
winmm = windows.NewLazySystemDLL("Winmm.dll")
waveInOpen = winmm.NewProc("waveInOpen")
waveInStart = winmm.NewProc("waveInStart")
waveInStop = winmm.NewProc("waveInStop")
waveInReset = winmm.NewProc("waveInReset")
waveInClose = winmm.NewProc("waveInClose")
waveInPrepareHeader = winmm.NewProc("waveInPrepareHeader")
waveInAddBuffer = winmm.NewProc("waveInAddBuffer")
waveInUnprepareHeader = winmm.NewProc("waveInUnprepareHeader")
)
type buffer struct {
waveHdr
data []int16
}
func newBuffer(samples int) *buffer {
b := make([]int16, samples)
return &buffer{
waveHdr: waveHdr{
// Sharing Go memory with Windows C API without reference.
// Make sure that the lifetime of the buffer struct is longer
// than the final access from cbWaveIn.
lpData: uintptr(unsafe.Pointer(&b[0])),
dwBufferLength: uint32(samples * 2),
},
data: b,
}
}
type microphone struct {
hWaveIn windows.Pointer
buf map[uintptr]*buffer
chBuf chan *buffer
closed chan struct{}
}
func init() {
// TODO: enum devices
driver.GetManager().Register(&microphone{}, driver.Info{
Label: "default",
DeviceType: driver.Microphone,
})
}
func (m *microphone) Open() error {
m.chBuf = make(chan *buffer)
m.buf = make(map[uintptr]*buffer)
m.closed = make(chan struct{})
return nil
}
func (m *microphone) cbWaveIn(hWaveIn windows.Pointer, uMsg uint, dwInstance, dwParam1, dwParam2 *int32) uintptr {
switch uMsg {
case MM_WIM_DATA:
b := m.buf[uintptr(unsafe.Pointer(dwParam1))]
m.chBuf <- b
case MM_WIM_OPEN:
case MM_WIM_CLOSE:
close(m.chBuf)
}
return 0
}
func (m *microphone) Close() error {
if m.hWaveIn == nil {
return nil
}
close(m.closed)
ret, _, _ := waveInStop.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
// All enqueued buffers are marked done by waveInReset.
ret, _, _ = waveInReset.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
for _, buf := range m.buf {
// Detach buffers from waveIn API.
ret, _, _ := waveInUnprepareHeader.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return err
}
}
// Now, it's ready to free the buffers.
// As microphone struct still has reference to the buffers,
// they will be GC-ed once microphone is reopened or unreferenced.
ret, _, _ = waveInClose.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
<-m.chBuf
m.hWaveIn = nil
return nil
}
func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
for i := 0; i < bufferNumber; i++ {
b := newBuffer(
int(uint64(p.Latency) * uint64(p.SampleRate) / uint64(time.Second)),
)
// Map the buffer by its data head address to restore access to the Go struct
// in callback function. Don't resize the buffer after it.
m.buf[uintptr(unsafe.Pointer(&b.waveHdr))] = b
}
waveFmt := &waveFormatEx{
wFormatTag: WAVE_FORMAT_PCM,
nChannels: uint16(p.ChannelCount),
nSamplesPerSec: uint32(p.SampleRate),
nAvgBytesPerSec: uint32(p.SampleRate * p.ChannelCount * 2),
nBlockAlign: uint16(p.ChannelCount * 2),
wBitsPerSample: 16,
}
ret, _, _ := waveInOpen.Call(
uintptr(unsafe.Pointer(&m.hWaveIn)),
WAVE_MAPPER,
uintptr(unsafe.Pointer(waveFmt)),
windows.NewCallback(m.cbWaveIn),
0,
CALLBACK_FUNCTION,
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
for _, buf := range m.buf {
// Attach buffers to waveIn API.
ret, _, _ := waveInPrepareHeader.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
}
for _, buf := range m.buf {
// Enqueue buffers.
ret, _, _ := waveInAddBuffer.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
}
ret, _, _ = waveInStart.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
// TODO: detect microphone device disconnection and return EOF
reader := audio.ReaderFunc(func() (wave.Audio, error) {
b, ok := <-m.chBuf
if !ok {
return nil, io.EOF
}
select {
case <-m.closed:
default:
// Re-enqueue used buffer.
ret, _, _ := waveInAddBuffer.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&b.waveHdr)),
uintptr(unsafe.Sizeof(b.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount,
},
)
j := 0
for i := 0; i < a.Size.Len; i++ {
for ch := 0; ch < a.Size.Channels; ch++ {
a.SetInt16(i, ch, wave.Int16Sample(b.data[j]))
j++
}
}
return a, nil
})
return reader, nil
}
func (m *microphone) Properties() []prop.Media {
// TODO: Get actual properties
monoProp := prop.Media{
Audio: prop.Audio{
SampleRate: 48000,
Latency: time.Millisecond * 20,
ChannelCount: 1,
},
}
stereoProp := monoProp
stereoProp.ChannelCount = 2
return []prop.Media{monoProp, stereoProp}
}
// Windows API structures
type waveFormatEx struct {
wFormatTag uint16
nChannels uint16
nSamplesPerSec uint32
nAvgBytesPerSec uint32
nBlockAlign uint16
wBitsPerSample uint16
cbSize uint16
}
type waveHdr struct {
lpData uintptr
dwBufferLength uint32
dwBytesRecorded uint32
dwUser *uint32
dwFlags uint32
dwLoops uint32
lpNext *waveHdr
reserved *uint32
}
// Windows consts
const (
MMSYSERR_NOERROR = 0
MMSYSERR_ERROR = 1
MMSYSERR_BADDEVICEID = 2
MMSYSERR_NOTENABLED = 3
MMSYSERR_ALLOCATED = 4
MMSYSERR_INVALHANDLE = 5
MMSYSERR_NODRIVER = 6
MMSYSERR_NOMEM = 7
MMSYSERR_NOTSUPPORTED = 8
MMSYSERR_BADERRNUM = 9
MMSYSERR_INVALFLAG = 10
MMSYSERR_INVALPARAM = 11
MMSYSERR_HANDLEBUSY = 12
MMSYSERR_INVALIDALIAS = 13
MMSYSERR_BADDB = 14
MMSYSERR_KEYNOTFOUND = 15
MMSYSERR_READERROR = 16
MMSYSERR_WRITEERROR = 17
MMSYSERR_DELETEERROR = 18
MMSYSERR_VALNOTFOUND = 19
MMSYSERR_NODRIVERCB = 20
WAVERR_BADFORMAT = 32
WAVERR_STILLPLAYING = 33
WAVERR_UNPREPARED = 34
WAVERR_SYNC = 35
WAVE_MAPPER = 0xFFFF
WAVE_FORMAT_PCM = 1
CALLBACK_NULL = 0
CALLBACK_WINDOW = 0x10000
CALLBACK_TASK = 0x20000
CALLBACK_FUNCTION = 0x30000
CALLBACK_THREAD = CALLBACK_TASK
CALLBACK_EVENT = 0x50000
MM_WIM_OPEN = 0x3BE
MM_WIM_CLOSE = 0x3BF
MM_WIM_DATA = 0x3C0
)
var errWinmm = map[uintptr]error{
MMSYSERR_NOERROR: nil,
MMSYSERR_ERROR: errors.New("error"),
MMSYSERR_BADDEVICEID: errors.New("bad device id"),
MMSYSERR_NOTENABLED: errors.New("not enabled"),
MMSYSERR_ALLOCATED: errors.New("already allocated"),
MMSYSERR_INVALHANDLE: errors.New("invalid handler"),
MMSYSERR_NODRIVER: errors.New("no driver"),
MMSYSERR_NOMEM: errors.New("no memory"),
MMSYSERR_NOTSUPPORTED: errors.New("not supported"),
MMSYSERR_BADERRNUM: errors.New("band error number"),
MMSYSERR_INVALFLAG: errors.New("invalid flag"),
MMSYSERR_INVALPARAM: errors.New("invalid param"),
MMSYSERR_HANDLEBUSY: errors.New("handle busy"),
MMSYSERR_INVALIDALIAS: errors.New("invalid alias"),
MMSYSERR_BADDB: errors.New("bad db"),
MMSYSERR_KEYNOTFOUND: errors.New("key not found"),
MMSYSERR_READERROR: errors.New("read error"),
MMSYSERR_WRITEERROR: errors.New("write error"),
MMSYSERR_DELETEERROR: errors.New("delete error"),
MMSYSERR_VALNOTFOUND: errors.New("value not found"),
MMSYSERR_NODRIVERCB: errors.New("no driver cb"),
WAVERR_BADFORMAT: errors.New("bad format"),
WAVERR_STILLPLAYING: errors.New("still playing"),
WAVERR_UNPREPARED: errors.New("unprepared"),
WAVERR_SYNC: errors.New("sync"),
}

View File

@@ -68,9 +68,9 @@ func (s *screen) VideoRecord(p prop.Media) (video.Reader, error) {
var dst image.RGBA
reader := s.reader
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
<-s.tick.C
return reader.Read().ToRGBA(&dst), nil
return reader.Read().ToRGBA(&dst), func() {}, nil
})
return r, nil
}

View File

@@ -103,10 +103,10 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
d.tick = tick
closed := d.closed
r := video.ReaderFunc(func() (image.Image, error) {
r := video.ReaderFunc(func() (image.Image, func(), error) {
select {
case <-closed:
return nil, io.EOF
return nil, func() {}, io.EOF
default:
}
@@ -130,7 +130,7 @@ func (d *dummy) VideoRecord(p prop.Media) (video.Reader, error) {
CStride: p.Width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, p.Width, p.Height),
}, nil
}, func() {}, nil
})
return r, nil

View File

@@ -6,6 +6,7 @@ import (
"image/jpeg"
)
func decodeMJPEG(frame []byte, width, height int) (image.Image, error) {
return jpeg.Decode(bytes.NewReader(frame))
func decodeMJPEG(frame []byte, width, height int) (image.Image, func(), error) {
img, err := jpeg.Decode(bytes.NewReader(frame))
return img, func() {}, err
}

View File

@@ -3,12 +3,12 @@ package frame
import "image"
type Decoder interface {
Decode(frame []byte, width, height int) (image.Image, error)
Decode(frame []byte, width, height int) (image.Image, func(), error)
}
// DecoderFunc is a proxy type for Decoder
type decoderFunc func(frame []byte, width, height int) (image.Image, error)
type decoderFunc func(frame []byte, width, height int) (image.Image, func(), error)
func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, error) {
func (f decoderFunc) Decode(frame []byte, width, height int) (image.Image, func(), error) {
return f(frame, width, height)
}

View File

@@ -5,13 +5,13 @@ import (
"image"
)
func decodeI420(frame []byte, width, height int) (image.Image, error) {
func decodeI420(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
cbi := yi + width*height/4
cri := cbi + width*height/4
if cri > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
}
return &image.YCbCr{
@@ -22,15 +22,15 @@ func decodeI420(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeNV21(frame []byte, width, height int) (image.Image, error) {
func decodeNV21(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi + width*height/2
if ci > len(frame) {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
}
var cb, cr []byte
@@ -47,5 +47,5 @@ func decodeNV21(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
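As a quick sanity check of the plane offsets computed above (the 4x4 frame size is an illustrative value, not part of this change): a 4x4 I420 frame carries 16 luma bytes followed by 4 bytes for each chroma plane, so the minimum valid frame length is 24 bytes.

package example

import "fmt"

func main() {
	width, height := 4, 4
	yi := width * height        // end of the Y plane
	cbi := yi + width*height/4  // end of the Cb plane
	cri := cbi + width*height/4 // minimum valid frame length
	fmt.Println(yi, cbi, cri)   // 16 20 24
}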

View File

@@ -12,13 +12,13 @@ import (
// void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height);
import "C"
func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -41,16 +41,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -73,5 +73,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}

View File

@@ -7,13 +7,13 @@ import (
"image"
)
func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -39,16 +39,16 @@ func decodeYUY2(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
@@ -74,5 +74,5 @@ func decodeUYVY(frame []byte, width, height int) (image.Image, error) {
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, nil
}, func() {}, nil
}

View File

@@ -27,7 +27,7 @@ func TestDecodeYUY2(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, err := decodeYUY2(input, width, height)
img, _, err := decodeYUY2(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -56,7 +56,7 @@ func TestDecodeUYVY(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, err := decodeUYVY(input, width, height)
img, _, err := decodeUYVY(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -77,7 +77,7 @@ func BenchmarkDecodeYUY2(b *testing.B) {
b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) {
input := make([]byte, sz.width*sz.height*2)
for i := 0; i < b.N; i++ {
_, err := decodeYUY2(input, sz.width, sz.height)
_, _, err := decodeYUY2(input, sz.width, sz.height)
if err != nil {
b.Fatal(err)
}

View File

@@ -5,13 +5,22 @@ import (
)
type Reader interface {
Read() (wave.Audio, error)
// Read reads data from the source. The caller is responsible for releasing the memory associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release,
// as data will be nil (no memory was handed out). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release; this is only a memory-management
// optimization. If release is not called, the source is forced to allocate new memory, which means
// there will be new allocations during streaming and old, unused memory becomes garbage. That garbage
// puts pressure on the garbage collector, making it run more often and finish more slowly as heap
// usage and the amount of garbage grow.
Read() (chunk wave.Audio, release func(), err error)
}
type ReaderFunc func() (wave.Audio, error)
type ReaderFunc func() (chunk wave.Audio, release func(), err error)
func (rf ReaderFunc) Read() (wave.Audio, error) {
return rf()
func (rf ReaderFunc) Read() (chunk wave.Audio, release func(), err error) {
chunk, release, err = rf()
return
}
// TransformFunc produces a new Reader that will produce transformed audio
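A minimal consumer sketch of the contract above (drain and its wiring are illustrative, not part of this change): read chunks in a loop, skip release on error, and call release once a chunk is no longer needed so the source can recycle the buffer.

package example

import (
	"io"
	"log"

	"github.com/pion/mediadevices/pkg/io/audio"
)

func drain(src audio.Reader) error {
	for {
		chunk, release, err := src.Read()
		if err != nil {
			// No memory was handed out on error, so release must not be called.
			if err == io.EOF {
				return nil
			}
			return err
		}
		log.Println("read", chunk.ChunkInfo().Len, "samples")
		// Hand the buffer back so the source can reuse it for the next Read.
		release()
	}
}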

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core
}
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyChunk bool) Reader {
}
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (wave.Audio, error) {
data, err := reader.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
data, _, err := reader.Read()
chunk, _ := data.(wave.Audio)
return chunk, err
return chunk, func() {}, err
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}))
}
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (wave.Audio, error) {
data, err := source.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
data, _, err := source.Read()
img, _ := data.(wave.Audio)
return img, err
return img, func() {}, err
})
}

View File

@@ -14,18 +14,18 @@ func TestBroadcast(t *testing.T) {
SamplingRate: 48000,
})
source := ReaderFunc(func() (wave.Audio, error) {
return chunk, nil
source := ReaderFunc(func() (wave.Audio, func(), error) {
return chunk, func() {}, nil
})
broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read()
actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil {
t.Fatal(err)
}
actualWithoutCopy2, err := readerWithoutCopy2.Read()
actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil {
t.Fatal(err)
}
@@ -39,7 +39,7 @@ func TestBroadcast(t *testing.T) {
}
readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read()
actualWithCopy, _, err := readerWithCopy.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -13,15 +13,15 @@ func NewBuffer(nSamples int) TransformFunc {
var inBuff wave.Audio
return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
for {
if inBuff != nil && inBuff.ChunkInfo().Len >= nSamples {
break
}
buff, err := r.Read()
buff, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
switch b := buff.(type) {
case *wave.Float32Interleaved:
@@ -59,7 +59,7 @@ func NewBuffer(nSamples int) TransformFunc {
ib.Size.Len += b.Size.Len
default:
return nil, errUnsupported
return nil, func() {}, errUnsupported
}
}
switch ib := inBuff.(type) {
@@ -71,7 +71,7 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples
return &ibCopy, nil
return &ibCopy, func() {}, nil
case *wave.Float32Interleaved:
ibCopy := *ib
@@ -81,9 +81,9 @@ func NewBuffer(nSamples int) TransformFunc {
copy(ibCopy.Data, ib.Data)
ib.Data = ib.Data[n:]
ib.Size.Len -= nSamples
return &ibCopy, nil
return &ibCopy, func() {}, nil
}
return nil, errUnsupported
return nil, func() {}, errUnsupported
})
}
}

View File

@@ -49,16 +49,16 @@ func TestBuffer(t *testing.T) {
trans := NewBuffer(3)
var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) {
r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) {
iSent++
return input[iSent-1], nil
return input[iSent-1], func() {}, nil
}
return nil, io.EOF
return nil, func() {}, io.EOF
}))
for i := 0; ; i++ {
a, err := r.Read()
a, _, err := r.Read()
if err != nil {
if err == io.EOF && i >= len(expected) {
break

View File

@@ -13,12 +13,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
return func(r Reader) Reader {
var currentProp prop.Media
var chunkCount uint
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
var dirty bool
chunk, err := r.Read()
chunk, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
info := chunk.ChunkInfo()
@@ -32,6 +32,15 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
dirty = true
}
var latency time.Duration
if currentProp.SampleRate != 0 {
latency = time.Duration(chunk.ChunkInfo().Len) * time.Second / time.Nanosecond / time.Duration(currentProp.SampleRate)
}
if currentProp.Latency != latency {
currentProp.Latency = latency
dirty = true
}
// TODO: Also detect sample format changes?
// TODO: Add audio detect changes. As of now, there's no useful property to track.
@@ -40,7 +49,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
}
chunkCount++
return chunk, nil
return chunk, func() {}, nil
})
}
}
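A quick check of the latency arithmetic added above (the figures mirror the updated test below): a 960-sample chunk at 48000 Hz corresponds to 20 ms.

package example

import (
	"fmt"
	"time"
)

func main() {
	chunkLen, sampleRate := 960, 48000
	latency := time.Duration(chunkLen) * time.Second / time.Duration(sampleRate)
	fmt.Println(latency) // 20ms
}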

View File

@@ -11,12 +11,12 @@ import (
func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (wave.Audio, error) {
return ReaderFunc(func() (wave.Audio, func(), error) {
return wave.NewFloat32Interleaved(wave.ChunkInfo{
Len: 0,
Len: 960,
Channels: p.ChannelCount,
SamplingRate: p.SampleRate,
}), nil
}), func() {}, nil
}), func(newProp prop.Media) {
p = newProp
}
@@ -28,13 +28,14 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media
expected.ChannelCount = 2
expected.SampleRate = 48000
expected.Latency = time.Millisecond * 20
src, _ := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
actual = p
detectBeforeFirstChunk = true
})(src)
_, err := src.Read()
_, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -53,24 +54,22 @@ func TestDetectChanges(t *testing.T) {
var actual prop.Media
expected.ChannelCount = 2
expected.SampleRate = 48000
expected.Latency = 20 * time.Millisecond
src, update := buildSource(expected)
src = DetectChanges(time.Second, func(p prop.Media) {
actual = p
})(src)
for channelCount := 1; channelCount < 8; channelCount++ {
for sampleRate := 12000; sampleRate <= 48000; sampleRate += 4000 {
expected.ChannelCount = channelCount
expected.SampleRate = sampleRate
update(expected)
_, err := src.Read()
if err != nil {
t.Fatal(err)
}
expected.ChannelCount = channelCount
update(expected)
_, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual)
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Received an unexpected prop\nExpected:\n%v\nActual:\n%v\n", expected, actual)
}
}
})

View File

@@ -8,14 +8,14 @@ import (
// NewChannelMixer creates audio transform to mix audio channels.
func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
return func(r Reader) Reader {
return ReaderFunc(func() (wave.Audio, error) {
buff, err := r.Read()
return ReaderFunc(func() (wave.Audio, func(), error) {
buff, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
ci := buff.ChunkInfo()
if ci.Channels == channels {
return buff, nil
return buff, func() {}, nil
}
ci.Channels = channels
@@ -32,9 +32,9 @@ func NewChannelMixer(channels int, mixer mixer.ChannelMixer) TransformFunc {
mixed = wave.NewFloat32NonInterleaved(ci)
}
if err := mixer.Mix(mixed, buff); err != nil {
return nil, err
return nil, func() {}, err
}
return mixed, nil
return mixed, func() {}, nil
})
}
}

View File

@@ -34,16 +34,16 @@ func TestMixer(t *testing.T) {
trans := NewChannelMixer(1, &mixer.MonoMixer{})
var iSent int
r := trans(ReaderFunc(func() (wave.Audio, error) {
r := trans(ReaderFunc(func() (wave.Audio, func(), error) {
if iSent < len(input) {
iSent++
return input[iSent-1], nil
return input[iSent-1], func() {}, nil
}
return nil, io.EOF
return nil, func() {}, io.EOF
}))
for i := 0; ; i++ {
a, err := r.Read()
a, _, err := r.Read()
if err != nil {
if err == io.EOF && i >= len(expected) {
break

View File

@@ -127,10 +127,10 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
func (broadcaster *Broadcaster) NewReader(copyFn func(interface{}) interface{}) Reader {
currentCount := broadcaster.buffer.lastCount()
return ReaderFunc(func() (data interface{}, err error) {
return ReaderFunc(func() (data interface{}, release func(), err error) {
currentCount++
if push := broadcaster.buffer.acquire(currentCount); push != nil {
data, err = broadcaster.source.Load().(Reader).Read()
data, _, err = broadcaster.source.Load().(Reader).Read()
push(&broadcasterData{
data: data,
err: err,

View File

@@ -57,7 +57,7 @@ func TestBroadcast(t *testing.T) {
frameCount := 0
frameSent := 0
lastSend := time.Now()
src = ReaderFunc(func() (interface{}, error) {
src = ReaderFunc(func() (interface{}, func(), error) {
if pauseCond.src && frameSent == 30 {
time.Sleep(time.Second)
}
@@ -74,7 +74,7 @@ func TestBroadcast(t *testing.T) {
frame := frames[frameCount]
frameCount++
frameSent++
return frame, nil
return frame, func() {}, nil
})
broadcaster := NewBroadcaster(src, nil)
var done uint32
@@ -95,7 +95,7 @@ func TestBroadcast(t *testing.T) {
if pauseCond.dst && count == 30 {
time.Sleep(time.Second)
}
frame, err := reader.Read()
frame, _, err := reader.Read()
if err != nil {
t.Error(err)
}

View File

@@ -3,12 +3,21 @@ package io
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
// to provide strong type.
type Reader interface {
Read() (interface{}, error)
// Read reads data from the source. The caller is responsible for releasing the memory associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release,
// as data will be nil (no memory was handed out). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release; this is only a memory-management
// optimization. If release is not called, the source is forced to allocate new memory, which means
// there will be new allocations during streaming and old, unused memory becomes garbage. That garbage
// puts pressure on the garbage collector, making it run more often and finish more slowly as heap
// usage and the amount of garbage grow.
Read() (data interface{}, release func(), err error)
}
// ReaderFunc is a proxy type for Reader
type ReaderFunc func() (interface{}, error)
type ReaderFunc func() (data interface{}, release func(), err error)
func (f ReaderFunc) Read() (interface{}, error) {
return f()
func (f ReaderFunc) Read() (data interface{}, release func(), err error) {
data, release, err = f()
return
}
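The release callback is what makes pooled buffers possible on the source side. A hypothetical sketch (pooledSource and its fixed 1024-byte buffers are illustrative, not an API of this package): each Read hands out a pooled buffer, and release returns it to the pool; callers that never call release simply cost extra allocations.

package example

import (
	"sync"

	mio "github.com/pion/mediadevices/pkg/io"
)

func pooledSource(fill func([]byte)) mio.Reader {
	pool := &sync.Pool{
		New: func() interface{} { return make([]byte, 1024) },
	}
	return mio.ReaderFunc(func() (interface{}, func(), error) {
		buf := pool.Get().([]byte)
		fill(buf) // produce the next piece of data into the reused buffer
		release := func() { pool.Put(buf) }
		return buf, release, nil
	})
}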

View File

@@ -27,7 +27,7 @@ func NewBroadcaster(source Reader, config *BroadcasterConfig) *Broadcaster {
coreConfig = config.Core
}
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, error) {
broadcaster := io.NewBroadcaster(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}), coreConfig)
@@ -51,16 +51,16 @@ func (broadcaster *Broadcaster) NewReader(copyFrame bool) Reader {
}
reader := broadcaster.ioBroadcaster.NewReader(copyFn)
return ReaderFunc(func() (image.Image, error) {
data, err := reader.Read()
return ReaderFunc(func() (image.Image, func(), error) {
data, _, err := reader.Read()
img, _ := data.(image.Image)
return img, err
return img, func() {}, err
})
}
// ReplaceSource replaces the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, error) {
return broadcaster.ioBroadcaster.ReplaceSource(io.ReaderFunc(func() (interface{}, func(), error) {
return source.Read()
}))
}
@@ -68,9 +68,9 @@ func (broadcaster *Broadcaster) ReplaceSource(source Reader) error {
// Source retrieves the underlying source. This operation is thread safe.
func (broadcaster *Broadcaster) Source() Reader {
source := broadcaster.ioBroadcaster.Source()
return ReaderFunc(func() (image.Image, error) {
data, err := source.Read()
return ReaderFunc(func() (image.Image, func(), error) {
data, _, err := source.Read()
img, _ := data.(image.Image)
return img, err
return img, func() {}, err
})
}

View File

@@ -9,18 +9,18 @@ import (
func TestBroadcast(t *testing.T) {
resolution := image.Rect(0, 0, 1920, 1080)
img := image.NewGray(resolution)
source := ReaderFunc(func() (image.Image, error) {
return img, nil
source := ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
})
broadcaster := NewBroadcaster(source, nil)
readerWithoutCopy1 := broadcaster.NewReader(false)
readerWithoutCopy2 := broadcaster.NewReader(false)
actualWithoutCopy1, err := readerWithoutCopy1.Read()
actualWithoutCopy1, _, err := readerWithoutCopy1.Read()
if err != nil {
t.Fatal(err)
}
actualWithoutCopy2, err := readerWithoutCopy2.Read()
actualWithoutCopy2, _, err := readerWithoutCopy2.Read()
if err != nil {
t.Fatal(err)
}
@@ -34,7 +34,7 @@ func TestBroadcast(t *testing.T) {
}
readerWithCopy := broadcaster.NewReader(true)
actualWithCopy, err := readerWithCopy.Read()
actualWithCopy, _, err := readerWithCopy.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -63,10 +63,10 @@ func imageToYCbCr(dst *image.YCbCr, src image.Image) {
// ToI420 converts r to a new reader that will output images in I420 format
func ToI420(r Reader) Reader {
var yuvImg image.YCbCr
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imageToYCbCr(&yuvImg, img)
@@ -79,11 +79,11 @@ func ToI420(r Reader) Reader {
i422ToI420(&yuvImg)
case image.YCbCrSubsampleRatio420:
default:
return nil, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio)
return nil, func() {}, fmt.Errorf("unsupported pixel format: %s", yuvImg.SubsampleRatio)
}
yuvImg.SubsampleRatio = image.YCbCrSubsampleRatio420
return &yuvImg, nil
return &yuvImg, func() {}, nil
})
}
@@ -130,13 +130,13 @@ func imageToRGBA(dst *image.RGBA, src image.Image) {
// ToRGBA converts r to a new reader that will output images in RGBA format
func ToRGBA(r Reader) Reader {
var dst image.RGBA
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
imageToRGBA(&dst, img)
return &dst, nil
return &dst, func() {}, nil
})
}

View File

@@ -144,10 +144,10 @@ func TestToI420(t *testing.T) {
for name, c := range cases {
c := c
t.Run(name, func(t *testing.T) {
r := ToI420(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -199,10 +199,10 @@ func TestToRGBA(t *testing.T) {
for name, c := range cases {
c := c
t.Run(name, func(t *testing.T) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -225,12 +225,12 @@ func BenchmarkToI420(b *testing.B) {
for name, img := range cases {
img := img
b.Run(name, func(b *testing.B) {
r := ToI420(ReaderFunc(func() (image.Image, error) {
return img, nil
r := ToI420(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}
@@ -253,12 +253,12 @@ func BenchmarkToRGBA(b *testing.B) {
for name, img := range cases {
img := img
b.Run(name, func(b *testing.B) {
r := ToRGBA(ReaderFunc(func() (image.Image, error) {
return img, nil
r := ToRGBA(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

View File

@@ -14,12 +14,12 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
var currentProp prop.Media
var lastTaken time.Time
var frames uint
return ReaderFunc(func() (image.Image, error) {
return ReaderFunc(func() (image.Image, func(), error) {
var dirty bool
img, err := r.Read()
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
bounds := img.Bounds()
@@ -52,7 +52,7 @@ func DetectChanges(interval time.Duration, onChange func(prop.Media)) TransformF
}
frames++
return img, nil
return img, func() {}, nil
})
}
}

View File

@@ -12,8 +12,8 @@ import (
func BenchmarkDetectChanges(b *testing.B) {
var src Reader
src = ReaderFunc(func() (image.Image, error) {
return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), nil
src = ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, 1920, 1080)), func() {}, nil
})
b.Run("WithoutDetectChanges", func(b *testing.B) {
@@ -40,8 +40,8 @@ func BenchmarkDetectChanges(b *testing.B) {
func TestDetectChanges(t *testing.T) {
buildSource := func(p prop.Media) (Reader, func(prop.Media)) {
return ReaderFunc(func() (image.Image, error) {
return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), nil
return ReaderFunc(func() (image.Image, func(), error) {
return image.NewRGBA(image.Rect(0, 0, p.Width, p.Height)), func() {}, nil
}), func(newProp prop.Media) {
p = newProp
}
@@ -86,7 +86,7 @@ func TestDetectChanges(t *testing.T) {
detectBeforeFirstFrame = true
})(src)
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -113,7 +113,7 @@ func TestDetectChanges(t *testing.T) {
expected.Width = width
expected.Height = height
update(expected)
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}
@@ -143,7 +143,7 @@ func TestDetectChanges(t *testing.T) {
})(src)
for count < 3 {
frame, err := src.Read()
frame, _, err := src.Read()
if err != nil {
t.Fatal(err)
}

View File

@@ -156,10 +156,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
}
}
return ReaderFunc(func() (image.Image, error) {
img, err := r.Read()
return ReaderFunc(func() (image.Image, func(), error) {
img, _, err := r.Read()
if err != nil {
return nil, err
return nil, func() {}, err
}
switch v := img.(type) {
@@ -169,7 +169,7 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, rect, v, v.Rect, draw.Src, nil)
cloned := *dst // clone metadata
return &cloned, nil
return &cloned, func() {}, nil
case *image.YCbCr:
ycbcrRealloc(v)
@@ -184,10 +184,10 @@ func Scale(width, height int, scaler Scaler) TransformFunc {
scalerCached.Scale(dst, dst.Bounds(), src, src.Bounds(), draw.Src, nil)
cloned := *(imgScaled.(*image.YCbCr)) // clone metadata
return &cloned, nil
return &cloned, func() {}, nil
default:
return nil, errUnsupportedImageType
return nil, func() {}, errUnsupportedImageType
}
})
}

View File

@@ -215,11 +215,11 @@ func TestScale(t *testing.T) {
c := c
t.Run(name, func(t *testing.T) {
trans := Scale(c.width, c.height, algo)
r := trans(ReaderFunc(func() (image.Image, error) {
return c.src, nil
r := trans(ReaderFunc(func() (image.Image, func(), error) {
return c.src, func() {}, nil
}))
for i := 0; i < 4; i++ {
out, err := r.Read()
out, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@@ -261,12 +261,12 @@ func BenchmarkScale(b *testing.B) {
img := img
b.Run(name, func(b *testing.B) {
trans := Scale(640, 360, algo)
r := trans(ReaderFunc(func() (image.Image, error) {
return img, nil
r := trans(ReaderFunc(func() (image.Image, func(), error) {
return img, func() {}, nil
}))
for i := 0; i < b.N; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
b.Fatalf("Unexpected error: %v", err)
}

View File

@@ -10,16 +10,16 @@ import (
func Throttle(rate float32) TransformFunc {
return func(r Reader) Reader {
ticker := time.NewTicker(time.Duration(int64(float64(time.Second) / float64(rate))))
return ReaderFunc(func() (image.Image, error) {
return ReaderFunc(func() (image.Image, func(), error) {
for {
img, err := r.Read()
img, _, err := r.Read()
if err != nil {
ticker.Stop()
return nil, err
return nil, func() {}, err
}
select {
case <-ticker.C:
return img, nil
return img, func() {}, nil
default:
}
}

View File

@@ -19,14 +19,14 @@ func TestThrottle(t *testing.T) {
var cntPush int
trans := Throttle(50)
r := trans(ReaderFunc(func() (image.Image, error) {
r := trans(ReaderFunc(func() (image.Image, func(), error) {
<-ticker.C
cntPush++
return img, nil
return img, func() {}, nil
}))
for i := 0; i < 20; i++ {
_, err := r.Read()
_, _, err := r.Read()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@@ -5,13 +5,22 @@ import (
)
type Reader interface {
Read() (img image.Image, err error)
// Read reads data from the source. The caller is responsible for releasing the memory associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release,
// as data will be nil (no memory was handed out). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release; this is only a memory-management
// optimization. If release is not called, the source is forced to allocate new memory, which means
// there will be new allocations during streaming and old, unused memory becomes garbage. That garbage
// puts pressure on the garbage collector, making it run more often and finish more slowly as heap
// usage and the amount of garbage grow.
Read() (img image.Image, release func(), err error)
}
type ReaderFunc func() (img image.Image, err error)
type ReaderFunc func() (img image.Image, release func(), err error)
func (rf ReaderFunc) Read() (img image.Image, err error) {
return rf()
func (rf ReaderFunc) Read() (img image.Image, release func(), err error) {
img, release, err = rf()
return
}
// TransformFunc produces a new Reader that will produce transformed video

View File

@@ -42,10 +42,17 @@ func prettifyStruct(i interface{}) string {
value := obj.Field(i)
padding := strings.Repeat(" ", level)
if value.Kind() == reflect.Struct {
switch value.Kind() {
case reflect.Struct:
rows = append(rows, fmt.Sprintf("%s%v:", padding, field.Name))
addRows(level+1, value)
} else {
case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
if value.IsNil() {
rows = append(rows, fmt.Sprintf("%s%v: any", padding, field.Name))
} else {
rows = append(rows, fmt.Sprintf("%s%v: %v", padding, field.Name, value))
}
default:
rows = append(rows, fmt.Sprintf("%s%v: %v", padding, field.Name, value))
}
}
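For context on the switch above: reflect's IsNil may only be called on chan, func, interface, map, pointer, and slice values, which is why the kind is checked before the nil test. A standalone illustration of the guarded pattern (describe is illustrative, not the library's helper):

package example

import (
	"fmt"
	"reflect"
)

func describe(v reflect.Value) string {
	switch v.Kind() {
	case reflect.Chan, reflect.Func, reflect.Interface, reflect.Map, reflect.Ptr, reflect.Slice:
		if v.IsNil() {
			return "any"
		}
	}
	return fmt.Sprintf("%v", v)
}

func main() {
	type config struct{ Ptr *int }
	v := reflect.ValueOf(config{})
	fmt.Println(describe(v.Field(0))) // any
}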

rtpreader.go (new file)
View File

@@ -0,0 +1,21 @@
package mediadevices
import "github.com/pion/rtp"
type RTPReadCloser interface {
Read() (pkts []*rtp.Packet, release func(), err error)
Close() error
}
type rtpReadCloserImpl struct {
readFn func() ([]*rtp.Packet, func(), error)
closeFn func() error
}
func (r *rtpReadCloserImpl) Read() ([]*rtp.Packet, func(), error) {
return r.readFn()
}
func (r *rtpReadCloserImpl) Close() error {
return r.closeFn()
}
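A hypothetical consumer sketch for the new RTP path (the codec name "VP8", the 1200-byte MTU, and forwardRTP itself are illustrative assumptions, not values mandated by the API): request an RTPReadCloser from a track, marshal each packet batch, and release it when done.

package example

import (
	"net"

	"github.com/pion/mediadevices"
)

func forwardRTP(track mediadevices.Track, conn net.Conn) error {
	rtpReader, err := track.NewRTPReader("VP8", 1200)
	if err != nil {
		return err
	}
	defer rtpReader.Close()

	for {
		pkts, release, err := rtpReader.Read()
		if err != nil {
			return err
		}
		for _, pkt := range pkts {
			buf, err := pkt.Marshal()
			if err != nil {
				release()
				return err
			}
			if _, err := conn.Write(buf); err != nil {
				release()
				return err
			}
		}
		release()
	}
}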

View File

@@ -3,33 +3,30 @@ package mediadevices
import (
"math"
"time"
"github.com/pion/webrtc/v2/pkg/media"
)
type samplerFunc func(b []byte) error
type samplerFunc func() uint32
// newVideoSampler creates a video sampler that uses the actual video frame rate and
// the codec's clock rate to come up with a duration for each sample.
func newVideoSampler(t LocalTrack) samplerFunc {
clockRate := float64(t.Codec().ClockRate)
func newVideoSampler(clockRate uint32) samplerFunc {
clockRateFloat := float64(clockRate)
lastTimestamp := time.Now()
return samplerFunc(func(b []byte) error {
return samplerFunc(func() uint32 {
now := time.Now()
duration := now.Sub(lastTimestamp).Seconds()
samples := uint32(math.Round(clockRate * duration))
samples := uint32(math.Round(clockRateFloat * duration))
lastTimestamp = now
return t.WriteSample(media.Sample{Data: b, Samples: samples})
return samples
})
}
// newAudioSampler creates an audio sampler that uses a fixed latency and
// the codec's clock rate to come up with a duration for each sample.
func newAudioSampler(t LocalTrack, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds()))
return samplerFunc(func(b []byte) error {
return t.WriteSample(media.Sample{Data: b, Samples: samples})
func newAudioSampler(clockRate uint32, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(clockRate) * latency.Seconds()))
return samplerFunc(func() uint32 {
return samples
})
}
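A quick check of the sample-count arithmetic above (the 48 kHz/20 ms audio and 90 kHz/30 fps video figures are illustrative): each audio chunk advances the RTP timestamp by 960 ticks, and each video frame by roughly 3000.

package example

import (
	"fmt"
	"math"
	"time"
)

func main() {
	audioSamples := uint32(math.Round(48000 * (20 * time.Millisecond).Seconds()))
	videoSamples := uint32(math.Round(90000 * (time.Second / 30).Seconds()))
	fmt.Println(audioSamples, videoSamples) // 960 3000
}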

source.go (new file)
View File

@@ -0,0 +1 @@
package mediadevices

track.go
View File

@@ -2,239 +2,403 @@ package mediadevices
import (
"errors"
"image"
"math/rand"
"sync"
"github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/wave"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
// Tracker is an interface that represent MediaStreamTrack
var (
errInvalidDriverType = errors.New("invalid driver type")
errNotFoundPeerConnection = errors.New("failed to find given peer connection")
)
// Source is a generic representation of a media source
type Source interface {
ID() string
Close() error
}
// VideoSource is a specific type of media source that emits a series of video frames
type VideoSource interface {
video.Reader
Source
}
// AudioSource is a specific type of media source that emits a series of audio chunks
type AudioSource interface {
audio.Reader
Source
}
// Track is an interface that represent MediaStreamTrack
// Reference: https://w3c.github.io/mediacapture-main/#mediastreamtrack
type Tracker interface {
Track() *webrtc.Track
LocalTrack() LocalTrack
Stop()
Kind() MediaDeviceType
type Track interface {
Source
// OnEnded registers a handler to receive an error from the media stream track.
// If the error has already occurred before registering, the handler will be
// called immediately.
OnEnded(func(error))
Kind() MediaDeviceType
// Bind binds the current track source to the given peer connection. In Pion/webrtc v3, the bind
// call will happen automatically after the SDP negotiation. Users won't need to call this manually.
Bind(*webrtc.PeerConnection) (*webrtc.Track, error)
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
// be called automatically in the future.
Unbind(*webrtc.PeerConnection) error
// NewRTPReader creates a new reader from the source. The reader will encode the source and packetize
// the encoded data in RTP format with the given MTU size.
NewRTPReader(codecName string, mtu int) (RTPReadCloser, error)
}
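// Illustrative sketch (assumes a configured *webrtc.PeerConnection named pc and a Track
// named track obtained from this package): Bind starts encoding and feeding samples, the
// returned *webrtc.Track is handed to Pion, and Unbind stops the feeding goroutine.
//
//   webrtcTrack, err := track.Bind(pc)
//   if err != nil {
//       // handle error
//   }
//   if _, err := pc.AddTrack(webrtcTrack); err != nil {
//       // handle error
//   }
//   // ... later, when the track is no longer needed on this connection ...
//   _ = track.Unbind(pc)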
type LocalTrack interface {
WriteSample(s media.Sample) error
Codec() *webrtc.RTPCodec
ID() string
Kind() webrtc.RTPCodecType
type baseTrack struct {
Source
err error
onErrorHandler func(error)
mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
selector *CodecSelector
activePeerConnections map[*webrtc.PeerConnection]chan<- chan<- struct{}
}
type track struct {
localTrack LocalTrack
d driver.Driver
sample samplerFunc
encoder codec.ReadCloser
onErrorHandler func(error)
err error
mu sync.Mutex
endOnce sync.Once
kind MediaDeviceType
}
func newTrack(opts *MediaDevicesOptions, d driver.Driver, constraints MediaTrackConstraints) (*track, error) {
var encoderBuilders []encoderBuilder
var rtpCodecs []*webrtc.RTPCodec
var buildSampler func(t LocalTrack) samplerFunc
var kind MediaDeviceType
var err error
err = d.Open()
if err != nil {
return nil, err
func newBaseTrack(source Source, kind MediaDeviceType, selector *CodecSelector) *baseTrack {
return &baseTrack{
Source: source,
kind: kind,
selector: selector,
activePeerConnections: make(map[*webrtc.PeerConnection]chan<- chan<- struct{}),
}
switch r := d.(type) {
case driver.VideoRecorder:
kind = VideoInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeVideo]
buildSampler = newVideoSampler
encoderBuilders, err = newVideoEncoderBuilders(r, constraints)
case driver.AudioRecorder:
kind = AudioInput
rtpCodecs = opts.codecs[webrtc.RTPCodecTypeAudio]
buildSampler = func(t LocalTrack) samplerFunc {
return newAudioSampler(t, constraints.selectedMedia.Latency)
}
encoderBuilders, err = newAudioEncoderBuilders(r, constraints)
default:
err = errors.New("newTrack: invalid driver type")
}
if err != nil {
d.Close()
return nil, err
}
for _, builder := range encoderBuilders {
var matchedRTPCodec *webrtc.RTPCodec
for _, rtpCodec := range rtpCodecs {
if rtpCodec.Name == builder.name {
matchedRTPCodec = rtpCodec
break
}
}
if matchedRTPCodec == nil {
continue
}
localTrack, err := opts.trackGenerator(
matchedRTPCodec.PayloadType,
rand.Uint32(),
d.ID(),
matchedRTPCodec.Type.String(),
matchedRTPCodec,
)
if err != nil {
continue
}
encoder, err := builder.build()
if err != nil {
continue
}
t := track{
localTrack: localTrack,
sample: buildSampler(localTrack),
d: d,
encoder: encoder,
kind: kind,
}
go t.start()
return &t, nil
}
d.Close()
return nil, errors.New("newTrack: failed to find a matching codec")
}
// Kind returns track's kind
func (t *track) Kind() MediaDeviceType {
return t.kind
func (track *baseTrack) Kind() MediaDeviceType {
return track.kind
}
// OnEnded sets an error handler. When a track has been created and started, if an
// error occurs, the handler will be called with that error.
func (t *track) OnEnded(handler func(error)) {
t.mu.Lock()
t.onErrorHandler = handler
err := t.err
t.mu.Unlock()
func (track *baseTrack) OnEnded(handler func(error)) {
track.mu.Lock()
track.onErrorHandler = handler
err := track.err
track.mu.Unlock()
if err != nil && handler != nil {
// Already errored.
t.endOnce.Do(func() {
track.endOnce.Do(func() {
handler(err)
})
}
}
// onError is a callback when an error occurs
func (t *track) onError(err error) {
t.mu.Lock()
t.err = err
handler := t.onErrorHandler
t.mu.Unlock()
func (track *baseTrack) onError(err error) {
track.mu.Lock()
track.err = err
handler := track.onErrorHandler
track.mu.Unlock()
if handler != nil {
t.endOnce.Do(func() {
track.endOnce.Do(func() {
handler(err)
})
}
}
// start starts the data flow from the driver all the way to the localTrack
func (t *track) start() {
for {
buff, err := t.encoder.Read()
func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sample samplerFunc) (*webrtc.Track, error) {
track.mu.Lock()
defer track.mu.Unlock()
webrtcTrack, err := pc.NewTrack(selectedCodec.PayloadType, rand.Uint32(), track.ID(), selectedCodec.MimeType)
if err != nil {
return nil, err
}
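// signalCh carries a per-unbind "done" channel: unbind sends a channel on signalCh,
// the feeding goroutine below picks it up, stops reading, and closes it so that
// unbind only returns once the goroutine has fully stopped.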
signalCh := make(chan chan<- struct{})
track.activePeerConnections[pc] = signalCh
go func() {
var doneCh chan<- struct{}
defer func() {
encodedReader.Close()
// Close signalCh so that a subsequent call to unbind won't block
close(signalCh)
if doneCh != nil {
close(doneCh)
}
}()
for {
select {
case doneCh = <-signalCh:
return
default:
}
buff, _, err := encodedReader.Read()
if err != nil {
track.onError(err)
return
}
sampleCount := sample()
err = webrtcTrack.WriteSample(media.Sample{
Data: buff,
Samples: sampleCount,
})
if err != nil {
track.onError(err)
return
}
}
}()
return webrtcTrack, nil
}
func (track *baseTrack) unbind(pc *webrtc.PeerConnection) error {
track.mu.Lock()
defer track.mu.Unlock()
ch, ok := track.activePeerConnections[pc]
if !ok {
return errNotFoundPeerConnection
}
doneCh := make(chan struct{})
ch <- doneCh
<-doneCh
delete(track.activePeerConnections, pc)
return nil
}
func newTrackFromDriver(d driver.Driver, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
if err := d.Open(); err != nil {
return nil, err
}
switch recorder := d.(type) {
case driver.VideoRecorder:
return newVideoTrackFromDriver(d, recorder, constraints, selector)
case driver.AudioRecorder:
return newAudioTrackFromDriver(d, recorder, constraints, selector)
default:
panic(errInvalidDriverType)
}
}
// VideoTrack is a specific track type that contains a video source, which allows multiple readers to access and manipulate it.
type VideoTrack struct {
*baseTrack
*video.Broadcaster
}
// NewVideoTrack constructs a new VideoTrack
func NewVideoTrack(source VideoSource, selector *CodecSelector) Track {
return newVideoTrackFromReader(source, source, selector)
}
func newVideoTrackFromReader(source Source, reader video.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, VideoInput, selector)
wrappedReader := video.ReaderFunc(func() (img image.Image, release func(), err error) {
img, _, err = reader.Read()
if err != nil {
t.onError(err)
return
base.onError(err)
}
return img, func() {}, err
})
if err := t.sample(buff); err != nil {
t.onError(err)
return
}
// TODO: Allow users to configure broadcaster
broadcaster := video.NewBroadcaster(wrappedReader, nil)
return &VideoTrack{
baseTrack: base,
Broadcaster: broadcaster,
}
}
// Stop stops the underlying driver and encoder
func (t *track) Stop() {
t.d.Close()
t.encoder.Close()
}
func (t *track) Track() *webrtc.Track {
return t.localTrack.(*webrtc.Track)
}
func (t *track) LocalTrack() LocalTrack {
return t.localTrack
}
// encoderBuilder is a generic encoder builder that acts as a delegator for codec.VideoEncoderBuilder and
// codec.AudioEncoderBuilder. The idea of having a delegator is to reduce redundant code that is
// duplicated for managing video and audio.
type encoderBuilder struct {
name string
build func() (codec.ReadCloser, error)
}
// newVideoEncoderBuilders transforms video given by VideoRecorder with the video transformer that is passed through
// constraints and creates a list of generic encoder builders
func newVideoEncoderBuilders(vr driver.VideoRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := vr.VideoRecord(constraints.selectedMedia)
// newVideoTrackFromDriver is an internal helper that creates a video track from a driver
func newVideoTrackFromDriver(d driver.Driver, recorder driver.VideoRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
reader, err := recorder.VideoRecord(constraints.selectedMedia)
if err != nil {
return nil, err
}
if constraints.VideoTransform != nil {
r = constraints.VideoTransform(r)
}
encoderBuilders := make([]encoderBuilder, len(constraints.VideoEncoderBuilders))
for i, b := range constraints.VideoEncoderBuilders {
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildVideoEncoder(r, constraints.selectedMedia)
}
}
return encoderBuilders, nil
return newVideoTrackFromReader(d, reader, selector), nil
}
// newAudioEncoderBuilders transforms audio given by AudioRecorder with the audio transformer that is passed through
// constraints and creates a list of generic encoder builders
func newAudioEncoderBuilders(ar driver.AudioRecorder, constraints MediaTrackConstraints) ([]encoderBuilder, error) {
r, err := ar.AudioRecord(constraints.selectedMedia)
// Transform transforms the underlying source by applying the given fns in serial order
func (track *VideoTrack) Transform(fns ...video.TransformFunc) {
src := track.Broadcaster.Source()
track.Broadcaster.ReplaceSource(video.Merge(fns...)(src))
}
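// Illustrative sketch: attaching a user-defined transform that post-processes every frame
// before it reaches any reader or encoder. mirror is a hypothetical helper that returns a
// flipped copy of the frame.
//
//   videoTrack.Transform(func(r video.Reader) video.Reader {
//       return video.ReaderFunc(func() (image.Image, func(), error) {
//           img, release, err := r.Read()
//           if err != nil {
//               return nil, func() {}, err
//           }
//           return mirror(img), release, nil
//       })
//   })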
func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
}
if constraints.AudioTransform != nil {
r = constraints.AudioTransform(r)
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo)
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(reader, inputProp, wantCodecs...)
if err != nil {
return nil, err
}
encoderBuilders := make([]encoderBuilder, len(constraints.AudioEncoderBuilders))
for i, b := range constraints.AudioEncoderBuilders {
encoderBuilders[i].name = b.RTPCodec().Name
encoderBuilders[i].build = func() (codec.ReadCloser, error) {
return b.BuildAudioEncoder(r, constraints.selectedMedia)
}
}
return encoderBuilders, nil
return track.bind(pc, encodedReader, selectedCodec, newVideoSampler(selectedCodec.ClockRate))
}
func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
sample := newVideoSampler(selectedCodec.ClockRate)
// FIXME: not sure of the best way to get a unique SSRC. We should probably have a global keeper that can generate a random ID and do the bookkeeping.
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
return &rtpReadCloserImpl{
readFn: func() ([]*rtp.Packet, func(), error) {
encoded, release, err := encodedReader.Read()
if err != nil {
encodedReader.Close()
track.onError(err)
return nil, func() {}, err
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}
// AudioTrack is a specific track type that contains an audio source, which allows multiple readers to access
// and manipulate it.
type AudioTrack struct {
*baseTrack
*audio.Broadcaster
}
// NewAudioTrack constructs a new AudioTrack
func NewAudioTrack(source AudioSource, selector *CodecSelector) Track {
return newAudioTrackFromReader(source, source, selector)
}
func newAudioTrackFromReader(source Source, reader audio.Reader, selector *CodecSelector) Track {
base := newBaseTrack(source, AudioInput, selector)
wrappedReader := audio.ReaderFunc(func() (chunk wave.Audio, release func(), err error) {
chunk, _, err = reader.Read()
if err != nil {
base.onError(err)
}
return chunk, func() {}, err
})
// TODO: Allow users to configure broadcaster
broadcaster := audio.NewBroadcaster(wrappedReader, nil)
return &AudioTrack{
baseTrack: base,
Broadcaster: broadcaster,
}
}
// newAudioTrackFromDriver is an internal helper that creates an audio track from a driver
func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, constraints MediaTrackConstraints, selector *CodecSelector) (Track, error) {
reader, err := recorder.AudioRecord(constraints.selectedMedia)
if err != nil {
return nil, err
}
// FIXME: The current audio detection and audio encoder can only work with a static latency. Since the latency from the driver
// can fluctuate, we need to stabilize it. Maybe there's a better way to do this?
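// e.g. with a 20 ms latency and a 48000 Hz sample rate, the buffer below holds
// 0.02 * 48000 = 960 samples per read (illustrative numbers).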
reader = audio.NewBuffer(int(constraints.selectedMedia.Latency.Seconds() * float64(constraints.selectedMedia.SampleRate)))(reader)
return newAudioTrackFromReader(d, reader, selector), nil
}
// Transform transforms the underlying source by applying the given fns in serial order
func (track *AudioTrack) Transform(fns ...audio.TransformFunc) {
src := track.Broadcaster.Source()
track.Broadcaster.ReplaceSource(audio.Merge(fns...)(src))
}
func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio)
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(reader, inputProp, wantCodecs...)
if err != nil {
return nil, err
}
return track.bind(pc, encodedReader, selectedCodec, newAudioSampler(selectedCodec.ClockRate, inputProp.Latency))
}
func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *AudioTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, selectedCodec, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
sample := newAudioSampler(selectedCodec.ClockRate, inputProp.Latency)
// FIXME: not sure of the best way to get a unique SSRC. We should probably have a global keeper that can generate a random ID and do the bookkeeping.
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
return &rtpReadCloserImpl{
readFn: func() ([]*rtp.Packet, func(), error) {
encoded, release, err := encodedReader.Read()
if err != nil {
encodedReader.Close()
track.onError(err)
return nil, func() {}, err
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}


@@ -10,7 +10,7 @@ func TestOnEnded(t *testing.T) {
errExpected := errors.New("an error")
t.Run("ErrorAfterRegister", func(t *testing.T) {
tr := &track{}
tr := &baseTrack{}
called := make(chan error, 1)
tr.OnEnded(func(error) {
@@ -35,7 +35,7 @@ func TestOnEnded(t *testing.T) {
})
t.Run("ErrorBeforeRegister", func(t *testing.T) {
tr := &track{}
tr := &baseTrack{}
tr.onError(errExpected)