From 7fdafa95985314d0fc3fe8963480f2fa7f9b6501 Mon Sep 17 00:00:00 2001 From: Lei Kang Date: Fri, 12 Sep 2025 15:35:07 -0700 Subject: [PATCH] add codec decoder interface --- pkg/codec/codec.go | 11 +++++ pkg/codec/vpx/vpx_decoder.go | 86 +++++++++++++++++++++++++----------- pkg/codec/vpx/vpx_image.go | 40 ----------------- pkg/codec/vpx/vpx_test.go | 17 ++++--- 4 files changed, 82 insertions(+), 72 deletions(-) delete mode 100644 pkg/codec/vpx/vpx_image.go diff --git a/pkg/codec/codec.go b/pkg/codec/codec.go index f584c8f..f933c43 100644 --- a/pkg/codec/codec.go +++ b/pkg/codec/codec.go @@ -1,6 +1,8 @@ package codec import ( + "image" + "io" "time" "github.com/pion/mediadevices/pkg/io/audio" @@ -153,6 +155,15 @@ type ReadCloser interface { Controllable } +type VideoDecoderBuilder interface { + BuildVideoDecoder(r io.Reader, p prop.Media) (VideoDecoder, error) +} + +type VideoDecoder interface { + Read() (image.Image, func(), error) + Close() error +} + // EncoderController is the interface allowing to control the encoder behaviour after it's initialisation. // It will possibly have common control method in the future. // A controller can have optional methods represented by *Controller interfaces diff --git a/pkg/codec/vpx/vpx_decoder.go b/pkg/codec/vpx/vpx_decoder.go index de015bb..a34d1d9 100644 --- a/pkg/codec/vpx/vpx_decoder.go +++ b/pkg/codec/vpx/vpx_decoder.go @@ -51,25 +51,35 @@ void freeDecoderCtx(vpx_codec_ctx_t* ctx) { import "C" import ( "fmt" + "image" + "io" "sync" "time" + "unsafe" + "github.com/pion/mediadevices/pkg/codec" "github.com/pion/mediadevices/pkg/prop" ) -type Decoder struct { +type decoder struct { codec *C.vpx_codec_ctx_t raw *C.vpx_image_t cfg *C.vpx_codec_dec_cfg_t frameIndex int tStart time.Time tLastFrame time.Time + reader io.Reader + buf []byte mu sync.Mutex closed bool } -func NewDecoder(p prop.Media) (Decoder, error) { +func BuildVideoDecoder(r io.Reader, property prop.Media) (codec.VideoDecoder, error) { + return NewDecoder(r, property) +} + +func NewDecoder(r io.Reader, p prop.Media) (codec.VideoDecoder, error) { cfg := &C.vpx_codec_dec_cfg_t{} cfg.threads = 1 cfg.w = C.uint(p.Width) @@ -77,35 +87,61 @@ func NewDecoder(p prop.Media) (Decoder, error) { codec := C.newDecoderCtx() if C.decoderInit(codec, C.ifaceVP8Decoder()) != C.VPX_CODEC_OK { - return Decoder{}, fmt.Errorf("vpx_codec_dec_init failed") + return nil, fmt.Errorf("vpx_codec_dec_init failed") } - return Decoder{ - codec: codec, - cfg: cfg, + return &decoder{ + codec: codec, + cfg: cfg, + reader: r, + buf: make([]byte, 1024*1024), }, nil } -func (d *Decoder) Decode(data []byte) error { - if len(data) == 0 { - return fmt.Errorf("empty data") +func (d *decoder) Read() (image.Image, func(), error) { + + n, err := d.reader.Read(d.buf) + if err != nil { + return nil, nil, err } - status := C.decodeFrame(d.codec, (*C.uint8_t)(&data[0]), C.uint(len(data))) - if status != C.VPX_CODEC_OK { - return fmt.Errorf("Decode failed: %v", status) + if n > 0 { + status := C.decodeFrame(d.codec, (*C.uint8_t)(&d.buf[0]), C.uint(n)) + if status != C.VPX_CODEC_OK { + return nil, nil, fmt.Errorf("Decode failed: %v", status) + } } + + var iter C.vpx_codec_iter_t = nil // initialize to NULL to start iteration + input := C.getFrame(d.codec, &iter) + if input == nil { + return nil, nil, io.EOF + } + w := int(input.d_w) + h := int(input.d_h) + yStride := int(input.stride[0]) + uStride := int(input.stride[1]) + vStride := int(input.stride[2]) + + ySrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[0])), yStride*h) + uSrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[1])), uStride*h/2) + vSrc := unsafe.Slice((*byte)(unsafe.Pointer(input.planes[2])), vStride*h/2) + + dst := image.NewYCbCr(image.Rect(0, 0, w, h), image.YCbCrSubsampleRatio420) + + // copy luma + for r := 0; r < h; r++ { + copy(dst.Y[r*dst.YStride:r*dst.YStride+w], ySrc[r*yStride:r*yStride+w]) + } + // copy chroma + for r := 0; r < h/2; r++ { + copy(dst.Cb[r*dst.CStride:r*dst.CStride+w/2], uSrc[r*uStride:r*uStride+w/2]) + copy(dst.Cr[r*dst.CStride:r*dst.CStride+w/2], vSrc[r*vStride:r*vStride+w/2]) + } + return dst, func() {}, nil +} + +func (d *decoder) Close() error { + C.freeDecoderCtx(d.codec) + d.closed = true return nil } - -func (d *Decoder) GetFrame() *VpxImage { - var iter C.vpx_codec_iter_t = nil // initialize to NULL to start iteration - img := C.getFrame(d.codec, &iter) - if img == nil { - return nil - } - return &VpxImage{img: img} -} - -func (d *Decoder) FreeDecoderCtx() { - C.freeDecoderCtx(d.codec) -} diff --git a/pkg/codec/vpx/vpx_image.go b/pkg/codec/vpx/vpx_image.go deleted file mode 100644 index 5ba4dae..0000000 --- a/pkg/codec/vpx/vpx_image.go +++ /dev/null @@ -1,40 +0,0 @@ -package vpx - -/* -#cgo pkg-config: vpx -#include -*/ -import "C" -import "unsafe" - -type VpxImage struct { - img *C.vpx_image_t -} - -func NewImageFromPtr(ptr *C.vpx_image_t) *VpxImage { - return &VpxImage{img: ptr} -} - -func (i *VpxImage) Width() int { - return int(i.img.d_w) -} - -func (i *VpxImage) Height() int { - return int(i.img.d_h) -} - -func (i *VpxImage) YStride() int { - return int(i.img.stride[0]) -} - -func (i *VpxImage) UStride() int { - return int(i.img.stride[1]) -} - -func (i *VpxImage) VStride() int { - return int(i.img.stride[2]) -} - -func (i *VpxImage) Plane(n int) unsafe.Pointer { - return unsafe.Pointer(i.img.planes[n]) -} diff --git a/pkg/codec/vpx/vpx_test.go b/pkg/codec/vpx/vpx_test.go index c744392..141500b 100644 --- a/pkg/codec/vpx/vpx_test.go +++ b/pkg/codec/vpx/vpx_test.go @@ -429,7 +429,8 @@ func TestVP8DynamicQPControl(t *testing.T) { func TestVP8EncodeDecode(t *testing.T) { t.Run("VP8", func(t *testing.T) { initialWidth, initialHeight := 800, 600 - decoder, err := NewDecoder(prop.Media{ + reader, writer := io.Pipe() + decoder, err := BuildVideoDecoder(reader, prop.Media{ Video: prop.Video{ Width: initialWidth, Height: initialHeight, @@ -439,7 +440,7 @@ func TestVP8EncodeDecode(t *testing.T) { if err != nil { t.Fatalf("Error creating VP8 decoder: %v", err) } - defer decoder.FreeDecoderCtx() + defer decoder.Close() // [... encoder setup code ...] p, err := NewVP8Params() @@ -479,10 +480,8 @@ func TestVP8EncodeDecode(t *testing.T) { defer rel() // Decode the frame - err = decoder.Decode(data) - if err != nil { - t.Fatal(err) - } + writer.Write(data) + writer.Close() // Poll for frame with timeout timeout := time.After(2 * time.Second) @@ -494,7 +493,11 @@ func TestVP8EncodeDecode(t *testing.T) { case <-timeout: t.Fatal("Timeout: No frame received within 2 seconds") case <-ticker.C: - frame := decoder.GetFrame() + frame, rel, err := decoder.Read() + if err != nil { + t.Fatal(err) + } + defer rel() if frame != nil { t.Log("Successfully received and decoded frame") return // Test passes