Compare commits

..

1 Commits

Author SHA1 Message Date
Lukas Herman
7df20d004e Add pool optimization to frame decoders 2020-10-30 21:12:19 -07:00
28 changed files with 736 additions and 944 deletions

View File

@@ -58,16 +58,17 @@ func (selector *CodecSelector) Populate(setting *webrtc.MediaEngine) {
}
}
func (selector *CodecSelector) selectVideoCodecByNames(reader video.Reader, inputProp prop.Media, codecNames ...string) (codec.ReadCloser, *codec.RTPCodec, error) {
func (selector *CodecSelector) selectVideoCodec(wantCodecs []*webrtc.RTPCodec, reader video.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.VideoEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range codecNames {
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.videoEncoders {
if encoder.RTPCodec().Name == wantCodec {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildVideoEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
@@ -86,26 +87,17 @@ outer:
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectVideoCodec(reader video.Reader, inputProp prop.Media, codecs ...*webrtc.RTPCodec) (codec.ReadCloser, *codec.RTPCodec, error) {
var codecNames []string
for _, codec := range codecs {
codecNames = append(codecNames, codec.Name)
}
return selector.selectVideoCodecByNames(reader, inputProp, codecNames...)
}
func (selector *CodecSelector) selectAudioCodecByNames(reader audio.Reader, inputProp prop.Media, codecNames ...string) (codec.ReadCloser, *codec.RTPCodec, error) {
func (selector *CodecSelector) selectAudioCodec(wantCodecs []*webrtc.RTPCodec, reader audio.Reader, inputProp prop.Media) (codec.ReadCloser, *codec.RTPCodec, error) {
var selectedEncoder codec.AudioEncoderBuilder
var encodedReader codec.ReadCloser
var errReasons []string
var err error
outer:
for _, wantCodec := range codecNames {
for _, wantCodec := range wantCodecs {
name := wantCodec.Name
for _, encoder := range selector.audioEncoders {
if encoder.RTPCodec().Name == wantCodec {
if encoder.RTPCodec().Name == name {
encodedReader, err = encoder.BuildAudioEncoder(reader, inputProp)
if err == nil {
selectedEncoder = encoder
@@ -123,13 +115,3 @@ outer:
return encodedReader, selectedEncoder.RTPCodec(), nil
}
func (selector *CodecSelector) selectAudioCodec(reader audio.Reader, inputProp prop.Media, codecs ...*webrtc.RTPCodec) (codec.ReadCloser, *codec.RTPCodec, error) {
var codecNames []string
for _, codec := range codecs {
codecNames = append(codecNames, codec.Name)
}
return selector.selectAudioCodecByNames(reader, inputProp, codecNames...)
}

Binary file not shown.

View File

@@ -1,191 +0,0 @@
package main
import (
"fmt"
"image"
"io/ioutil"
"log"
"net"
"os"
pigo "github.com/esimov/pigo/core"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use h264 video encoder
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/prop"
)
const (
confidenceLevel = 9.5
mtu = 1000
thickness = 5
)
var (
cascade []byte
classifier *pigo.Pigo
)
func must(err error) {
if err != nil {
panic(err)
}
}
func detectFaces(frame *image.YCbCr) []pigo.Detection {
bounds := frame.Bounds()
cascadeParams := pigo.CascadeParams{
MinSize: 100,
MaxSize: 600,
ShiftFactor: 0.15,
ScaleFactor: 1.1,
ImageParams: pigo.ImageParams{
Pixels: frame.Y, // Y in YCbCr should be enough to detect faces
Rows: bounds.Dy(),
Cols: bounds.Dx(),
Dim: bounds.Dx(),
},
}
// Run the classifier over the obtained leaf nodes and return the detection results.
// The result contains quadruplets representing the row, column, scale and detection score.
dets := classifier.RunCascade(cascadeParams, 0.0)
// Calculate the intersection over union (IoU) of two clusters.
dets = classifier.ClusterDetections(dets, 0)
return dets
}
func drawRect(frame *image.YCbCr, x0, y0, size int) {
if x0 < 0 {
x0 = 0
}
if y0 < 0 {
y0 = 0
}
width := frame.Bounds().Dx()
height := frame.Bounds().Dy()
x1 := x0 + size
y1 := y0 + size
if x1 >= width {
x1 = width - 1
}
if y1 >= height {
y1 = height - 1
}
convert := func(x, y int) int {
return y*width + x
}
for x := x0; x < x1; x++ {
for t := 0; t < thickness; t++ {
frame.Y[convert(x, y0+t)] = 0
frame.Y[convert(x, y1-t)] = 0
}
}
for y := y0; y < y1; y++ {
for t := 0; t < thickness; t++ {
frame.Y[convert(x0+t, y)] = 0
frame.Y[convert(x1-t, y)] = 0
}
}
}
func detectFace(r video.Reader) video.Reader {
return video.ReaderFunc(func() (img image.Image, release func(), err error) {
img, release, err = r.Read()
if err != nil {
return
}
yuv := img.(*image.YCbCr)
dets := detectFaces(yuv)
for _, det := range dets {
if det.Q < confidenceLevel {
continue
}
drawRect(yuv, det.Col-det.Scale/2, det.Row-det.Scale/2, det.Scale)
}
return
})
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s host:port\n", os.Args[0])
return
}
dest := os.Args[1]
// prepare face detector
var err error
cascade, err = ioutil.ReadFile("facefinder")
if err != nil {
log.Fatalf("Error reading the cascade file: %s", err)
}
p := pigo.NewPigo()
// Unpack the binary file. This will return the number of cascade trees,
// the tree depth, the threshold and the prediction from tree's leaf nodes.
classifier, err = p.Unpack(cascade)
if err != nil {
log.Fatalf("Error unpacking the cascade file: %s", err)
}
vp8Params, err := vpx.NewVP8Params()
must(err)
vp8Params.BitRate = 1_000_000 // 100kbps
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
)
mediaStream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormatExact(frame.FormatUYVY)
c.Width = prop.Int(640)
c.Height = prop.Int(480)
},
Codec: codecSelector,
})
must(err)
// since we're trying to access the raw data, we need to cast Track to its real type, *mediadevices.VideoTrack
videoTrack := mediaStream.GetVideoTracks()[0].(*mediadevices.VideoTrack)
defer videoTrack.Close()
videoTrack.Transform(detectFace)
rtpReader, err := videoTrack.NewRTPReader(vp8Params.RTPCodec().Name, mtu)
must(err)
addr, err := net.ResolveUDPAddr("udp", dest)
must(err)
conn, err := net.DialUDP("udp", nil, addr)
must(err)
buff := make([]byte, mtu)
for {
pkts, release, err := rtpReader.Read()
must(err)
for _, pkt := range pkts {
n, err := pkt.MarshalTo(buff)
must(err)
_, err = conn.Write(buff[:n])
must(err)
}
release()
}
}

View File

@@ -1,30 +0,0 @@
## Instructions
### Download rtpexample
```
go get github.com/pion/mediadevices/examples/rtp
```
### Listen RTP
Install GStreamer and run:
```
gst-launch-1.0 udpsrc port=5000 caps=application/x-rtp,encode-name=VP8 \
! rtpvp8depay ! vp8dec ! videoconvert ! autovideosink
```
Or run VLC media plyer:
```
vlc ./vp8.sdp
```
### Run rtp
Run `rtp localhost:5000`
A video should start playing in your GStreamer or VLC window.
It's not WebRTC, but pure RTP.
Congrats, you have used pion-MediaDevices! Now start building something cool

View File

@@ -1,76 +0,0 @@
package main
import (
"fmt"
"net"
"os"
"github.com/pion/mediadevices"
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
_ "github.com/pion/mediadevices/pkg/driver/camera" // This is required to register camera adapter
"github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/prop"
)
const (
mtu = 1000
)
func must(err error) {
if err != nil {
panic(err)
}
}
func main() {
if len(os.Args) != 2 {
fmt.Printf("usage: %s host:port\n", os.Args[0])
return
}
dest := os.Args[1]
vp8Params, err := vpx.NewVP8Params()
must(err)
vp8Params.BitRate = 100000 // 100kbps
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&vp8Params),
)
mediaStream, err := mediadevices.GetUserMedia(mediadevices.MediaStreamConstraints{
Video: func(c *mediadevices.MediaTrackConstraints) {
c.FrameFormat = prop.FrameFormat(frame.FormatYUY2)
c.Width = prop.Int(640)
c.Height = prop.Int(480)
},
Codec: codecSelector,
})
must(err)
videoTrack := mediaStream.GetVideoTracks()[0]
defer videoTrack.Close()
rtpReader, err := videoTrack.NewRTPReader(vp8Params.RTPCodec().Name, mtu)
must(err)
addr, err := net.ResolveUDPAddr("udp", dest)
must(err)
conn, err := net.DialUDP("udp", nil, addr)
must(err)
buff := make([]byte, mtu)
for {
pkts, release, err := rtpReader.Read()
must(err)
for _, pkt := range pkts {
n, err := pkt.MarshalTo(buff)
must(err)
_, err = conn.Write(buff[:n])
must(err)
}
release()
}
}

View File

@@ -1,9 +0,0 @@
v=0
o=- 1234567890 1234567890 IN IP4 0.0.0.0
s=RTP-Send Example
i=Example
c=IN IP4 0.0.0.0
t=0 0
a=recvonly
m=video 5000 RTP/AVP 100
a=rtpmap:100 VP8/90000

View File

@@ -9,14 +9,14 @@ import (
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/webrtc/v2"
// If you don't like x264, you can also use vpx by importing as below
// "github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
// If you don't like vpx, you can also use x264 by importing as below
// "github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
// or you can also use openh264 for alternative h264 implementation
// "github.com/pion/mediadevices/pkg/codec/openh264"
// or if you use a raspberry pi like, you can use mmal for using its hardware encoder
// "github.com/pion/mediadevices/pkg/codec/mmal"
"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use opus audio encoder
"github.com/pion/mediadevices/pkg/codec/x264" // This is required to use h264 video encoder
"github.com/pion/mediadevices/pkg/codec/opus" // This is required to use VP8/VP9 video encoder
"github.com/pion/mediadevices/pkg/codec/vpx" // This is required to use VP8/VP9 video encoder
// Note: If you don't have a camera or microphone or your adapters are not supported,
// you can always swap your adapters with our dummy adapters below.
@@ -44,18 +44,18 @@ func main() {
signal.Decode(signal.MustReadStdin(), &offer)
// Create a new RTCPeerConnection
x264Params, err := x264.NewParams()
vp8Params, err := vpx.NewVP8Params()
if err != nil {
panic(err)
}
x264Params.BitRate = 500_000 // 500kbps
vp8Params.BitRate = 300_000 // 300kbps
opusParams, err := opus.NewParams()
if err != nil {
panic(err)
}
codecSelector := mediadevices.NewCodecSelector(
mediadevices.WithVideoEncoders(&x264Params),
mediadevices.WithVideoEncoders(&vp8Params),
mediadevices.WithAudioEncoders(&opusParams),
)

3
go.mod
View File

@@ -7,9 +7,8 @@ require (
github.com/jfreymuth/pulse v0.0.0-20201014123913-1e525c426c93
github.com/lherman-cs/opus v0.0.2
github.com/pion/logging v0.2.2
github.com/pion/rtp v1.6.0
github.com/pion/webrtc/v2 v2.2.26
github.com/satori/go.uuid v1.2.0
golang.org/x/image v0.0.0-20200927104501-e162460cd6b5
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f
)

4
go.sum
View File

@@ -109,8 +109,8 @@ golang.org/x/sys v0.0.0-20190228124157-a34e9553db1e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200724161237-0e2f3a69832c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418 h1:HlFl4V6pEMziuLXyRkm5BIYq1y1GAbb02pRlWvI54OM=
golang.org/x/sys v0.0.0-20201029080932-201ba4db2418/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=

View File

@@ -1,82 +0,0 @@
// +build e2e
package mediadevices
import (
"image"
"sync"
"testing"
"github.com/pion/mediadevices/pkg/codec/x264"
"github.com/pion/mediadevices/pkg/frame"
)
type mockVideoSource struct {
width, height int
pool sync.Pool
decoder frame.Decoder
}
func newMockVideoSource(width, height int) *mockVideoSource {
decoder, err := frame.NewDecoder(frame.FormatYUY2)
if err != nil {
panic(err)
}
return &mockVideoSource{
width: width,
height: height,
pool: sync.Pool{
New: func() interface{} {
resolution := width * height
return make([]byte, resolution*2)
},
},
decoder: decoder,
}
}
func (source *mockVideoSource) ID() string { return "" }
func (source *mockVideoSource) Close() error { return nil }
func (source *mockVideoSource) Read() (image.Image, func(), error) {
raw := source.pool.Get().([]byte)
decoded, release, err := source.decoder.Decode(raw, source.width, source.height)
source.pool.Put(raw)
if err != nil {
return nil, nil, err
}
return decoded, release, nil
}
func BenchmarkEndToEnd(b *testing.B) {
params, err := x264.NewParams()
if err != nil {
b.Fatal(err)
}
params.BitRate = 300_000
videoSource := newMockVideoSource(1920, 1080)
track := NewVideoTrack(videoSource, nil).(*VideoTrack)
defer track.Close()
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
b.Fatal(err)
}
encodedReader, err := params.BuildVideoEncoder(reader, inputProp)
if err != nil {
b.Fatal(err)
}
defer encodedReader.Close()
for i := 0; i < b.N; i++ {
_, release, err := encodedReader.Read()
if err != nil {
b.Fatal(err)
}
release()
}
}

View File

@@ -33,10 +33,6 @@ func (track *mockMediaStreamTrack) Unbind(pc *webrtc.PeerConnection) error {
return nil
}
func (track *mockMediaStreamTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
return nil, nil
}
func TestMediaStreamFilters(t *testing.T) {
audioTracks := []Track{
&mockMediaStreamTrack{AudioInput},

View File

@@ -47,7 +47,7 @@ Encoder *enc_new(x264_param_t param, char *preset, int *rc) {
e->param.b_repeat_headers = 1;
e->param.b_annexb = 1;
if (x264_param_apply_profile(&e->param, "high") < 0) {
if (x264_param_apply_profile(&e->param, "baseline") < 0) {
*rc = ERR_APPLY_PROFILE;
goto fail;
}
@@ -95,4 +95,4 @@ void enc_close(Encoder *e, int *rc) {
x264_encoder_close(e->h);
x264_picture_clean(&e->pic_in);
free(e);
}
}

View File

@@ -1,204 +1 @@
package microphone
import (
"encoding/binary"
"errors"
"fmt"
"io"
"time"
"unsafe"
"github.com/gen2brain/malgo"
"github.com/pion/mediadevices/internal/logging"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
const (
maxDeviceIDLength = 20
// TODO: should replace this with a more flexible approach
sampleRateStep = 1000
initialBufferSize = 1024
)
var logger = logging.NewLogger("mediadevices/driver/microphone")
var ctx *malgo.AllocatedContext
var hostEndian binary.ByteOrder
var (
errUnsupportedFormat = errors.New("the provided audio format is not supported")
)
type microphone struct {
malgo.DeviceInfo
chunkChan chan []byte
}
func init() {
var err error
/*
backends := []malgo.Backend{
malgo.BackendPulseaudio,
malgo.BackendAlsa,
}
*/
ctx, err = malgo.InitContext(nil, malgo.ContextConfig{}, func(message string) {
logger.Debugf("%v\n", message)
})
if err != nil {
panic(err)
}
devices, err := ctx.Devices(malgo.Capture)
if err != nil {
panic(err)
}
for _, device := range devices {
// TODO: Detect default device and prioritize it
driver.GetManager().Register(newMicrophone(device), driver.Info{
Label: device.ID.String(),
DeviceType: driver.Microphone,
})
}
// Decide which endian
switch v := *(*uint16)(unsafe.Pointer(&([]byte{0x12, 0x34}[0]))); v {
case 0x1234:
hostEndian = binary.BigEndian
case 0x3412:
hostEndian = binary.LittleEndian
default:
panic(fmt.Sprintf("failed to determine host endianness: %x", v))
}
}
func newMicrophone(info malgo.DeviceInfo) *microphone {
return &microphone{
DeviceInfo: info,
}
}
func (m *microphone) Open() error {
m.chunkChan = make(chan []byte, 1)
return nil
}
func (m *microphone) Close() error {
if m.chunkChan != nil {
close(m.chunkChan)
m.chunkChan = nil
}
return nil
}
func (m *microphone) AudioRecord(inputProp prop.Media) (audio.Reader, error) {
var config malgo.DeviceConfig
var callbacks malgo.DeviceCallbacks
decoder, err := wave.NewDecoder(&wave.RawFormat{
SampleSize: inputProp.SampleSize,
IsFloat: inputProp.IsFloat,
Interleaved: inputProp.IsInterleaved,
})
if err != nil {
return nil, err
}
config.DeviceType = malgo.Capture
config.PerformanceProfile = malgo.LowLatency
config.Capture.Channels = uint32(inputProp.ChannelCount)
config.SampleRate = uint32(inputProp.SampleRate)
if inputProp.SampleSize == 4 && inputProp.IsFloat {
config.Capture.Format = malgo.FormatF32
} else if inputProp.SampleSize == 2 && !inputProp.IsFloat {
config.Capture.Format = malgo.FormatS16
} else {
return nil, errUnsupportedFormat
}
onRecvChunk := func(_, chunk []byte, framecount uint32) {
m.chunkChan <- chunk
}
callbacks.Data = onRecvChunk
device, err := malgo.InitDevice(ctx.Context, config, callbacks)
if err != nil {
return nil, err
}
err = device.Start()
if err != nil {
return nil, err
}
return audio.ReaderFunc(func() (wave.Audio, func(), error) {
chunk, ok := <-m.chunkChan
if !ok {
device.Stop()
device.Uninit()
return nil, func() {}, io.EOF
}
decodedChunk, err := decoder.Decode(hostEndian, chunk, inputProp.ChannelCount)
// FIXME: the decoder should also fill this information
decodedChunk.(*wave.Float32Interleaved).Size.SamplingRate = inputProp.SampleRate
return decodedChunk, func() {}, err
}), nil
}
func (m *microphone) Properties() []prop.Media {
var supportedProps []prop.Media
logger.Debug("Querying properties")
var isBigEndian bool
// miniaudio only uses the host endian
if hostEndian == binary.BigEndian {
isBigEndian = true
}
for ch := m.MinChannels; ch <= m.MaxChannels; ch++ {
for sampleRate := m.MinSampleRate; sampleRate <= m.MaxSampleRate; sampleRate += sampleRateStep {
for i := 0; i < int(m.FormatCount); i++ {
format := m.Formats[i]
supportedProp := prop.Media{
Audio: prop.Audio{
ChannelCount: int(ch),
SampleRate: int(sampleRate),
IsBigEndian: isBigEndian,
// miniaudio only supports interleaved at the moment
IsInterleaved: true,
},
}
switch malgo.FormatType(format) {
case malgo.FormatF32:
supportedProp.SampleSize = 4
supportedProp.IsFloat = true
case malgo.FormatS16:
supportedProp.SampleSize = 2
supportedProp.IsFloat = false
}
supportedProps = append(supportedProps, supportedProp)
}
}
}
// FIXME: remove this hardcoded value. Malgo doesn't support "ma_context_get_device_info" API yet. The above iterations
// will always return nothing as of now
supportedProps = append(supportedProps, prop.Media{
Audio: prop.Audio{
Latency: time.Millisecond * 20,
ChannelCount: 1,
SampleRate: 48000,
SampleSize: 4,
IsFloat: true,
IsBigEndian: isBigEndian,
IsInterleaved: true,
},
})
return supportedProps
}

View File

@@ -0,0 +1,138 @@
package microphone
import (
"io"
"time"
"github.com/jfreymuth/pulse"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
type microphone struct {
c *pulse.Client
id string
samplesChan chan<- []int16
}
func init() {
pa, err := pulse.NewClient()
if err != nil {
// No pulseaudio
return
}
defer pa.Close()
sources, err := pa.ListSources()
if err != nil {
panic(err)
}
defaultSource, err := pa.DefaultSource()
if err != nil {
panic(err)
}
for _, source := range sources {
priority := driver.PriorityNormal
if defaultSource.ID() == source.ID() {
priority = driver.PriorityHigh
}
driver.GetManager().Register(&microphone{id: source.ID()}, driver.Info{
Label: source.ID(),
DeviceType: driver.Microphone,
Priority: priority,
})
}
}
func (m *microphone) Open() error {
var err error
m.c, err = pulse.NewClient()
if err != nil {
return err
}
return nil
}
func (m *microphone) Close() error {
if m.samplesChan != nil {
close(m.samplesChan)
m.samplesChan = nil
}
m.c.Close()
return nil
}
func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
var options []pulse.RecordOption
if p.ChannelCount == 1 {
options = append(options, pulse.RecordMono)
} else {
options = append(options, pulse.RecordStereo)
}
latency := p.Latency.Seconds()
src, err := m.c.SourceByID(m.id)
if err != nil {
return nil, err
}
options = append(options,
pulse.RecordSampleRate(p.SampleRate),
pulse.RecordLatency(latency),
pulse.RecordSource(src),
)
samplesChan := make(chan []int16, 1)
handler := func(b []int16) (int, error) {
samplesChan <- b
return len(b), nil
}
stream, err := m.c.NewRecord(pulse.Int16Writer(handler), options...)
if err != nil {
return nil, err
}
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
buff, ok := <-samplesChan
if !ok {
stream.Close()
return nil, func() {}, io.EOF
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: len(buff) / p.ChannelCount,
SamplingRate: p.SampleRate,
},
)
copy(a.Data, buff)
return a, func() {}, nil
})
stream.Start()
m.samplesChan = samplesChan
return reader, nil
}
func (m *microphone) Properties() []prop.Media {
// TODO: Get actual properties
monoProp := prop.Media{
Audio: prop.Audio{
SampleRate: 48000,
Latency: time.Millisecond * 20,
ChannelCount: 1,
},
}
stereoProp := monoProp
stereoProp.ChannelCount = 2
return []prop.Media{monoProp, stereoProp}
}

View File

@@ -0,0 +1,348 @@
package microphone
import (
"errors"
"golang.org/x/sys/windows"
"io"
"time"
"unsafe"
"github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/prop"
"github.com/pion/mediadevices/pkg/wave"
)
const (
// bufferNumber * prop.Audio.Latency is the maximum blockable duration
// to get data without dropping chunks.
bufferNumber = 5
)
// Windows APIs
var (
winmm = windows.NewLazySystemDLL("Winmm.dll")
waveInOpen = winmm.NewProc("waveInOpen")
waveInStart = winmm.NewProc("waveInStart")
waveInStop = winmm.NewProc("waveInStop")
waveInReset = winmm.NewProc("waveInReset")
waveInClose = winmm.NewProc("waveInClose")
waveInPrepareHeader = winmm.NewProc("waveInPrepareHeader")
waveInAddBuffer = winmm.NewProc("waveInAddBuffer")
waveInUnprepareHeader = winmm.NewProc("waveInUnprepareHeader")
)
type buffer struct {
waveHdr
data []int16
}
func newBuffer(samples int) *buffer {
b := make([]int16, samples)
return &buffer{
waveHdr: waveHdr{
// Sharing Go memory with Windows C API without reference.
// Make sure that the lifetime of the buffer struct is longer
// than the final access from cbWaveIn.
lpData: uintptr(unsafe.Pointer(&b[0])),
dwBufferLength: uint32(samples * 2),
},
data: b,
}
}
type microphone struct {
hWaveIn windows.Pointer
buf map[uintptr]*buffer
chBuf chan *buffer
closed chan struct{}
}
func init() {
// TODO: enum devices
driver.GetManager().Register(&microphone{}, driver.Info{
Label: "default",
DeviceType: driver.Microphone,
})
}
func (m *microphone) Open() error {
m.chBuf = make(chan *buffer)
m.buf = make(map[uintptr]*buffer)
m.closed = make(chan struct{})
return nil
}
func (m *microphone) cbWaveIn(hWaveIn windows.Pointer, uMsg uint, dwInstance, dwParam1, dwParam2 *int32) uintptr {
switch uMsg {
case MM_WIM_DATA:
b := m.buf[uintptr(unsafe.Pointer(dwParam1))]
m.chBuf <- b
case MM_WIM_OPEN:
case MM_WIM_CLOSE:
close(m.chBuf)
}
return 0
}
func (m *microphone) Close() error {
if m.hWaveIn == nil {
return nil
}
close(m.closed)
ret, _, _ := waveInStop.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
// All enqueued buffers are marked done by waveInReset.
ret, _, _ = waveInReset.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
for _, buf := range m.buf {
// Detach buffers from waveIn API.
ret, _, _ := waveInUnprepareHeader.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return err
}
}
// Now, it's ready to free the buffers.
// As microphone struct still has reference to the buffers,
// they will be GC-ed once microphone is reopened or unreferenced.
ret, _, _ = waveInClose.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return err
}
<-m.chBuf
m.hWaveIn = nil
return nil
}
func (m *microphone) AudioRecord(p prop.Media) (audio.Reader, error) {
for i := 0; i < bufferNumber; i++ {
b := newBuffer(
int(uint64(p.Latency) * uint64(p.SampleRate) / uint64(time.Second)),
)
// Map the buffer by its data head address to restore access to the Go struct
// in callback function. Don't resize the buffer after it.
m.buf[uintptr(unsafe.Pointer(&b.waveHdr))] = b
}
waveFmt := &waveFormatEx{
wFormatTag: WAVE_FORMAT_PCM,
nChannels: uint16(p.ChannelCount),
nSamplesPerSec: uint32(p.SampleRate),
nAvgBytesPerSec: uint32(p.SampleRate * p.ChannelCount * 2),
nBlockAlign: uint16(p.ChannelCount * 2),
wBitsPerSample: 16,
}
ret, _, _ := waveInOpen.Call(
uintptr(unsafe.Pointer(&m.hWaveIn)),
WAVE_MAPPER,
uintptr(unsafe.Pointer(waveFmt)),
windows.NewCallback(m.cbWaveIn),
0,
CALLBACK_FUNCTION,
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
for _, buf := range m.buf {
// Attach buffers to waveIn API.
ret, _, _ := waveInPrepareHeader.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
}
for _, buf := range m.buf {
// Enqueue buffers.
ret, _, _ := waveInAddBuffer.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&buf.waveHdr)),
uintptr(unsafe.Sizeof(buf.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
}
ret, _, _ = waveInStart.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
)
if err := errWinmm[ret]; err != nil {
return nil, err
}
// TODO: detect microphone device disconnection and return EOF
reader := audio.ReaderFunc(func() (wave.Audio, func(), error) {
b, ok := <-m.chBuf
if !ok {
return nil, func() {}, io.EOF
}
select {
case <-m.closed:
default:
// Re-enqueue used buffer.
ret, _, _ := waveInAddBuffer.Call(
uintptr(unsafe.Pointer(m.hWaveIn)),
uintptr(unsafe.Pointer(&b.waveHdr)),
uintptr(unsafe.Sizeof(b.waveHdr)),
)
if err := errWinmm[ret]; err != nil {
return nil, func() {}, err
}
}
a := wave.NewInt16Interleaved(
wave.ChunkInfo{
Channels: p.ChannelCount,
Len: (int(b.waveHdr.dwBytesRecorded) / 2) / p.ChannelCount,
SamplingRate: p.SampleRate,
},
)
j := 0
for i := 0; i < a.Size.Len; i++ {
for ch := 0; ch < a.Size.Channels; ch++ {
a.SetInt16(i, ch, wave.Int16Sample(b.data[j]))
j++
}
}
return a, func() {}, nil
})
return reader, nil
}
func (m *microphone) Properties() []prop.Media {
// TODO: Get actual properties
monoProp := prop.Media{
Audio: prop.Audio{
SampleRate: 48000,
Latency: time.Millisecond * 20,
ChannelCount: 1,
},
}
stereoProp := monoProp
stereoProp.ChannelCount = 2
return []prop.Media{monoProp, stereoProp}
}
// Windows API structures
type waveFormatEx struct {
wFormatTag uint16
nChannels uint16
nSamplesPerSec uint32
nAvgBytesPerSec uint32
nBlockAlign uint16
wBitsPerSample uint16
cbSize uint16
}
type waveHdr struct {
lpData uintptr
dwBufferLength uint32
dwBytesRecorded uint32
dwUser *uint32
dwFlags uint32
dwLoops uint32
lpNext *waveHdr
reserved *uint32
}
// Windows consts
const (
MMSYSERR_NOERROR = 0
MMSYSERR_ERROR = 1
MMSYSERR_BADDEVICEID = 2
MMSYSERR_NOTENABLED = 3
MMSYSERR_ALLOCATED = 4
MMSYSERR_INVALHANDLE = 5
MMSYSERR_NODRIVER = 6
MMSYSERR_NOMEM = 7
MMSYSERR_NOTSUPPORTED = 8
MMSYSERR_BADERRNUM = 9
MMSYSERR_INVALFLAG = 10
MMSYSERR_INVALPARAM = 11
MMSYSERR_HANDLEBUSY = 12
MMSYSERR_INVALIDALIAS = 13
MMSYSERR_BADDB = 14
MMSYSERR_KEYNOTFOUND = 15
MMSYSERR_READERROR = 16
MMSYSERR_WRITEERROR = 17
MMSYSERR_DELETEERROR = 18
MMSYSERR_VALNOTFOUND = 19
MMSYSERR_NODRIVERCB = 20
WAVERR_BADFORMAT = 32
WAVERR_STILLPLAYING = 33
WAVERR_UNPREPARED = 34
WAVERR_SYNC = 35
WAVE_MAPPER = 0xFFFF
WAVE_FORMAT_PCM = 1
CALLBACK_NULL = 0
CALLBACK_WINDOW = 0x10000
CALLBACK_TASK = 0x20000
CALLBACK_FUNCTION = 0x30000
CALLBACK_THREAD = CALLBACK_TASK
CALLBACK_EVENT = 0x50000
MM_WIM_OPEN = 0x3BE
MM_WIM_CLOSE = 0x3BF
MM_WIM_DATA = 0x3C0
)
var errWinmm = map[uintptr]error{
MMSYSERR_NOERROR: nil,
MMSYSERR_ERROR: errors.New("error"),
MMSYSERR_BADDEVICEID: errors.New("bad device id"),
MMSYSERR_NOTENABLED: errors.New("not enabled"),
MMSYSERR_ALLOCATED: errors.New("already allocated"),
MMSYSERR_INVALHANDLE: errors.New("invalid handler"),
MMSYSERR_NODRIVER: errors.New("no driver"),
MMSYSERR_NOMEM: errors.New("no memory"),
MMSYSERR_NOTSUPPORTED: errors.New("not supported"),
MMSYSERR_BADERRNUM: errors.New("band error number"),
MMSYSERR_INVALFLAG: errors.New("invalid flag"),
MMSYSERR_INVALPARAM: errors.New("invalid param"),
MMSYSERR_HANDLEBUSY: errors.New("handle busy"),
MMSYSERR_INVALIDALIAS: errors.New("invalid alias"),
MMSYSERR_BADDB: errors.New("bad db"),
MMSYSERR_KEYNOTFOUND: errors.New("key not found"),
MMSYSERR_READERROR: errors.New("read error"),
MMSYSERR_WRITEERROR: errors.New("write error"),
MMSYSERR_DELETEERROR: errors.New("delete error"),
MMSYSERR_VALNOTFOUND: errors.New("value not found"),
MMSYSERR_NODRIVERCB: errors.New("no driver cb"),
WAVERR_BADFORMAT: errors.New("bad format"),
WAVERR_STILLPLAYING: errors.New("still playing"),
WAVERR_UNPREPARED: errors.New("unprepared"),
WAVERR_SYNC: errors.New("sync"),
}

8
pkg/frame/buffer.go Normal file
View File

@@ -0,0 +1,8 @@
package frame
import "image"
type buffer struct {
image image.Image
raw []uint8
}

View File

@@ -6,7 +6,9 @@ import (
"image/jpeg"
)
func decodeMJPEG(frame []byte, width, height int) (image.Image, func(), error) {
img, err := jpeg.Decode(bytes.NewReader(frame))
return img, func() {}, err
func decodeMJPEG() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
img, err := jpeg.Decode(bytes.NewReader(frame))
return img, func() {}, err
}
}

View File

@@ -5,22 +5,22 @@ import (
)
func NewDecoder(f Format) (Decoder, error) {
var decoder decoderFunc
var buildDecoder func() decoderFunc
switch f {
case FormatI420:
decoder = decodeI420
buildDecoder = decodeI420
case FormatNV21:
decoder = decodeNV21
buildDecoder = decodeNV21
case FormatYUY2:
decoder = decodeYUY2
buildDecoder = decodeYUY2
case FormatUYVY:
decoder = decodeUYVY
buildDecoder = decodeUYVY
case FormatMJPEG:
decoder = decodeMJPEG
buildDecoder = decodeMJPEG
default:
return nil, fmt.Errorf("%s is not supported", f)
}
return decoder, nil
return buildDecoder(), nil
}

View File

@@ -5,47 +5,51 @@ import (
"image"
)
func decodeI420(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
cbi := yi + width*height/4
cri := cbi + width*height/4
func decodeI420() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
cbi := yi + width*height/4
cri := cbi + width*height/4
if cri > len(frame) {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
if cri > len(frame) {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), cri)
}
return &image.YCbCr{
Y: frame[:yi],
YStride: width,
Cb: frame[yi:cbi],
Cr: frame[cbi:cri],
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
return &image.YCbCr{
Y: frame[:yi],
YStride: width,
Cb: frame[yi:cbi],
Cr: frame[cbi:cri],
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
func decodeNV21(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi + width*height/2
func decodeNV21() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi + width*height/2
if ci > len(frame) {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
if ci > len(frame) {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), ci)
}
var cb, cr []byte
for i := yi; i < ci; i += 2 {
cb = append(cb, frame[i])
cr = append(cr, frame[i+1])
}
return &image.YCbCr{
Y: frame[:yi],
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
var cb, cr []byte
for i := yi; i < ci; i += 2 {
cb = append(cb, frame[i])
cr = append(cr, frame[i+1])
}
return &image.YCbCr{
Y: frame[:yi],
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio420,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}

View File

@@ -12,66 +12,70 @@ import (
// void decodeUYVYCGO(uint8_t* y, uint8_t* cb, uint8_t* cr, uint8_t* uyvy, int width, int height);
import "C"
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
func decodeYUY2() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
C.decodeYUY2CGO(
(*C.uchar)(&y[0]),
(*C.uchar)(&cb[0]),
(*C.uchar)(&cr[0]),
(*C.uchar)(&frame[0]),
C.int(width), C.int(height),
)
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
C.decodeYUY2CGO(
(*C.uchar)(&y[0]),
(*C.uchar)(&cb[0]),
(*C.uchar)(&cr[0]),
(*C.uchar)(&frame[0]),
C.int(width), C.int(height),
)
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
func decodeUYVY() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
C.decodeUYVYCGO(
(*C.uchar)(&y[0]),
(*C.uchar)(&cb[0]),
(*C.uchar)(&cr[0]),
(*C.uchar)(&frame[0]),
C.int(width), C.int(height),
)
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
C.decodeUYVYCGO(
(*C.uchar)(&y[0]),
(*C.uchar)(&cb[0]),
(*C.uchar)(&cr[0]),
(*C.uchar)(&frame[0]),
C.int(width), C.int(height),
)
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}

View File

@@ -5,74 +5,97 @@ package frame
import (
"fmt"
"image"
"sync"
)
func decodeYUY2(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
func decodeYUY2() decoderFunc {
pool := sync.Pool{
New: func() interface{} {
return &buffer{
image: &image.YCbCr{},
}
},
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
return func(frame []byte, width, height int) (image.Image, func(), error) {
buff := pool.Get().(*buffer)
fast := 0
slow := 0
for i := 0; i < fi; i += 4 {
y[fast] = frame[i]
cb[slow] = frame[i+1]
y[fast+1] = frame[i+2]
cr[slow] = frame[i+3]
fast += 2
slow++
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
pool.Put(buff)
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
if len(buff.raw) < fi {
need := fi - len(buff.raw)
buff.raw = append(buff.raw, make([]uint8, need)...)
}
y := buff.raw[:yi:yi]
cb := buff.raw[yi : yi+ci : yi+ci]
cr := buff.raw[yi+ci : fi : fi]
fast := 0
slow := 0
for i := 0; i < fi; i += 4 {
y[fast] = frame[i]
cb[slow] = frame[i+1]
y[fast+1] = frame[i+2]
cr[slow] = frame[i+3]
fast += 2
slow++
}
img := buff.image.(*image.YCbCr)
img.Y = y
img.YStride = width
img.Cb = cb
img.Cr = cr
img.CStride = width / 2
img.SubsampleRatio = image.YCbCrSubsampleRatio422
img.Rect.Min.X = 0
img.Rect.Min.Y = 0
img.Rect.Max.X = width
img.Rect.Max.Y = height
return img, func() { pool.Put(buff) }, nil
}
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
func decodeUYVY(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
func decodeUYVY() decoderFunc {
return func(frame []byte, width, height int) (image.Image, func(), error) {
yi := width * height
ci := yi / 2
fi := yi + 2*ci
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
if len(frame) != fi {
return nil, func() {}, fmt.Errorf("frame length (%d) less than expected (%d)", len(frame), fi)
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
fast := 0
slow := 0
for i := 0; i < fi; i += 4 {
cb[slow] = frame[i]
y[fast] = frame[i+1]
cr[slow] = frame[i+2]
y[fast+1] = frame[i+3]
fast += 2
slow++
}
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}
y := make([]byte, yi)
cb := make([]byte, ci)
cr := make([]byte, ci)
fast := 0
slow := 0
for i := 0; i < fi; i += 4 {
cb[slow] = frame[i]
y[fast] = frame[i+1]
cr[slow] = frame[i+2]
y[fast+1] = frame[i+3]
fast += 2
slow++
}
return &image.YCbCr{
Y: y,
YStride: width,
Cb: cb,
Cr: cr,
CStride: width / 2,
SubsampleRatio: image.YCbCrSubsampleRatio422,
Rect: image.Rect(0, 0, width, height),
}, func() {}, nil
}

View File

@@ -27,7 +27,7 @@ func TestDecodeYUY2(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, _, err := decodeYUY2(input, width, height)
img, _, err := decodeYUY2()(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -56,7 +56,7 @@ func TestDecodeUYVY(t *testing.T) {
Rect: image.Rect(0, 0, width, height),
}
img, _, err := decodeUYVY(input, width, height)
img, _, err := decodeUYVY()(input, width, height)
if err != nil {
t.Fatal(err)
}
@@ -76,11 +76,13 @@ func BenchmarkDecodeYUY2(b *testing.B) {
sz := sz
b.Run(fmt.Sprintf("%dx%d", sz.width, sz.height), func(b *testing.B) {
input := make([]byte, sz.width*sz.height*2)
decode := decodeYUY2()
for i := 0; i < b.N; i++ {
_, _, err := decodeYUY2(input, sz.width, sz.height)
_, release, err := decode(input, sz.width, sz.height)
if err != nil {
b.Fatal(err)
}
release()
}
})
}

View File

@@ -5,14 +5,6 @@ import (
)
type Reader interface {
// Read reads data from the source. The caller is responsible to release the memory that's associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release
// as data is going to be nil (no memory was given). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release, as this is only a part of memory management
// optimization. If release is not called, the source is forced to allocate a new memory, which also means
// there will be new allocations during streaming, and old unused memory will become garbage. As a consequence,
// these garbage will put a lot of pressure to the garbage collector and makes it to run more often and finish
// slower as the heap memory usage increases and more garbage to collect.
Read() (chunk wave.Audio, release func(), err error)
}

View File

@@ -3,14 +3,6 @@ package io
// Reader is a generic data reader. In the future, interface{} should be replaced by a generic type
// to provide strong type.
type Reader interface {
// Read reads data from the source. The caller is responsible to release the memory that's associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release
// as data is going to be nil (no memory was given). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release, as this is only a part of memory management
// optimization. If release is not called, the source is forced to allocate a new memory, which also means
// there will be new allocations during streaming, and old unused memory will become garbage. As a consequence,
// these garbage will put a lot of pressure to the garbage collector and makes it to run more often and finish
// slower as the heap memory usage increases and more garbage to collect.
Read() (data interface{}, release func(), err error)
}

View File

@@ -5,14 +5,6 @@ import (
)
type Reader interface {
// Read reads data from the source. The caller is responsible to release the memory that's associated
// with data by calling the given release function. When err is not nil, the caller MUST NOT call release
// as data is going to be nil (no memory was given). Otherwise, the caller SHOULD call release after
// using the data. The caller is NOT REQUIRED to call release, as this is only a part of memory management
// optimization. If release is not called, the source is forced to allocate a new memory, which also means
// there will be new allocations during streaming, and old unused memory will become garbage. As a consequence,
// these garbage will put a lot of pressure to the garbage collector and makes it to run more often and finish
// slower as the heap memory usage increases and more garbage to collect.
Read() (img image.Image, release func(), err error)
}

View File

@@ -1,21 +0,0 @@
package mediadevices
import "github.com/pion/rtp"
type RTPReadCloser interface {
Read() (pkts []*rtp.Packet, release func(), err error)
Close() error
}
type rtpReadCloserImpl struct {
readFn func() ([]*rtp.Packet, func(), error)
closeFn func() error
}
func (r *rtpReadCloserImpl) Read() ([]*rtp.Packet, func(), error) {
return r.readFn()
}
func (r *rtpReadCloserImpl) Close() error {
return r.closeFn()
}

View File

@@ -3,30 +3,34 @@ package mediadevices
import (
"math"
"time"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
type samplerFunc func() uint32
type samplerFunc func(b []byte) error
// newVideoSampler creates a video sampler that uses the actual video frame rate and
// the codec's clock rate to come up with a duration for each sample.
func newVideoSampler(clockRate uint32) samplerFunc {
clockRateFloat := float64(clockRate)
func newVideoSampler(t *webrtc.Track) samplerFunc {
clockRate := float64(t.Codec().ClockRate)
lastTimestamp := time.Now()
return samplerFunc(func() uint32 {
return samplerFunc(func(b []byte) error {
now := time.Now()
duration := now.Sub(lastTimestamp).Seconds()
samples := uint32(math.Round(clockRateFloat * duration))
samples := uint32(math.Round(clockRate * duration))
lastTimestamp = now
return samples
return t.WriteSample(media.Sample{Data: b, Samples: samples})
})
}
// newAudioSampler creates a audio sampler that uses a fixed latency and
// the codec's clock rate to come up with a duration for each sample.
func newAudioSampler(clockRate uint32, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(clockRate) * latency.Seconds()))
return samplerFunc(func() uint32 {
return samples
func newAudioSampler(t *webrtc.Track, latency time.Duration) samplerFunc {
samples := uint32(math.Round(float64(t.Codec().ClockRate) * latency.Seconds()))
return samplerFunc(func(b []byte) error {
return t.WriteSample(media.Sample{Data: b, Samples: samples})
})
}

View File

@@ -11,9 +11,7 @@ import (
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/mediadevices/pkg/wave"
"github.com/pion/rtp"
"github.com/pion/webrtc/v2"
"github.com/pion/webrtc/v2/pkg/media"
)
var (
@@ -54,9 +52,6 @@ type Track interface {
// Unbind is the clean up operation that should be called after Bind. Similar to Bind, unbind will
// be called automatically in the future.
Unbind(*webrtc.PeerConnection) error
// NewRTPReader creates a new reader from the source. The reader will encode the source, and packetize
// the encoded data in RTP format with given mtu size.
NewRTPReader(codecName string, mtu int) (RTPReadCloser, error)
}
type baseTrack struct {
@@ -114,7 +109,7 @@ func (track *baseTrack) onError(err error) {
}
}
func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sample samplerFunc) (*webrtc.Track, error) {
func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.ReadCloser, selectedCodec *codec.RTPCodec, sampler func(*webrtc.Track) samplerFunc) (*webrtc.Track, error) {
track.mu.Lock()
defer track.mu.Unlock()
@@ -123,6 +118,7 @@ func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.Read
return nil, err
}
sample := sampler(webrtcTrack)
signalCh := make(chan chan<- struct{})
track.activePeerConnections[pc] = signalCh
@@ -151,12 +147,7 @@ func (track *baseTrack) bind(pc *webrtc.PeerConnection, encodedReader codec.Read
return
}
sampleCount := sample()
err = webrtcTrack.WriteSample(media.Sample{
Data: buff,
Samples: sampleCount,
})
if err != nil {
if err := sample(buff); err != nil {
track.onError(err)
return
}
@@ -251,53 +242,18 @@ func (track *VideoTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error)
}
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeVideo)
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(reader, inputProp, wantCodecs...)
encodedReader, selectedCodec, err := track.selector.selectVideoCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
}
return track.bind(pc, encodedReader, selectedCodec, newVideoSampler(selectedCodec.ClockRate))
return track.bind(pc, encodedReader, selectedCodec, newVideoSampler)
}
func (track *VideoTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *VideoTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentVideoProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, selectedCodec, err := track.selector.selectVideoCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
sample := newVideoSampler(selectedCodec.ClockRate)
// FIXME: not sure the best way to get unique ssrc. We probably should have a global keeper that can generate a random ID and does book keeping?
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
return &rtpReadCloserImpl{
readFn: func() ([]*rtp.Packet, func(), error) {
encoded, release, err := encodedReader.Read()
if err != nil {
encodedReader.Close()
track.onError(err)
return nil, func() {}, err
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}
// AudioTrack is a specific track type that contains audio source which allows multiple readers to access, and
// manipulate.
type AudioTrack struct {
@@ -336,9 +292,6 @@ func newAudioTrackFromDriver(d driver.Driver, recorder driver.AudioRecorder, con
return nil, err
}
// FIXME: The current audio detection and audio encoder can only work with a static latency. Since the latency from the driver
// can fluctuate, we need to stabilize it. Maybe there's a better way for doing this?
reader = audio.NewBuffer(int(constraints.selectedMedia.Latency.Seconds() * float64(constraints.selectedMedia.SampleRate)))(reader)
return newAudioTrackFromReader(d, reader, selector), nil
}
@@ -356,49 +309,14 @@ func (track *AudioTrack) Bind(pc *webrtc.PeerConnection) (*webrtc.Track, error)
}
wantCodecs := pc.GetRegisteredRTPCodecs(webrtc.RTPCodecTypeAudio)
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(reader, inputProp, wantCodecs...)
encodedReader, selectedCodec, err := track.selector.selectAudioCodec(wantCodecs, reader, inputProp)
if err != nil {
return nil, err
}
return track.bind(pc, encodedReader, selectedCodec, newAudioSampler(selectedCodec.ClockRate, inputProp.Latency))
return track.bind(pc, encodedReader, selectedCodec, func(t *webrtc.Track) samplerFunc { return newAudioSampler(t, inputProp.Latency) })
}
func (track *AudioTrack) Unbind(pc *webrtc.PeerConnection) error {
return track.unbind(pc)
}
func (track *AudioTrack) NewRTPReader(codecName string, mtu int) (RTPReadCloser, error) {
reader := track.NewReader(false)
inputProp, err := detectCurrentAudioProp(track.Broadcaster)
if err != nil {
return nil, err
}
encodedReader, selectedCodec, err := track.selector.selectAudioCodecByNames(reader, inputProp, codecName)
if err != nil {
return nil, err
}
sample := newVideoSampler(selectedCodec.ClockRate)
// FIXME: not sure the best way to get unique ssrc. We probably should have a global keeper that can generate a random ID and does book keeping?
packetizer := rtp.NewPacketizer(mtu, selectedCodec.PayloadType, rand.Uint32(), selectedCodec.Payloader, rtp.NewRandomSequencer(), selectedCodec.ClockRate)
return &rtpReadCloserImpl{
readFn: func() ([]*rtp.Packet, func(), error) {
encoded, release, err := encodedReader.Read()
if err != nil {
encodedReader.Close()
track.onError(err)
return nil, func() {}, err
}
defer release()
samples := sample()
pkts := packetizer.Packetize(encoded, samples)
return pkts, release, err
},
closeFn: encodedReader.Close,
}, nil
}