diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index e3e0057..12ce57e 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -1,15 +1,11 @@ package codec import ( - "github.com/pion/mediadevices/pkg/io/audio" - "image" "io" -) -type VideoEncoder interface { - Encode(img image.Image) ([]byte, error) - Close() error -} + "github.com/pion/mediadevices/pkg/io/audio" + "github.com/pion/mediadevices/pkg/io/video" +) type VideoSetting struct { Width, Height int @@ -17,7 +13,7 @@ type VideoSetting struct { FrameRate float32 } -type VideoEncoderBuilder func(s VideoSetting) (VideoEncoder, error) +type VideoEncoderBuilder func(r video.Reader, s VideoSetting) (io.ReadCloser, error) type AudioSetting struct { InSampleRate, OutSampleRate int diff --git a/pkg/codec/openh264/openh264.go b/pkg/codec/openh264/openh264.go index 25f6604..9f54bae 100644 --- a/pkg/codec/openh264/openh264.go +++ b/pkg/codec/openh264/openh264.go @@ -11,7 +11,10 @@ import "C" import ( "fmt" + "io" "github.com/pion/mediadevices/pkg/codec" + mio "github.com/pion/mediadevices/pkg/io" + "github.com/pion/mediadevices/pkg/io/video" "github.com/pion/webrtc/v2" "image" "unsafe" @@ -19,16 +22,17 @@ import ( type encoder struct { engine *C.Encoder + r video.Reader + buff []byte } -var _ codec.VideoEncoder = &encoder{} var _ codec.VideoEncoderBuilder = codec.VideoEncoderBuilder(NewEncoder) func init() { codec.Register(webrtc.H264, codec.VideoEncoderBuilder(NewEncoder)) } -func NewEncoder(s codec.VideoSetting) (codec.VideoEncoder, error) { +func NewEncoder(r video.Reader, s codec.VideoSetting) (io.ReadCloser, error) { cEncoder, err := C.enc_new(C.EncoderOptions{ width: C.int(s.Width), height: C.int(s.Height), @@ -41,10 +45,27 @@ func NewEncoder(s codec.VideoSetting) (codec.VideoEncoder, error) { return nil, fmt.Errorf("failed in creating encoder") } - return &encoder{cEncoder}, nil + return &encoder{ + engine: cEncoder, + r: r, + }, nil } -func (e *encoder) Encode(img image.Image) ([]byte, error) { +func (e *encoder) Read(p []byte) (n int, err error) { + if e.buff != nil { + n, err = mio.Copy(p, e.buff) + if err == nil { + e.buff = nil + } + + return n, err + } + + img, err := e.r.Read() + if err != nil { + return 0, err + } + // TODO: Convert img to YCbCr since openh264 only accepts YCbCr // TODO: Convert img to 4:2:0 format which what openh264 accepts yuvImg := img.(*image.YCbCr) @@ -58,10 +79,16 @@ func (e *encoder) Encode(img image.Image) ([]byte, error) { }) if err != nil { // TODO: better error message - return nil, fmt.Errorf("failed in encoding") + return 0, fmt.Errorf("failed in encoding") } - return C.GoBytes(unsafe.Pointer(s.data), s.data_len), nil + encoded := C.GoBytes(unsafe.Pointer(s.data), s.data_len) + n, err = mio.Copy(p, encoded) + if err != nil { + e.buff = encoded + } + + return n, err } func (e *encoder) Close() error { diff --git a/pkg/codec/registrar.go b/pkg/codec/registrar.go index 4e3a128..04b3c22 100644 --- a/pkg/codec/registrar.go +++ b/pkg/codec/registrar.go @@ -2,8 +2,10 @@ package codec import ( "fmt" - "github.com/pion/mediadevices/pkg/io/audio" "io" + + "github.com/pion/mediadevices/pkg/io/audio" + "github.com/pion/mediadevices/pkg/io/video" ) var ( @@ -20,13 +22,13 @@ func Register(name string, builder interface{}) { } } -func BuildVideoEncoder(name string, s VideoSetting) (VideoEncoder, error) { +func BuildVideoEncoder(name string, r video.Reader, s VideoSetting) (io.ReadCloser, error) { b, ok := videoEncoders[name] if !ok { return nil, fmt.Errorf("codec: can't find %s video encoder", name) } - return b(s) + return b(r, s) } func BuildAudioEncoder(name string, r audio.Reader, s AudioSetting) (io.ReadCloser, error) { diff --git a/pkg/driver/camera_linux.go b/pkg/driver/camera_linux.go index 877b2d3..65ef3a4 100644 --- a/pkg/driver/camera_linux.go +++ b/pkg/driver/camera_linux.go @@ -4,8 +4,12 @@ package driver import "C" import ( + "image" + "io" + "github.com/blackjack/webcam" "github.com/pion/mediadevices/pkg/frame" + "github.com/pion/mediadevices/pkg/io/video" ) // Camera implementation using v4l2 @@ -78,41 +82,51 @@ func (c *camera) Close() error { return c.cam.StopStreaming() } -func (c *camera) Start(setting VideoSetting, cb DataCb) error { - pf := c.reversedFormats[setting.FrameFormat] - _, _, _, err := c.cam.SetImageFormat(pf, uint32(setting.Width), uint32(setting.Height)) +func (c *camera) Start(setting VideoSetting) (video.Reader, error) { + decoder, err := frame.NewDecoder(setting.FrameFormat) if err != nil { - return err + return nil, err + } + + pf := c.reversedFormats[setting.FrameFormat] + _, _, _, err = c.cam.SetImageFormat(pf, uint32(setting.Width), uint32(setting.Height)) + if err != nil { + return nil, err } if err := c.cam.StartStreaming(); err != nil { - return err + return nil, err } - for { - err := c.cam.WaitForFrame(5) - switch err.(type) { - case nil: - case *webcam.Timeout: - continue - default: - // Camera has been stopped. We don't need to return an error. - return nil - } + r := video.ReaderFunc(func() (img image.Image, err error) { + // Wait until a frame is ready + for { + err := c.cam.WaitForFrame(5) + switch err.(type) { + case nil: + case *webcam.Timeout: + continue + default: + // Camera has been stopped. + return nil, io.EOF + } - frame, err := c.cam.ReadFrame() - if err != nil { - // Camera has been stopped. We don't need to return an error. - return nil - } + b, err := c.cam.ReadFrame() + if err != nil { + // Camera has been stopped. + return nil, io.EOF + } - // Frame is not ready. - if len(frame) == 0 { - continue - } + // Frame is not ready. + if len(b) == 0 { + continue + } - cb(frame) - } + return decoder.Decode(b, setting.Width, setting.Height) + } + }) + + return r, nil } func (c *camera) Stop() error { diff --git a/pkg/driver/driver.go b/pkg/driver/driver.go index c64ed4f..af7f59b 100644 --- a/pkg/driver/driver.go +++ b/pkg/driver/driver.go @@ -1,8 +1,9 @@ package driver import ( - "github.com/pion/mediadevices/pkg/io/audio" "github.com/pion/mediadevices/pkg/frame" + "github.com/pion/mediadevices/pkg/io/audio" + "github.com/pion/mediadevices/pkg/io/video" ) type State uint @@ -14,8 +15,6 @@ const ( StateStopped ) -type DataCb func(b []byte) - type OpenCloser interface { Open() error Close() error @@ -31,7 +30,7 @@ type Info struct { } type VideoCapable interface { - Start(setting VideoSetting, cb DataCb) error + Start(setting VideoSetting) (video.Reader, error) Stop() error Settings() []VideoSetting } diff --git a/pkg/driver/wrapper.go b/pkg/driver/wrapper.go index 6821b67..654f5f7 100644 --- a/pkg/driver/wrapper.go +++ b/pkg/driver/wrapper.go @@ -2,7 +2,7 @@ package driver import ( "fmt" - + "github.com/pion/mediadevices/pkg/io/video" uuid "github.com/satori/go.uuid" ) @@ -61,36 +61,20 @@ func (w *videoAdapterWrapper) Close() error { return err } -func (w *videoAdapterWrapper) Start(setting VideoSetting, cb DataCb) error { +func (w *videoAdapterWrapper) Start(setting VideoSetting) (video.Reader, error) { if w.state == StateClosed { - return fmt.Errorf("invalid state: driver hasn't been opened") + return nil, fmt.Errorf("invalid state: driver hasn't been opened") } if w.state == StateStarted { - return fmt.Errorf("invalid state: driver has been started") + return nil, fmt.Errorf("invalid state: driver has been started") } - // Make sure if there were 2 errors sent to errCh, none of the - // callers will be blocked. - errCh := make(chan error, 2) - - go func() { - first := true - errCh <- w.VideoAdapter.Start(setting, func(b []byte) { - if first { - errCh <- nil - first = false - } - cb(b) - }) - }() - - // Block until either we receive an error or the first frame - err := <-errCh + r, err := w.VideoAdapter.Start(setting) if err == nil { w.state = StateStarted } - return err + return r, err } func (w *videoAdapterWrapper) Stop() error { diff --git a/pkg/io/error.go b/pkg/io/error.go new file mode 100644 index 0000000..70ff572 --- /dev/null +++ b/pkg/io/error.go @@ -0,0 +1,13 @@ +package io + +import "fmt" + +// InsufficientBufferError tells the caller that the buffer provided is not sufficient/big +// enough to hold the whole data/sample. +type InsufficientBufferError struct { + RequiredSize int +} + +func (e *InsufficientBufferError) Error() string { + return fmt.Sprintf("provided buffer doesn't meet the size requirement of length, %d", e.RequiredSize) +} diff --git a/pkg/io/io.go b/pkg/io/io.go new file mode 100644 index 0000000..d235335 --- /dev/null +++ b/pkg/io/io.go @@ -0,0 +1,11 @@ +package io + +// Copy copies data from src to dst. If dst is not big enough, return an +// InsufficientBufferError. +func Copy(dst, src []byte) (n int, err error) { + if len(dst) < len(src) { + return 0, &InsufficientBufferError{len(dst)} + } + + return copy(dst, src), nil +} diff --git a/track.go b/track.go index 80ebfa9..8d2468c 100644 --- a/track.go +++ b/track.go @@ -7,7 +7,7 @@ import ( "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/driver" - "github.com/pion/mediadevices/pkg/frame" + mio "github.com/pion/mediadevices/pkg/io" "github.com/pion/webrtc/v2" ) @@ -65,8 +65,7 @@ type videoTrack struct { *track d driver.VideoDriver setting driver.VideoSetting - decoder frame.Decoder - encoder codec.VideoEncoder + encoder io.ReadCloser } var _ Tracker = &videoTrack{} @@ -77,12 +76,12 @@ func newVideoTrack(pc *webrtc.PeerConnection, d driver.VideoDriver, setting driv return nil, err } - decoder, err := frame.NewDecoder(setting.FrameFormat) + r, err := d.Start(setting) if err != nil { return nil, err } - encoder, err := codec.BuildVideoEncoder(codecName, codec.VideoSetting{ + encoder, err := codec.BuildVideoEncoder(codecName, r, codec.VideoSetting{ Width: setting.Width, Height: setting.Height, TargetBitRate: 1000000, @@ -96,36 +95,30 @@ func newVideoTrack(pc *webrtc.PeerConnection, d driver.VideoDriver, setting driv track: t, d: d, setting: setting, - decoder: decoder, encoder: encoder, } - err = d.Start(setting, vt.dataCb) - if err != nil { - encoder.Close() - return nil, err - } - + go vt.start() return &vt, nil } -func (vt *videoTrack) dataCb(b []byte) { - img, err := vt.decoder.Decode(b, vt.setting.Width, vt.setting.Height) - if err != nil { - // TODO: probably do some logging here - return - } +func (vt *videoTrack) start() { + var n int + var err error + buff := make([]byte, 1024) + for { + n, err = vt.encoder.Read(buff) + if err != nil { + if e, ok := err.(*mio.InsufficientBufferError); ok { + buff = make([]byte, 2*e.RequiredSize) + continue + } - encoded, err := vt.encoder.Encode(img) - if err != nil { - // TODO: probably do some logging here - return - } + // TODO: better error handling + panic(err) + } - err = vt.s.sample(encoded) - if err != nil { - // TODO: probably do some logging here - return + vt.s.sample(buff[:n]) } }