first-commit

This commit is contained in:
harshabose
2025-02-18 16:41:05 +05:30
commit f91d9277bb
22 changed files with 1075 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
.idea
/third-party

29
go.mod Normal file
View File

@@ -0,0 +1,29 @@
module harshabose/transcode/v1
go 1.23
require (
github.com/asticode/go-astiav v0.33.1 // indirect
github.com/asticode/go-astikit v0.42.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/pion/datachannel v1.5.10 // indirect
github.com/pion/dtls/v3 v3.0.4 // indirect
github.com/pion/ice/v4 v4.0.6 // indirect
github.com/pion/interceptor v0.1.37 // indirect
github.com/pion/logging v0.2.3 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.15 // indirect
github.com/pion/rtp v1.8.11 // indirect
github.com/pion/sctp v1.8.35 // indirect
github.com/pion/sdp/v3 v3.0.10 // indirect
github.com/pion/srtp/v3 v3.0.4 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pion/turn/v4 v4.0.0 // indirect
github.com/pion/webrtc/v4 v4.0.10 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
)

46
go.sum Normal file
View File

@@ -0,0 +1,46 @@
github.com/asticode/go-astiav v0.33.1 h1:fll7jDP1LCosVTpqJNze0z0TAokuyFj+Jjls+g1hsBA=
github.com/asticode/go-astiav v0.33.1/go.mod h1:K7D8UC6GeQt85FUxk2KVwYxHnotrxuEnp5evkkudc2s=
github.com/asticode/go-astikit v0.42.0 h1:pnir/2KLUSr0527Tv908iAH6EGYYrYta132vvjXsH5w=
github.com/asticode/go-astikit v0.42.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o=
github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M=
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI=
github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sctp v1.8.35 h1:qwtKvNK1Wc5tHMIYgTDJhfZk7vATGVHhXbUDfHbYwzA=
github.com/pion/sctp v1.8.35/go.mod h1:EcXP8zCYVTRy3W9xtOF7wJm1L1aXfKRQzaM33SjQlzg=
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw=
github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU=
github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

8
internal/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package internal
import "errors"
var (
ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage")
ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation")
)

1
internal/frame_pool.go Normal file
View File

@@ -0,0 +1 @@
package internal

106
internal/limit_buffer.go Normal file
View File

@@ -0,0 +1,106 @@
package internal
import (
"context"
"fmt"
)
type limitBuffer[T any] struct {
pool Pool[T]
bufferChannel chan *T
inputBuffer chan *T
ctx context.Context
}
func (buffer *limitBuffer[T]) Push(ctx context.Context, element *T) error {
select {
case buffer.inputBuffer <- element:
// WARN: LACKS CHECKS FOR CLOSED CHANNEL
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (buffer *limitBuffer[T]) Pop(ctx context.Context) (*T, error) {
select {
case <-ctx.Done():
return nil, ctx.Err()
case data, ok := <-buffer.bufferChannel:
if !ok {
return nil, ErrorChannelBufferClose
}
if data == nil {
return nil, ErrorElementUnallocated
}
return data, nil
}
}
func (buffer *limitBuffer[T]) Generate() *T {
return buffer.pool.Get()
}
func (buffer *limitBuffer[T]) PutBack(element *T) {
if buffer.pool != nil {
buffer.pool.Put(element)
}
}
func (buffer *limitBuffer[T]) GetChannel() chan *T {
return buffer.bufferChannel
}
func (buffer *limitBuffer[T]) Size() int {
return len(buffer.bufferChannel)
}
func (buffer *limitBuffer[T]) loop() {
defer buffer.close()
loop:
for {
select {
case <-buffer.ctx.Done():
return
case element, ok := <-buffer.inputBuffer:
if !ok || element == nil {
continue loop
}
select {
case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED
continue loop
default:
select {
case oldElement := <-buffer.bufferChannel:
buffer.PutBack(oldElement)
select {
case buffer.bufferChannel <- element:
continue loop
default:
fmt.Println("unexpected buffer state. skipping the element..")
buffer.PutBack(element)
}
}
}
}
}
}
func (buffer *limitBuffer[T]) close() {
loop:
for {
select {
case element := <-buffer.bufferChannel:
if buffer.pool != nil {
buffer.pool.Put(element)
}
default:
close(buffer.bufferChannel)
close(buffer.inputBuffer)
break loop
}
}
if buffer.pool != nil {
buffer.pool.Release()
}
}

82
internal/package.go Normal file
View File

@@ -0,0 +1,82 @@
package internal
import (
"context"
"sync"
"github.com/asticode/go-astiav"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4/pkg/media"
)
type Pool[T any] interface {
Get() *T
Put(*T)
Release()
}
type Buffer[T any] interface {
Push(context.Context, *T) error
Pop(ctx context.Context) (*T, error)
Size() int
}
type BufferWithGenerator[T any] interface {
Push(context.Context, *T) error
Pop(ctx context.Context) (*T, error)
Size() int
Generate() *T
PutBack(*T)
GetChannel() chan *T
}
func CreateFramePool() Pool[astiav.Frame] {
return &framePool{
pool: sync.Pool{
New: func() any {
return astiav.AllocFrame()
},
},
}
}
func CreateSamplePool() Pool[media.Sample] {
return &samplePool{
pool: sync.Pool{
New: func() any {
return &media.Sample{}
},
},
}
}
func CreateRTPPool() Pool[rtp.Packet] {
return &rtpPool{
pool: sync.Pool{
New: func() any {
return &rtp.Packet{}
},
},
}
}
func CreatePacketPool() Pool[astiav.Packet] {
return &packetPool{
pool: sync.Pool{
New: func() any {
return astiav.AllocPacket()
},
},
}
}
func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) BufferWithGenerator[T] {
buffer := &limitBuffer[T]{
pool: pool,
bufferChannel: make(chan *T, size),
inputBuffer: make(chan *T),
ctx: ctx,
}
go buffer.loop()
return buffer
}

1
internal/packet_pool.go Normal file
View File

@@ -0,0 +1 @@
package internal

1
internal/rtp_pool.go Normal file
View File

@@ -0,0 +1 @@
package internal

1
internal/sample_pool.go Normal file
View File

@@ -0,0 +1 @@
package internal

1
internal/timer_buffer.go Normal file
View File

@@ -0,0 +1 @@
package internal

21
main.go Normal file
View File

@@ -0,0 +1,21 @@
package main
import (
"fmt"
)
// TIP <p>To run your code, right-click the code and select <b>Run</b>.</p> <p>Alternatively, click
// the <icon src="AllIcons.Actions.Execute"/> icon in the gutter and select the <b>Run</b> menu item from here.</p>
func main() {
// TIP <p>Press <shortcut actionId="ShowIntentionActions"/> when your caret is at the underlined text
// to see how GoLand suggests fixing the warning.</p><p>Alternatively, if available, click the lightbulb to view possible fixes.</p>
s := "gopher"
fmt.Printf("Hello and welcome, %s!\n", s)
for i := 1; i <= 5; i++ {
// TIP <p>To start your debugging session, right-click your code in the editor and select the Debug option.</p> <p>We have set one <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint
// for you, but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.</p>
fmt.Println("i =", 100/i)
}
}

13
pkg/constants.go Normal file
View File

@@ -0,0 +1,13 @@
package pkg
import "github.com/asticode/go-astiav"
const (
DefaultVideoPayloadType = 96
DefaultVideoFPS = int(25)
DefaultVideoClockRate = int(90000)
DefaultVideoHeight = int(1080)
DefaultVideoWidth = int(1920)
DefaultVideoPixFormat = astiav.PixelFormatYuv420P
DefaultVideoEncoderCodec = astiav.CodecIDH264
)

120
pkg/decoder.go Normal file
View File

@@ -0,0 +1,120 @@
package pkg
import (
"context"
"errors"
"fmt"
"time"
"github.com/asticode/go-astiav"
"harshabose/transcode/v1/internal"
)
type Decoder struct {
demuxer *Demuxer
decoderContext *astiav.CodecContext
codec *astiav.Codec
buffer internal.BufferWithGenerator[astiav.Frame]
ctx context.Context
}
func CreateDecoder(ctx context.Context, demuxer *Demuxer, options ...DecoderOption) (*Decoder, error) {
decoder := &Decoder{
demuxer: demuxer,
buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreateFramePool()),
ctx: ctx,
}
var err error
for _, option := range options {
if err = option(decoder); err != nil {
return nil, err
}
}
if err = decoder.decoderContext.Open(decoder.codec, nil); err != nil {
return nil, err
}
return decoder, nil
}
func (decoder *Decoder) Start() {
go decoder.loop()
}
func (decoder *Decoder) loop() {
var (
packet *astiav.Packet
frame *astiav.Frame
err error
)
defer decoder.close()
loop1:
for {
select {
case <-decoder.ctx.Done():
return
case packet = <-decoder.demuxer.WaitForPacket():
if err := decoder.decoderContext.SendPacket(packet); err != nil {
decoder.demuxer.PutBack(packet)
if !errors.Is(err, astiav.ErrEagain) {
continue loop1
}
}
loop2:
for {
frame = decoder.buffer.Generate()
if err := decoder.decoderContext.ReceiveFrame(frame); err != nil {
decoder.buffer.PutBack(frame)
break loop2
}
frame.SetPictureType(astiav.PictureTypeNone)
if err = decoder.pushFrame(frame); err != nil {
fmt.Println("warning: frame dropped!")
decoder.buffer.PutBack(frame)
continue loop2
}
}
decoder.demuxer.PutBack(packet)
}
}
}
func (decoder *Decoder) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(decoder.ctx, time.Second/time.Duration(DefaultVideoFPS))
defer cancel()
return decoder.buffer.Push(ctx, frame)
}
func (decoder *Decoder) GetFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(decoder.ctx, time.Second/time.Duration(DefaultVideoFPS))
defer cancel()
return decoder.buffer.Pop(ctx)
}
func (decoder *Decoder) WaitForFrame() chan *astiav.Frame {
return decoder.buffer.GetChannel()
}
func (decoder *Decoder) PutBack(frame *astiav.Frame) {
decoder.buffer.PutBack(frame)
}
func (decoder *Decoder) GetSrcFilterContextOptions() func(filter types.BaseFilter) error {
return filt.VideoSetFilterContextParameters(decoder.decoderContext)
}
func (decoder *Decoder) close() {
if decoder.decoderContext != nil {
decoder.decoderContext.Free()
}
}

29
pkg/decoder_options.go Normal file
View File

@@ -0,0 +1,29 @@
package pkg
import "github.com/asticode/go-astiav"
type DecoderOption = func(*Decoder) error
func VideoSetDecoderContext(codecParameters *astiav.CodecParameters, videoStream *astiav.Stream, formatContext *astiav.FormatContext) func(*Decoder) error {
return func(decoder *Decoder) error {
var (
err error
)
if decoder.codec = astiav.FindDecoder(codecParameters.CodecID()); decoder.codec == nil {
return ErrorNoCodecFound
}
if decoder.decoderContext = astiav.AllocCodecContext(decoder.codec); decoder.decoderContext == nil {
return ErrorAllocateCodecContext
}
if err = videoStream.CodecParameters().ToCodecContext(decoder.decoderContext); err != nil {
return ErrorFillCodecContext
}
decoder.decoderContext.SetFramerate(formatContext.GuessFrameRate(videoStream, nil))
decoder.decoderContext.SetTimeBase(videoStream.TimeBase())
return nil
}
}

134
pkg/demuxer.go Normal file
View File

@@ -0,0 +1,134 @@
package pkg
import (
"context"
"time"
"github.com/asticode/go-astiav"
"harshabose/transcode/v1/internal"
)
type Demuxer struct {
formatContext *astiav.FormatContext
inputOptions *astiav.Dictionary
stream *astiav.Stream
codecParameters *astiav.CodecParameters
buffer internal.BufferWithGenerator[astiav.Packet]
ctx context.Context
}
func CreateDemuxer(ctx context.Context, containerAddress string, options ...DemuxerOption) (*Demuxer, error) {
astiav.RegisterAllDevices()
demuxer := &Demuxer{
formatContext: astiav.AllocFormatContext(),
inputOptions: astiav.NewDictionary(),
buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreatePacketPool()),
ctx: ctx,
}
if demuxer.formatContext == nil {
return nil, ErrorAllocateFormatContext
}
for _, option := range options {
if err := option(demuxer); err != nil {
return nil, err
}
}
if err := demuxer.formatContext.OpenInput(containerAddress, nil, nil); err != nil {
return nil, ErrorOpenInputContainer
}
if err := demuxer.formatContext.FindStreamInfo(nil); err != nil {
return nil, ErrorNoStreamFound
}
for _, stream := range demuxer.formatContext.Streams() {
if stream.CodecParameters().MediaType() == astiav.MediaTypeVideo {
demuxer.stream = stream
break
}
}
if demuxer.stream == nil {
return nil, ErrorNoVideoStreamFound
}
demuxer.codecParameters = demuxer.stream.CodecParameters()
return demuxer, nil
}
func (demuxer *Demuxer) Start() {
go demuxer.loop()
}
func (demuxer *Demuxer) loop() {
defer demuxer.close()
ticker := time.NewTicker(time.Millisecond)
defer ticker.Stop()
loop1:
for {
select {
case <-demuxer.ctx.Done():
return
case <-ticker.C:
loop2:
for {
packet := demuxer.buffer.Generate()
if err := demuxer.formatContext.ReadFrame(packet); err != nil {
demuxer.buffer.PutBack(packet)
continue loop1
}
if packet.StreamIndex() != demuxer.stream.Index() {
demuxer.buffer.PutBack(packet)
continue loop2
}
if err := demuxer.pushPacket(packet); err != nil {
demuxer.buffer.PutBack(packet)
continue loop1
}
break loop2
}
}
}
}
func (demuxer *Demuxer) pushPacket(packet *astiav.Packet) error {
ctx, cancel := context.WithTimeout(demuxer.ctx, time.Second/time.Duration(DefaultVideoFPS)) // TODO: NEEDS TO BE BASED ON FPS ON INPUT_FORMAT
defer cancel()
return demuxer.buffer.Push(ctx, packet)
}
func (demuxer *Demuxer) WaitForPacket() chan *astiav.Packet {
return demuxer.buffer.GetChannel()
}
func (demuxer *Demuxer) GetPacket() (*astiav.Packet, error) {
ctx, cancel := context.WithTimeout(demuxer.ctx, time.Second/time.Duration(DefaultVideoFPS))
defer cancel()
return demuxer.buffer.Pop(ctx)
}
func (demuxer *Demuxer) GetDecoderContextOptions() func(*Decoder) error {
return VideoSetDecoderContext(demuxer.codecParameters, demuxer.stream, demuxer.formatContext)
}
func (demuxer *Demuxer) PutBack(packet *astiav.Packet) {
demuxer.buffer.PutBack(packet)
}
func (demuxer *Demuxer) close() {
if demuxer.formatContext != nil {
demuxer.formatContext.CloseInput()
demuxer.formatContext.Free()
}
}

25
pkg/demuxer_options.go Normal file
View File

@@ -0,0 +1,25 @@
package pkg
type DemuxerOption = func(*Demuxer) error
func WithRTSPInputOption(demuxer *Demuxer) error {
var err error = nil
if err = demuxer.inputOptions.Set("rtsp_transport", "tcp", 0); err != nil {
return err
}
if err = demuxer.inputOptions.Set("stimeout", "5000000", 0); err != nil {
return err
}
if err = demuxer.inputOptions.Set("fflags", "nobuffer", 0); err != nil {
return err
}
if err = demuxer.inputOptions.Set("flags", "low_delay", 0); err != nil {
return err
}
if err = demuxer.inputOptions.Set("reorder_queue_size", "0", 0); err != nil {
return err
}
return nil
}

168
pkg/encoder.go Normal file
View File

@@ -0,0 +1,168 @@
package pkg
import (
"context"
"errors"
"time"
"github.com/asticode/go-astiav"
"harshabose/transcode/v1/internal"
)
type Encoder struct {
buffer internal.BufferWithGenerator[astiav.Packet]
filter *Filter
ctx context.Context
codec *astiav.Codec
encoderContext *astiav.CodecContext
h264options *astiav.Dictionary
encoderSettings encoderCodecSetting
sps []byte
pps []byte
}
func CreateEncoder(ctx context.Context, codecID astiav.CodecID, filter *Filter, options ...EncoderOption) (*Encoder, error) {
encoder := &Encoder{
buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreatePacketPool()),
filter: filter,
h264options: astiav.NewDictionary(),
encoderSettings: EncoderCodecNoSetting,
ctx: ctx,
}
encoder.codec = astiav.FindEncoder(codecID)
if encoder.encoderContext = astiav.AllocCodecContext(encoder.codec); encoder.encoderContext == nil {
return nil, ErrorAllocateCodecContext
}
for _, option := range options {
if err := option(encoder); err != nil {
return nil, err
}
}
if encoder.encoderSettings == EncoderCodecNoSetting {
return nil, ErrorCodecNoSetting
}
encoder.encoderContext.SetFlags(astiav.NewCodecContextFlags(astiav.CodecContextFlagGlobalHeader))
if err := encoder.encoderContext.Open(encoder.codec, encoder.h264options); err != nil {
return nil, err
}
encoder.findParameterSets(encoder.encoderContext.ExtraData())
return encoder, nil
}
func (encoder *Encoder) Start() {
go encoder.loop()
}
func (encoder *Encoder) loop() {
var (
frame *astiav.Frame
packet *astiav.Packet
err error
)
defer encoder.close()
loop1:
for {
select {
case <-encoder.ctx.Done():
return
case frame = <-encoder.filter.WaitForFrame():
if err = encoder.encoderContext.SendFrame(frame); err != nil {
encoder.filter.PutBack(frame)
if !errors.Is(err, astiav.ErrEagain) {
continue loop1
}
}
loop2:
for {
packet = encoder.buffer.Generate()
if err = encoder.encoderContext.ReceivePacket(packet); err != nil {
encoder.buffer.PutBack(packet)
break loop2
}
if err = encoder.pushPacket(packet); err != nil {
encoder.buffer.PutBack(packet)
continue loop2
}
}
encoder.filter.PutBack(frame)
}
}
}
func (encoder *Encoder) WaitForPacket() chan *astiav.Packet {
return encoder.buffer.GetChannel()
}
func (encoder *Encoder) pushPacket(packet *astiav.Packet) error {
ctx, cancel := context.WithTimeout(encoder.ctx, time.Second)
defer cancel()
return encoder.buffer.Push(ctx, packet)
}
func (encoder *Encoder) GetPacket() (*astiav.Packet, error) {
ctx, cancel := context.WithTimeout(encoder.ctx, time.Second)
defer cancel()
return encoder.buffer.Pop(ctx)
}
func (encoder *Encoder) PutBack(packet *astiav.Packet) {
encoder.buffer.PutBack(packet)
}
func (encoder *Encoder) findParameterSets(extraData []byte) {
if len(extraData) > 0 {
// Find first start code (0x00000001)
for i := 0; i < len(extraData)-4; i++ {
if extraData[i] == 0 && extraData[i+1] == 0 && extraData[i+2] == 0 && extraData[i+3] == 1 {
// Skip start code to get NAL type
nalType := extraData[i+4] & 0x1F
// Find next start code or end
nextStart := len(extraData)
for j := i + 4; j < len(extraData)-4; j++ {
if extraData[j] == 0 && extraData[j+1] == 0 && extraData[j+2] == 0 && extraData[j+3] == 1 {
nextStart = j
break
}
}
if nalType == 7 { // SPS
encoder.sps = make([]byte, nextStart-i)
copy(encoder.sps, extraData[i:nextStart])
} else if nalType == 8 { // PPS
encoder.pps = make([]byte, len(extraData)-i)
copy(encoder.pps, extraData[i:])
}
i = nextStart - 1
}
}
}
}
func (encoder *Encoder) GetSPS() []byte {
return encoder.sps
}
func (encoder *Encoder) GetPPS() []byte {
return encoder.pps
}
func (encoder *Encoder) close() {
if encoder.encoderContext != nil {
encoder.encoderContext.Free()
}
}

14
pkg/encoder_options.go Normal file
View File

@@ -0,0 +1,14 @@
package pkg
type (
encoderCodecSetting string
EncoderOption = func(*Encoder) error
)
const (
EncoderCodecNoSetting encoderCodecSetting = "None"
EncoderCodecDefaultSetting encoderCodecSetting = "default"
EncoderCodecHighQualitySetting encoderCodecSetting = "high-quality"
EncoderCodecLowLatencySetting encoderCodecSetting = "low-latency"
EncoderCodecLowBandwidthSetting encoderCodecSetting = "low-bandwidth"
)

25
pkg/errors.go Normal file
View File

@@ -0,0 +1,25 @@
package pkg
import "errors"
var (
ErrorAllocateFormatContext = errors.New("error allocate format context")
ErrorOpenInputContainer = errors.New("error opening container")
ErrorNoStreamFound = errors.New("error no stream found")
ErrorNoVideoStreamFound = errors.New("no video stream found")
ErrorNoCodecFound = errors.New("error no codec found")
ErrorAllocateCodecContext = errors.New("error allocating codec context")
ErrorFillCodecContext = errors.New("error filling the codec context")
ErrorNoFilterName = errors.New("error filter name does not exists")
WarnNoFilterContent = errors.New("content is empty. no filtering will be done")
ErrorGraphParse = errors.New("error parsing the filter graph")
ErrorGraphConfigure = errors.New("error configuring the filter graph")
ErrorSrcContextSetParameter = errors.New("error while setting parameters to source context")
ErrorSrcContextInitialise = errors.New("error initialising the source context")
ErrorAllocSrcContext = errors.New("error setting source context")
ErrorAllocSinkContext = errors.New("error setting sink context")
ErrorCodecNoSetting = errors.New("error no settings given")
)

175
pkg/filter.go Normal file
View File

@@ -0,0 +1,175 @@
package pkg
import (
"context"
"fmt"
"time"
"github.com/asticode/go-astiav"
"harshabose/transcode/v1/internal"
)
type Filter struct {
content string
decoder *Decoder
buffer internal.BufferWithGenerator[astiav.Frame]
graph *astiav.FilterGraph
input *astiav.FilterInOut
output *astiav.FilterInOut
srcContext *astiav.BuffersrcFilterContext
sinkContext *astiav.BuffersinkFilterContext
srcContextParams *astiav.BuffersrcFilterContextParameters // NOTE: THIS BECOMES NIL AFTER INITIALISATION
ctx context.Context
}
func CreateFilter(ctx context.Context, decoder *Decoder, filterConfig *Config, options ...FilterOption) (*Filter, error) {
var (
filter *Filter
filterSrc *astiav.Filter
filterSink *astiav.Filter
err error
)
filter = &Filter{
graph: astiav.AllocFilterGraph(),
decoder: decoder,
buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreateFramePool()),
input: astiav.AllocFilterInOut(),
output: astiav.AllocFilterInOut(),
srcContextParams: astiav.AllocBuffersrcFilterContextParameters(),
ctx: ctx,
}
// TODO: CHECK IF ALL ATTRIBUTES ARE ALLOCATED PROPERLY
if filterSrc = astiav.FindFilterByName(filterConfig.Source.String()); filterSrc == nil {
return nil, ErrorNoFilterName
}
if filterSink = astiav.FindFilterByName(filterConfig.Sink.String()); filterSink == nil {
return nil, ErrorNoFilterName
}
if filter.srcContext, err = filter.graph.NewBuffersrcFilterContext(filterSrc, "in"); err != nil {
return nil, ErrorAllocSrcContext
}
if filter.sinkContext, err = filter.graph.NewBuffersinkFilterContext(filterSink, "out"); err != nil {
return nil, ErrorAllocSinkContext
}
for _, option := range options {
if err = option(filter); err != nil {
// TODO: SET CONTENT HERE
return nil, err
}
}
if err = filter.srcContext.SetParameters(filter.srcContextParams); err != nil {
return nil, ErrorSrcContextSetParameter
}
if err = filter.srcContext.Initialize(nil); err != nil {
return nil, ErrorSrcContextInitialise
}
filter.output.SetName("in")
filter.output.SetFilterContext(filter.srcContext.FilterContext())
filter.output.SetPadIdx(0)
filter.output.SetNext(nil)
filter.input.SetName("out")
filter.input.SetFilterContext(filter.sinkContext.FilterContext())
filter.input.SetPadIdx(0)
filter.input.SetNext(nil)
if filter.content == "" {
fmt.Println(WarnNoFilterContent)
}
if err = filter.graph.Parse(filter.content, filter.input, filter.output); err != nil {
return nil, ErrorGraphParse
}
if err = filter.graph.Configure(); err != nil {
return nil, ErrorGraphConfigure
}
if filter.srcContextParams != nil {
filter.srcContextParams.Free()
}
return filter, nil
}
func (filter *Filter) Start() {
go filter.loop()
}
func (filter *Filter) loop() {
var (
err error = nil
srcFrame *astiav.Frame
sinkFrame *astiav.Frame
)
defer filter.close()
loop1:
for {
select {
case <-filter.ctx.Done():
return
case srcFrame = <-filter.decoder.WaitForFrame():
if err = filter.srcContext.AddFrame(srcFrame, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil {
filter.buffer.PutBack(srcFrame)
continue loop1
}
loop2:
for {
sinkFrame = filter.buffer.Generate()
if err = filter.sinkContext.GetFrame(sinkFrame, astiav.NewBuffersinkFlags()); err != nil {
filter.buffer.PutBack(sinkFrame)
break loop2
}
if err = filter.pushFrame(sinkFrame); err != nil {
filter.buffer.PutBack(sinkFrame)
continue loop2
}
}
filter.decoder.PutBack(srcFrame)
}
}
}
func (filter *Filter) pushFrame(frame *astiav.Frame) error {
ctx, cancel := context.WithTimeout(filter.ctx, time.Second)
defer cancel()
return filter.buffer.Push(ctx, frame)
}
func (filter *Filter) GetFrame() (*astiav.Frame, error) {
ctx, cancel := context.WithTimeout(filter.ctx, time.Second)
defer cancel()
return filter.buffer.Pop(ctx)
}
func (filter *Filter) PutBack(frame *astiav.Frame) {
filter.buffer.PutBack(frame)
}
func (filter *Filter) WaitForFrame() chan *astiav.Frame {
return filter.buffer.GetChannel()
}
func (filter *Filter) close() {
if filter.graph != nil {
filter.graph.Free()
}
if filter.input != nil {
filter.input.Free()
}
if filter.output != nil {
filter.output.Free()
}
}

73
pkg/filter_options.go Normal file
View File

@@ -0,0 +1,73 @@
package pkg
import (
"fmt"
"github.com/asticode/go-astiav"
)
type (
FilterOption func(*Filter) error
Name string
)
func (f Name) String() string {
return string(f)
}
type Config struct {
Source Name
Sink Name
}
const (
videoBufferFilterName Name = "buffer"
videoBufferSinkFilterName Name = "buffersink"
)
var (
VideoFilters = &Config{
Source: videoBufferFilterName,
Sink: videoBufferSinkFilterName,
}
)
func VideoSetFilterContextParameters(codecContext *astiav.CodecContext) func(*Filter) error {
return func(filter *Filter) error {
filter.srcContextParams.SetHeight(codecContext.Height())
filter.srcContextParams.SetPixelFormat(codecContext.PixelFormat())
filter.srcContextParams.SetSampleAspectRatio(codecContext.SampleAspectRatio())
filter.srcContextParams.SetTimeBase(codecContext.TimeBase())
filter.srcContextParams.SetWidth(codecContext.Width())
return nil
}
}
func WithDefaultVideoFilterContentOptions(filter *Filter) error {
if err := videoScaleFilterContent(filter); err != nil {
return err
}
if err := videoPixelFormatFilterContent(filter); err != nil {
return err
}
if err := videoFPSFilterContent(filter); err != nil {
return err
}
return nil
}
func videoScaleFilterContent(filter *Filter) error {
filter.content += fmt.Sprintf("scale=%d:%d,", DefaultVideoWidth, DefaultVideoHeight)
return nil
}
func videoPixelFormatFilterContent(filter *Filter) error {
filter.content += fmt.Sprintf("format=pix_fmts=%s,", DefaultVideoPixFormat)
return nil
}
func videoFPSFilterContent(filter *Filter) error {
filter.content += fmt.Sprintf("fps=%d,", DefaultVideoFPS)
return nil
}