Convert video APIs to use video.Reader

This commit is contained in:
Lukas Herman
2020-01-21 23:11:24 -08:00
parent 3671db8e62
commit e1365d8811
9 changed files with 135 additions and 96 deletions

View File

@@ -1,15 +1,11 @@
package codec package codec
import ( import (
"github.com/pion/mediadevices/pkg/io/audio"
"image"
"io" "io"
)
type VideoEncoder interface { "github.com/pion/mediadevices/pkg/io/audio"
Encode(img image.Image) ([]byte, error) "github.com/pion/mediadevices/pkg/io/video"
Close() error )
}
type VideoSetting struct { type VideoSetting struct {
Width, Height int Width, Height int
@@ -17,7 +13,7 @@ type VideoSetting struct {
FrameRate float32 FrameRate float32
} }
type VideoEncoderBuilder func(s VideoSetting) (VideoEncoder, error) type VideoEncoderBuilder func(r video.Reader, s VideoSetting) (io.ReadCloser, error)
type AudioSetting struct { type AudioSetting struct {
InSampleRate, OutSampleRate int InSampleRate, OutSampleRate int

View File

@@ -11,7 +11,10 @@ import "C"
import ( import (
"fmt" "fmt"
"io"
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/mediadevices/pkg/io/video"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
"image" "image"
"unsafe" "unsafe"
@@ -19,16 +22,17 @@ import (
type encoder struct { type encoder struct {
engine *C.Encoder engine *C.Encoder
r video.Reader
buff []byte
} }
var _ codec.VideoEncoder = &encoder{}
var _ codec.VideoEncoderBuilder = codec.VideoEncoderBuilder(NewEncoder) var _ codec.VideoEncoderBuilder = codec.VideoEncoderBuilder(NewEncoder)
func init() { func init() {
codec.Register(webrtc.H264, codec.VideoEncoderBuilder(NewEncoder)) codec.Register(webrtc.H264, codec.VideoEncoderBuilder(NewEncoder))
} }
func NewEncoder(s codec.VideoSetting) (codec.VideoEncoder, error) { func NewEncoder(r video.Reader, s codec.VideoSetting) (io.ReadCloser, error) {
cEncoder, err := C.enc_new(C.EncoderOptions{ cEncoder, err := C.enc_new(C.EncoderOptions{
width: C.int(s.Width), width: C.int(s.Width),
height: C.int(s.Height), height: C.int(s.Height),
@@ -41,10 +45,27 @@ func NewEncoder(s codec.VideoSetting) (codec.VideoEncoder, error) {
return nil, fmt.Errorf("failed in creating encoder") return nil, fmt.Errorf("failed in creating encoder")
} }
return &encoder{cEncoder}, nil return &encoder{
engine: cEncoder,
r: r,
}, nil
} }
func (e *encoder) Encode(img image.Image) ([]byte, error) { func (e *encoder) Read(p []byte) (n int, err error) {
if e.buff != nil {
n, err = mio.Copy(p, e.buff)
if err == nil {
e.buff = nil
}
return n, err
}
img, err := e.r.Read()
if err != nil {
return 0, err
}
// TODO: Convert img to YCbCr since openh264 only accepts YCbCr // TODO: Convert img to YCbCr since openh264 only accepts YCbCr
// TODO: Convert img to 4:2:0 format which what openh264 accepts // TODO: Convert img to 4:2:0 format which what openh264 accepts
yuvImg := img.(*image.YCbCr) yuvImg := img.(*image.YCbCr)
@@ -58,10 +79,16 @@ func (e *encoder) Encode(img image.Image) ([]byte, error) {
}) })
if err != nil { if err != nil {
// TODO: better error message // TODO: better error message
return nil, fmt.Errorf("failed in encoding") return 0, fmt.Errorf("failed in encoding")
} }
return C.GoBytes(unsafe.Pointer(s.data), s.data_len), nil encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len)
n, err = mio.Copy(p, encoded)
if err != nil {
e.buff = encoded
}
return n, err
} }
func (e *encoder) Close() error { func (e *encoder) Close() error {

View File

@@ -2,8 +2,10 @@ package codec
import ( import (
"fmt" "fmt"
"github.com/pion/mediadevices/pkg/io/audio"
"io" "io"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
) )
var ( var (
@@ -20,13 +22,13 @@ func Register(name string, builder interface{}) {
} }
} }
func BuildVideoEncoder(name string, s VideoSetting) (VideoEncoder, error) { func BuildVideoEncoder(name string, r video.Reader, s VideoSetting) (io.ReadCloser, error) {
b, ok := videoEncoders[name] b, ok := videoEncoders[name]
if !ok { if !ok {
return nil, fmt.Errorf("codec: can't find %s video encoder", name) return nil, fmt.Errorf("codec: can't find %s video encoder", name)
} }
return b(s) return b(r, s)
} }
func BuildAudioEncoder(name string, r audio.Reader, s AudioSetting) (io.ReadCloser, error) { func BuildAudioEncoder(name string, r audio.Reader, s AudioSetting) (io.ReadCloser, error) {

View File

@@ -4,8 +4,12 @@ package driver
import "C" import "C"
import ( import (
"image"
"io"
"github.com/blackjack/webcam" "github.com/blackjack/webcam"
"github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/video"
) )
// Camera implementation using v4l2 // Camera implementation using v4l2
@@ -78,41 +82,51 @@ func (c *camera) Close() error {
return c.cam.StopStreaming() return c.cam.StopStreaming()
} }
func (c *camera) Start(setting VideoSetting, cb DataCb) error { func (c *camera) Start(setting VideoSetting) (video.Reader, error) {
pf := c.reversedFormats[setting.FrameFormat] decoder, err := frame.NewDecoder(setting.FrameFormat)
_, _, _, err := c.cam.SetImageFormat(pf, uint32(setting.Width), uint32(setting.Height))
if err != nil { if err != nil {
return err return nil, err
}
pf := c.reversedFormats[setting.FrameFormat]
_, _, _, err = c.cam.SetImageFormat(pf, uint32(setting.Width), uint32(setting.Height))
if err != nil {
return nil, err
} }
if err := c.cam.StartStreaming(); err != nil { if err := c.cam.StartStreaming(); err != nil {
return err return nil, err
} }
for { r := video.ReaderFunc(func() (img image.Image, err error) {
err := c.cam.WaitForFrame(5) // Wait until a frame is ready
switch err.(type) { for {
case nil: err := c.cam.WaitForFrame(5)
case *webcam.Timeout: switch err.(type) {
continue case nil:
default: case *webcam.Timeout:
// Camera has been stopped. We don't need to return an error. continue
return nil default:
} // Camera has been stopped.
return nil, io.EOF
}
frame, err := c.cam.ReadFrame() b, err := c.cam.ReadFrame()
if err != nil { if err != nil {
// Camera has been stopped. We don't need to return an error. // Camera has been stopped.
return nil return nil, io.EOF
} }
// Frame is not ready. // Frame is not ready.
if len(frame) == 0 { if len(b) == 0 {
continue continue
} }
cb(frame) return decoder.Decode(b, setting.Width, setting.Height)
} }
})
return r, nil
} }
func (c *camera) Stop() error { func (c *camera) Stop() error {

View File

@@ -1,8 +1,9 @@
package driver package driver
import ( import (
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/frame" "github.com/pion/mediadevices/pkg/frame"
"github.com/pion/mediadevices/pkg/io/audio"
"github.com/pion/mediadevices/pkg/io/video"
) )
type State uint type State uint
@@ -14,8 +15,6 @@ const (
StateStopped StateStopped
) )
type DataCb func(b []byte)
type OpenCloser interface { type OpenCloser interface {
Open() error Open() error
Close() error Close() error
@@ -31,7 +30,7 @@ type Info struct {
} }
type VideoCapable interface { type VideoCapable interface {
Start(setting VideoSetting, cb DataCb) error Start(setting VideoSetting) (video.Reader, error)
Stop() error Stop() error
Settings() []VideoSetting Settings() []VideoSetting
} }

View File

@@ -2,7 +2,7 @@ package driver
import ( import (
"fmt" "fmt"
"github.com/pion/mediadevices/pkg/io/video"
uuid "github.com/satori/go.uuid" uuid "github.com/satori/go.uuid"
) )
@@ -61,36 +61,20 @@ func (w *videoAdapterWrapper) Close() error {
return err return err
} }
func (w *videoAdapterWrapper) Start(setting VideoSetting, cb DataCb) error { func (w *videoAdapterWrapper) Start(setting VideoSetting) (video.Reader, error) {
if w.state == StateClosed { if w.state == StateClosed {
return fmt.Errorf("invalid state: driver hasn't been opened") return nil, fmt.Errorf("invalid state: driver hasn't been opened")
} }
if w.state == StateStarted { if w.state == StateStarted {
return fmt.Errorf("invalid state: driver has been started") return nil, fmt.Errorf("invalid state: driver has been started")
} }
// Make sure if there were 2 errors sent to errCh, none of the r, err := w.VideoAdapter.Start(setting)
// callers will be blocked.
errCh := make(chan error, 2)
go func() {
first := true
errCh <- w.VideoAdapter.Start(setting, func(b []byte) {
if first {
errCh <- nil
first = false
}
cb(b)
})
}()
// Block until either we receive an error or the first frame
err := <-errCh
if err == nil { if err == nil {
w.state = StateStarted w.state = StateStarted
} }
return err return r, err
} }
func (w *videoAdapterWrapper) Stop() error { func (w *videoAdapterWrapper) Stop() error {

13
pkg/io/error.go Normal file
View File

@@ -0,0 +1,13 @@
package io
import "fmt"
// InsufficientBufferError tells the caller that the buffer provided is not sufficient/big
// enough to hold the whole data/sample.
type InsufficientBufferError struct {
RequiredSize int
}
func (e *InsufficientBufferError) Error() string {
return fmt.Sprintf("provided buffer doesn't meet the size requirement of length, %d", e.RequiredSize)
}

11
pkg/io/io.go Normal file
View File

@@ -0,0 +1,11 @@
package io
// Copy copies data from src to dst. If dst is not big enough, return an
// InsufficientBufferError.
func Copy(dst, src []byte) (n int, err error) {
if len(dst) < len(src) {
return 0, &InsufficientBufferError{len(dst)}
}
return copy(dst, src), nil
}

View File

@@ -7,7 +7,7 @@ import (
"github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/codec"
"github.com/pion/mediadevices/pkg/driver" "github.com/pion/mediadevices/pkg/driver"
"github.com/pion/mediadevices/pkg/frame" mio "github.com/pion/mediadevices/pkg/io"
"github.com/pion/webrtc/v2" "github.com/pion/webrtc/v2"
) )
@@ -65,8 +65,7 @@ type videoTrack struct {
*track *track
d driver.VideoDriver d driver.VideoDriver
setting driver.VideoSetting setting driver.VideoSetting
decoder frame.Decoder encoder io.ReadCloser
encoder codec.VideoEncoder
} }
var _ Tracker = &videoTrack{} var _ Tracker = &videoTrack{}
@@ -77,12 +76,12 @@ func newVideoTrack(pc *webrtc.PeerConnection, d driver.VideoDriver, setting driv
return nil, err return nil, err
} }
decoder, err := frame.NewDecoder(setting.FrameFormat) r, err := d.Start(setting)
if err != nil { if err != nil {
return nil, err return nil, err
} }
encoder, err := codec.BuildVideoEncoder(codecName, codec.VideoSetting{ encoder, err := codec.BuildVideoEncoder(codecName, r, codec.VideoSetting{
Width: setting.Width, Width: setting.Width,
Height: setting.Height, Height: setting.Height,
TargetBitRate: 1000000, TargetBitRate: 1000000,
@@ -96,36 +95,30 @@ func newVideoTrack(pc *webrtc.PeerConnection, d driver.VideoDriver, setting driv
track: t, track: t,
d: d, d: d,
setting: setting, setting: setting,
decoder: decoder,
encoder: encoder, encoder: encoder,
} }
err = d.Start(setting, vt.dataCb) go vt.start()
if err != nil {
encoder.Close()
return nil, err
}
return &vt, nil return &vt, nil
} }
func (vt *videoTrack) dataCb(b []byte) { func (vt *videoTrack) start() {
img, err := vt.decoder.Decode(b, vt.setting.Width, vt.setting.Height) var n int
if err != nil { var err error
// TODO: probably do some logging here buff := make([]byte, 1024)
return for {
} n, err = vt.encoder.Read(buff)
if err != nil {
if e, ok := err.(*mio.InsufficientBufferError); ok {
buff = make([]byte, 2*e.RequiredSize)
continue
}
encoded, err := vt.encoder.Encode(img) // TODO: better error handling
if err != nil { panic(err)
// TODO: probably do some logging here }
return
}
err = vt.s.sample(encoded) vt.s.sample(buff[:n])
if err != nil {
// TODO: probably do some logging here
return
} }
} }