From f91d9277bba0d416fdcfa3e4c4aa193df159c739 Mon Sep 17 00:00:00 2001 From: harshabose Date: Tue, 18 Feb 2025 16:41:05 +0530 Subject: [PATCH] first-commit --- .gitignore | 2 + go.mod | 29 +++++++ go.sum | 46 ++++++++++ internal/errors.go | 8 ++ internal/frame_pool.go | 1 + internal/limit_buffer.go | 106 ++++++++++++++++++++++++ internal/package.go | 82 ++++++++++++++++++ internal/packet_pool.go | 1 + internal/rtp_pool.go | 1 + internal/sample_pool.go | 1 + internal/timer_buffer.go | 1 + main.go | 21 +++++ pkg/constants.go | 13 +++ pkg/decoder.go | 120 +++++++++++++++++++++++++++ pkg/decoder_options.go | 29 +++++++ pkg/demuxer.go | 134 ++++++++++++++++++++++++++++++ pkg/demuxer_options.go | 25 ++++++ pkg/encoder.go | 168 +++++++++++++++++++++++++++++++++++++ pkg/encoder_options.go | 14 ++++ pkg/errors.go | 25 ++++++ pkg/filter.go | 175 +++++++++++++++++++++++++++++++++++++++ pkg/filter_options.go | 73 ++++++++++++++++ 22 files changed, 1075 insertions(+) create mode 100644 .gitignore create mode 100644 go.mod create mode 100644 go.sum create mode 100644 internal/errors.go create mode 100644 internal/frame_pool.go create mode 100644 internal/limit_buffer.go create mode 100644 internal/package.go create mode 100644 internal/packet_pool.go create mode 100644 internal/rtp_pool.go create mode 100644 internal/sample_pool.go create mode 100644 internal/timer_buffer.go create mode 100644 main.go create mode 100644 pkg/constants.go create mode 100644 pkg/decoder.go create mode 100644 pkg/decoder_options.go create mode 100644 pkg/demuxer.go create mode 100644 pkg/demuxer_options.go create mode 100644 pkg/encoder.go create mode 100644 pkg/encoder_options.go create mode 100644 pkg/errors.go create mode 100644 pkg/filter.go create mode 100644 pkg/filter_options.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..6eacd7d --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +/third-party \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..0957e2f --- /dev/null +++ b/go.mod @@ -0,0 +1,29 @@ +module harshabose/transcode/v1 + +go 1.23 + +require ( + github.com/asticode/go-astiav v0.33.1 // indirect + github.com/asticode/go-astikit v0.42.0 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/pion/datachannel v1.5.10 // indirect + github.com/pion/dtls/v3 v3.0.4 // indirect + github.com/pion/ice/v4 v4.0.6 // indirect + github.com/pion/interceptor v0.1.37 // indirect + github.com/pion/logging v0.2.3 // indirect + github.com/pion/mdns/v2 v2.0.7 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.15 // indirect + github.com/pion/rtp v1.8.11 // indirect + github.com/pion/sctp v1.8.35 // indirect + github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/srtp/v3 v3.0.4 // indirect + github.com/pion/stun/v3 v3.0.0 // indirect + github.com/pion/transport/v3 v3.0.7 // indirect + github.com/pion/turn/v4 v4.0.0 // indirect + github.com/pion/webrtc/v4 v4.0.10 // indirect + github.com/wlynxg/anet v0.0.5 // indirect + golang.org/x/crypto v0.32.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..702edf6 --- /dev/null +++ b/go.sum @@ -0,0 +1,46 @@ +github.com/asticode/go-astiav v0.33.1 h1:fll7jDP1LCosVTpqJNze0z0TAokuyFj+Jjls+g1hsBA= +github.com/asticode/go-astiav v0.33.1/go.mod h1:K7D8UC6GeQt85FUxk2KVwYxHnotrxuEnp5evkkudc2s= +github.com/asticode/go-astikit v0.42.0 h1:pnir/2KLUSr0527Tv908iAH6EGYYrYta132vvjXsH5w= +github.com/asticode/go-astikit v0.42.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= +github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= +github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= +github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= +github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= +github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= +github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= +github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI= +github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90= +github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= +github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= +github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= +github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= +github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.35 h1:qwtKvNK1Wc5tHMIYgTDJhfZk7vATGVHhXbUDfHbYwzA= +github.com/pion/sctp v1.8.35/go.mod h1:EcXP8zCYVTRy3W9xtOF7wJm1L1aXfKRQzaM33SjQlzg= +github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= +github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= +github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= +github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw= +github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU= +github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0= +github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= +github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= +github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= +github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= +github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= +github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/internal/errors.go b/internal/errors.go new file mode 100644 index 0000000..f1f0a7a --- /dev/null +++ b/internal/errors.go @@ -0,0 +1,8 @@ +package internal + +import "errors" + +var ( + ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage") + ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation") +) diff --git a/internal/frame_pool.go b/internal/frame_pool.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/frame_pool.go @@ -0,0 +1 @@ +package internal diff --git a/internal/limit_buffer.go b/internal/limit_buffer.go new file mode 100644 index 0000000..2106e7e --- /dev/null +++ b/internal/limit_buffer.go @@ -0,0 +1,106 @@ +package internal + +import ( + "context" + "fmt" +) + +type limitBuffer[T any] struct { + pool Pool[T] + bufferChannel chan *T + inputBuffer chan *T + ctx context.Context +} + +func (buffer *limitBuffer[T]) Push(ctx context.Context, element *T) error { + select { + case buffer.inputBuffer <- element: + // WARN: LACKS CHECKS FOR CLOSED CHANNEL + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (buffer *limitBuffer[T]) Pop(ctx context.Context) (*T, error) { + select { + case <-ctx.Done(): + return nil, ctx.Err() + case data, ok := <-buffer.bufferChannel: + if !ok { + return nil, ErrorChannelBufferClose + } + if data == nil { + return nil, ErrorElementUnallocated + } + return data, nil + } +} + +func (buffer *limitBuffer[T]) Generate() *T { + return buffer.pool.Get() +} + +func (buffer *limitBuffer[T]) PutBack(element *T) { + if buffer.pool != nil { + buffer.pool.Put(element) + } +} + +func (buffer *limitBuffer[T]) GetChannel() chan *T { + return buffer.bufferChannel +} + +func (buffer *limitBuffer[T]) Size() int { + return len(buffer.bufferChannel) +} + +func (buffer *limitBuffer[T]) loop() { + defer buffer.close() +loop: + for { + select { + case <-buffer.ctx.Done(): + return + case element, ok := <-buffer.inputBuffer: + if !ok || element == nil { + continue loop + } + select { + case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED + continue loop + default: + select { + case oldElement := <-buffer.bufferChannel: + buffer.PutBack(oldElement) + select { + case buffer.bufferChannel <- element: + continue loop + default: + fmt.Println("unexpected buffer state. skipping the element..") + buffer.PutBack(element) + } + } + } + } + } +} + +func (buffer *limitBuffer[T]) close() { +loop: + for { + select { + case element := <-buffer.bufferChannel: + if buffer.pool != nil { + buffer.pool.Put(element) + } + default: + close(buffer.bufferChannel) + close(buffer.inputBuffer) + break loop + } + } + if buffer.pool != nil { + buffer.pool.Release() + } +} diff --git a/internal/package.go b/internal/package.go new file mode 100644 index 0000000..f2fb094 --- /dev/null +++ b/internal/package.go @@ -0,0 +1,82 @@ +package internal + +import ( + "context" + "sync" + + "github.com/asticode/go-astiav" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4/pkg/media" +) + +type Pool[T any] interface { + Get() *T + Put(*T) + Release() +} + +type Buffer[T any] interface { + Push(context.Context, *T) error + Pop(ctx context.Context) (*T, error) + Size() int +} + +type BufferWithGenerator[T any] interface { + Push(context.Context, *T) error + Pop(ctx context.Context) (*T, error) + Size() int + Generate() *T + PutBack(*T) + GetChannel() chan *T +} + +func CreateFramePool() Pool[astiav.Frame] { + return &framePool{ + pool: sync.Pool{ + New: func() any { + return astiav.AllocFrame() + }, + }, + } +} + +func CreateSamplePool() Pool[media.Sample] { + return &samplePool{ + pool: sync.Pool{ + New: func() any { + return &media.Sample{} + }, + }, + } +} + +func CreateRTPPool() Pool[rtp.Packet] { + return &rtpPool{ + pool: sync.Pool{ + New: func() any { + return &rtp.Packet{} + }, + }, + } +} + +func CreatePacketPool() Pool[astiav.Packet] { + return &packetPool{ + pool: sync.Pool{ + New: func() any { + return astiav.AllocPacket() + }, + }, + } +} + +func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) BufferWithGenerator[T] { + buffer := &limitBuffer[T]{ + pool: pool, + bufferChannel: make(chan *T, size), + inputBuffer: make(chan *T), + ctx: ctx, + } + go buffer.loop() + return buffer +} diff --git a/internal/packet_pool.go b/internal/packet_pool.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/packet_pool.go @@ -0,0 +1 @@ +package internal diff --git a/internal/rtp_pool.go b/internal/rtp_pool.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/rtp_pool.go @@ -0,0 +1 @@ +package internal diff --git a/internal/sample_pool.go b/internal/sample_pool.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/sample_pool.go @@ -0,0 +1 @@ +package internal diff --git a/internal/timer_buffer.go b/internal/timer_buffer.go new file mode 100644 index 0000000..5bf0569 --- /dev/null +++ b/internal/timer_buffer.go @@ -0,0 +1 @@ +package internal diff --git a/main.go b/main.go new file mode 100644 index 0000000..46a652c --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" +) + +// TIP

To run your code, right-click the code and select Run.

Alternatively, click +// the icon in the gutter and select the Run menu item from here.

+ +func main() { + // TIP

Press when your caret is at the underlined text + // to see how GoLand suggests fixing the warning.

Alternatively, if available, click the lightbulb to view possible fixes.

+ s := "gopher" + fmt.Printf("Hello and welcome, %s!\n", s) + + for i := 1; i <= 5; i++ { + // TIP

To start your debugging session, right-click your code in the editor and select the Debug option.

We have set one breakpoint + // for you, but you can always add more by pressing .

+ fmt.Println("i =", 100/i) + } +} diff --git a/pkg/constants.go b/pkg/constants.go new file mode 100644 index 0000000..9c47cb0 --- /dev/null +++ b/pkg/constants.go @@ -0,0 +1,13 @@ +package pkg + +import "github.com/asticode/go-astiav" + +const ( + DefaultVideoPayloadType = 96 + DefaultVideoFPS = int(25) + DefaultVideoClockRate = int(90000) + DefaultVideoHeight = int(1080) + DefaultVideoWidth = int(1920) + DefaultVideoPixFormat = astiav.PixelFormatYuv420P + DefaultVideoEncoderCodec = astiav.CodecIDH264 +) diff --git a/pkg/decoder.go b/pkg/decoder.go new file mode 100644 index 0000000..496e747 --- /dev/null +++ b/pkg/decoder.go @@ -0,0 +1,120 @@ +package pkg + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/asticode/go-astiav" + + "harshabose/transcode/v1/internal" +) + +type Decoder struct { + demuxer *Demuxer + decoderContext *astiav.CodecContext + codec *astiav.Codec + buffer internal.BufferWithGenerator[astiav.Frame] + ctx context.Context +} + +func CreateDecoder(ctx context.Context, demuxer *Demuxer, options ...DecoderOption) (*Decoder, error) { + decoder := &Decoder{ + demuxer: demuxer, + buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreateFramePool()), + ctx: ctx, + } + + var err error + + for _, option := range options { + if err = option(decoder); err != nil { + return nil, err + } + } + + if err = decoder.decoderContext.Open(decoder.codec, nil); err != nil { + return nil, err + } + + return decoder, nil +} + +func (decoder *Decoder) Start() { + go decoder.loop() +} + +func (decoder *Decoder) loop() { + var ( + packet *astiav.Packet + frame *astiav.Frame + err error + ) + + defer decoder.close() + +loop1: + for { + select { + case <-decoder.ctx.Done(): + return + case packet = <-decoder.demuxer.WaitForPacket(): + if err := decoder.decoderContext.SendPacket(packet); err != nil { + decoder.demuxer.PutBack(packet) + if !errors.Is(err, astiav.ErrEagain) { + continue loop1 + } + } + loop2: + for { + frame = decoder.buffer.Generate() + if err := decoder.decoderContext.ReceiveFrame(frame); err != nil { + decoder.buffer.PutBack(frame) + break loop2 + } + + frame.SetPictureType(astiav.PictureTypeNone) + + if err = decoder.pushFrame(frame); err != nil { + fmt.Println("warning: frame dropped!") + decoder.buffer.PutBack(frame) + continue loop2 + } + } + decoder.demuxer.PutBack(packet) + } + } +} + +func (decoder *Decoder) pushFrame(frame *astiav.Frame) error { + ctx, cancel := context.WithTimeout(decoder.ctx, time.Second/time.Duration(DefaultVideoFPS)) + defer cancel() + + return decoder.buffer.Push(ctx, frame) +} + +func (decoder *Decoder) GetFrame() (*astiav.Frame, error) { + ctx, cancel := context.WithTimeout(decoder.ctx, time.Second/time.Duration(DefaultVideoFPS)) + defer cancel() + + return decoder.buffer.Pop(ctx) +} + +func (decoder *Decoder) WaitForFrame() chan *astiav.Frame { + return decoder.buffer.GetChannel() +} + +func (decoder *Decoder) PutBack(frame *astiav.Frame) { + decoder.buffer.PutBack(frame) +} + +func (decoder *Decoder) GetSrcFilterContextOptions() func(filter types.BaseFilter) error { + return filt.VideoSetFilterContextParameters(decoder.decoderContext) +} + +func (decoder *Decoder) close() { + if decoder.decoderContext != nil { + decoder.decoderContext.Free() + } +} diff --git a/pkg/decoder_options.go b/pkg/decoder_options.go new file mode 100644 index 0000000..9161787 --- /dev/null +++ b/pkg/decoder_options.go @@ -0,0 +1,29 @@ +package pkg + +import "github.com/asticode/go-astiav" + +type DecoderOption = func(*Decoder) error + +func VideoSetDecoderContext(codecParameters *astiav.CodecParameters, videoStream *astiav.Stream, formatContext *astiav.FormatContext) func(*Decoder) error { + return func(decoder *Decoder) error { + var ( + err error + ) + + if decoder.codec = astiav.FindDecoder(codecParameters.CodecID()); decoder.codec == nil { + return ErrorNoCodecFound + } + + if decoder.decoderContext = astiav.AllocCodecContext(decoder.codec); decoder.decoderContext == nil { + return ErrorAllocateCodecContext + } + + if err = videoStream.CodecParameters().ToCodecContext(decoder.decoderContext); err != nil { + return ErrorFillCodecContext + } + + decoder.decoderContext.SetFramerate(formatContext.GuessFrameRate(videoStream, nil)) + decoder.decoderContext.SetTimeBase(videoStream.TimeBase()) + return nil + } +} diff --git a/pkg/demuxer.go b/pkg/demuxer.go new file mode 100644 index 0000000..ea747c1 --- /dev/null +++ b/pkg/demuxer.go @@ -0,0 +1,134 @@ +package pkg + +import ( + "context" + "time" + + "github.com/asticode/go-astiav" + + "harshabose/transcode/v1/internal" +) + +type Demuxer struct { + formatContext *astiav.FormatContext + inputOptions *astiav.Dictionary + stream *astiav.Stream + codecParameters *astiav.CodecParameters + buffer internal.BufferWithGenerator[astiav.Packet] + ctx context.Context +} + +func CreateDemuxer(ctx context.Context, containerAddress string, options ...DemuxerOption) (*Demuxer, error) { + astiav.RegisterAllDevices() + demuxer := &Demuxer{ + formatContext: astiav.AllocFormatContext(), + inputOptions: astiav.NewDictionary(), + buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreatePacketPool()), + ctx: ctx, + } + + if demuxer.formatContext == nil { + return nil, ErrorAllocateFormatContext + } + + for _, option := range options { + if err := option(demuxer); err != nil { + return nil, err + } + } + + if err := demuxer.formatContext.OpenInput(containerAddress, nil, nil); err != nil { + return nil, ErrorOpenInputContainer + } + + if err := demuxer.formatContext.FindStreamInfo(nil); err != nil { + return nil, ErrorNoStreamFound + } + + for _, stream := range demuxer.formatContext.Streams() { + if stream.CodecParameters().MediaType() == astiav.MediaTypeVideo { + demuxer.stream = stream + break + } + } + + if demuxer.stream == nil { + return nil, ErrorNoVideoStreamFound + } + demuxer.codecParameters = demuxer.stream.CodecParameters() + + return demuxer, nil +} + +func (demuxer *Demuxer) Start() { + go demuxer.loop() +} + +func (demuxer *Demuxer) loop() { + defer demuxer.close() + + ticker := time.NewTicker(time.Millisecond) + defer ticker.Stop() + +loop1: + for { + select { + case <-demuxer.ctx.Done(): + return + case <-ticker.C: + loop2: + for { + packet := demuxer.buffer.Generate() + + if err := demuxer.formatContext.ReadFrame(packet); err != nil { + demuxer.buffer.PutBack(packet) + continue loop1 + } + + if packet.StreamIndex() != demuxer.stream.Index() { + demuxer.buffer.PutBack(packet) + continue loop2 + } + + if err := demuxer.pushPacket(packet); err != nil { + demuxer.buffer.PutBack(packet) + continue loop1 + } + break loop2 + } + } + } +} + +func (demuxer *Demuxer) pushPacket(packet *astiav.Packet) error { + ctx, cancel := context.WithTimeout(demuxer.ctx, time.Second/time.Duration(DefaultVideoFPS)) // TODO: NEEDS TO BE BASED ON FPS ON INPUT_FORMAT + defer cancel() + + return demuxer.buffer.Push(ctx, packet) +} + +func (demuxer *Demuxer) WaitForPacket() chan *astiav.Packet { + return demuxer.buffer.GetChannel() +} + +func (demuxer *Demuxer) GetPacket() (*astiav.Packet, error) { + ctx, cancel := context.WithTimeout(demuxer.ctx, time.Second/time.Duration(DefaultVideoFPS)) + defer cancel() + + return demuxer.buffer.Pop(ctx) +} + +func (demuxer *Demuxer) GetDecoderContextOptions() func(*Decoder) error { + return VideoSetDecoderContext(demuxer.codecParameters, demuxer.stream, demuxer.formatContext) +} + +func (demuxer *Demuxer) PutBack(packet *astiav.Packet) { + demuxer.buffer.PutBack(packet) +} + +func (demuxer *Demuxer) close() { + if demuxer.formatContext != nil { + demuxer.formatContext.CloseInput() + demuxer.formatContext.Free() + } +} diff --git a/pkg/demuxer_options.go b/pkg/demuxer_options.go new file mode 100644 index 0000000..f69dca6 --- /dev/null +++ b/pkg/demuxer_options.go @@ -0,0 +1,25 @@ +package pkg + +type DemuxerOption = func(*Demuxer) error + +func WithRTSPInputOption(demuxer *Demuxer) error { + var err error = nil + + if err = demuxer.inputOptions.Set("rtsp_transport", "tcp", 0); err != nil { + return err + } + if err = demuxer.inputOptions.Set("stimeout", "5000000", 0); err != nil { + return err + } + if err = demuxer.inputOptions.Set("fflags", "nobuffer", 0); err != nil { + return err + } + if err = demuxer.inputOptions.Set("flags", "low_delay", 0); err != nil { + return err + } + if err = demuxer.inputOptions.Set("reorder_queue_size", "0", 0); err != nil { + return err + } + + return nil +} diff --git a/pkg/encoder.go b/pkg/encoder.go new file mode 100644 index 0000000..d7a5ca4 --- /dev/null +++ b/pkg/encoder.go @@ -0,0 +1,168 @@ +package pkg + +import ( + "context" + "errors" + "time" + + "github.com/asticode/go-astiav" + + "harshabose/transcode/v1/internal" +) + +type Encoder struct { + buffer internal.BufferWithGenerator[astiav.Packet] + filter *Filter + ctx context.Context + codec *astiav.Codec + encoderContext *astiav.CodecContext + h264options *astiav.Dictionary + encoderSettings encoderCodecSetting + sps []byte + pps []byte +} + +func CreateEncoder(ctx context.Context, codecID astiav.CodecID, filter *Filter, options ...EncoderOption) (*Encoder, error) { + encoder := &Encoder{ + buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreatePacketPool()), + filter: filter, + h264options: astiav.NewDictionary(), + encoderSettings: EncoderCodecNoSetting, + ctx: ctx, + } + + encoder.codec = astiav.FindEncoder(codecID) + if encoder.encoderContext = astiav.AllocCodecContext(encoder.codec); encoder.encoderContext == nil { + return nil, ErrorAllocateCodecContext + } + + for _, option := range options { + if err := option(encoder); err != nil { + return nil, err + } + } + + if encoder.encoderSettings == EncoderCodecNoSetting { + return nil, ErrorCodecNoSetting + } + + encoder.encoderContext.SetFlags(astiav.NewCodecContextFlags(astiav.CodecContextFlagGlobalHeader)) + + if err := encoder.encoderContext.Open(encoder.codec, encoder.h264options); err != nil { + return nil, err + } + + encoder.findParameterSets(encoder.encoderContext.ExtraData()) + + return encoder, nil +} + +func (encoder *Encoder) Start() { + go encoder.loop() +} + +func (encoder *Encoder) loop() { + var ( + frame *astiav.Frame + packet *astiav.Packet + err error + ) + defer encoder.close() + +loop1: + for { + select { + case <-encoder.ctx.Done(): + return + case frame = <-encoder.filter.WaitForFrame(): + if err = encoder.encoderContext.SendFrame(frame); err != nil { + encoder.filter.PutBack(frame) + if !errors.Is(err, astiav.ErrEagain) { + continue loop1 + } + } + loop2: + for { + packet = encoder.buffer.Generate() + if err = encoder.encoderContext.ReceivePacket(packet); err != nil { + encoder.buffer.PutBack(packet) + break loop2 + } + + if err = encoder.pushPacket(packet); err != nil { + encoder.buffer.PutBack(packet) + continue loop2 + } + } + encoder.filter.PutBack(frame) + } + } +} + +func (encoder *Encoder) WaitForPacket() chan *astiav.Packet { + return encoder.buffer.GetChannel() +} + +func (encoder *Encoder) pushPacket(packet *astiav.Packet) error { + ctx, cancel := context.WithTimeout(encoder.ctx, time.Second) + defer cancel() + + return encoder.buffer.Push(ctx, packet) +} + +func (encoder *Encoder) GetPacket() (*astiav.Packet, error) { + ctx, cancel := context.WithTimeout(encoder.ctx, time.Second) + defer cancel() + + return encoder.buffer.Pop(ctx) +} + +func (encoder *Encoder) PutBack(packet *astiav.Packet) { + encoder.buffer.PutBack(packet) +} + +func (encoder *Encoder) findParameterSets(extraData []byte) { + if len(extraData) > 0 { + // Find first start code (0x00000001) + for i := 0; i < len(extraData)-4; i++ { + if extraData[i] == 0 && extraData[i+1] == 0 && extraData[i+2] == 0 && extraData[i+3] == 1 { + // Skip start code to get NAL type + nalType := extraData[i+4] & 0x1F + + // Find next start code or end + nextStart := len(extraData) + for j := i + 4; j < len(extraData)-4; j++ { + if extraData[j] == 0 && extraData[j+1] == 0 && extraData[j+2] == 0 && extraData[j+3] == 1 { + nextStart = j + break + } + } + + if nalType == 7 { // SPS + encoder.sps = make([]byte, nextStart-i) + copy(encoder.sps, extraData[i:nextStart]) + } else if nalType == 8 { // PPS + encoder.pps = make([]byte, len(extraData)-i) + copy(encoder.pps, extraData[i:]) + } + + i = nextStart - 1 + } + } + } + +} + +func (encoder *Encoder) GetSPS() []byte { + return encoder.sps +} + +func (encoder *Encoder) GetPPS() []byte { + return encoder.pps +} + +func (encoder *Encoder) close() { + if encoder.encoderContext != nil { + encoder.encoderContext.Free() + } +} diff --git a/pkg/encoder_options.go b/pkg/encoder_options.go new file mode 100644 index 0000000..fcb321e --- /dev/null +++ b/pkg/encoder_options.go @@ -0,0 +1,14 @@ +package pkg + +type ( + encoderCodecSetting string + EncoderOption = func(*Encoder) error +) + +const ( + EncoderCodecNoSetting encoderCodecSetting = "None" + EncoderCodecDefaultSetting encoderCodecSetting = "default" + EncoderCodecHighQualitySetting encoderCodecSetting = "high-quality" + EncoderCodecLowLatencySetting encoderCodecSetting = "low-latency" + EncoderCodecLowBandwidthSetting encoderCodecSetting = "low-bandwidth" +) diff --git a/pkg/errors.go b/pkg/errors.go new file mode 100644 index 0000000..07535de --- /dev/null +++ b/pkg/errors.go @@ -0,0 +1,25 @@ +package pkg + +import "errors" + +var ( + ErrorAllocateFormatContext = errors.New("error allocate format context") + ErrorOpenInputContainer = errors.New("error opening container") + ErrorNoStreamFound = errors.New("error no stream found") + ErrorNoVideoStreamFound = errors.New("no video stream found") + + ErrorNoCodecFound = errors.New("error no codec found") + ErrorAllocateCodecContext = errors.New("error allocating codec context") + ErrorFillCodecContext = errors.New("error filling the codec context") + + ErrorNoFilterName = errors.New("error filter name does not exists") + WarnNoFilterContent = errors.New("content is empty. no filtering will be done") + ErrorGraphParse = errors.New("error parsing the filter graph") + ErrorGraphConfigure = errors.New("error configuring the filter graph") + ErrorSrcContextSetParameter = errors.New("error while setting parameters to source context") + ErrorSrcContextInitialise = errors.New("error initialising the source context") + ErrorAllocSrcContext = errors.New("error setting source context") + ErrorAllocSinkContext = errors.New("error setting sink context") + + ErrorCodecNoSetting = errors.New("error no settings given") +) diff --git a/pkg/filter.go b/pkg/filter.go new file mode 100644 index 0000000..7d38aef --- /dev/null +++ b/pkg/filter.go @@ -0,0 +1,175 @@ +package pkg + +import ( + "context" + "fmt" + "time" + + "github.com/asticode/go-astiav" + + "harshabose/transcode/v1/internal" +) + +type Filter struct { + content string + decoder *Decoder + buffer internal.BufferWithGenerator[astiav.Frame] + graph *astiav.FilterGraph + input *astiav.FilterInOut + output *astiav.FilterInOut + srcContext *astiav.BuffersrcFilterContext + sinkContext *astiav.BuffersinkFilterContext + srcContextParams *astiav.BuffersrcFilterContextParameters // NOTE: THIS BECOMES NIL AFTER INITIALISATION + ctx context.Context +} + +func CreateFilter(ctx context.Context, decoder *Decoder, filterConfig *Config, options ...FilterOption) (*Filter, error) { + var ( + filter *Filter + filterSrc *astiav.Filter + filterSink *astiav.Filter + err error + ) + filter = &Filter{ + graph: astiav.AllocFilterGraph(), + decoder: decoder, + buffer: internal.CreateChannelBuffer(ctx, DefaultVideoFPS*3, internal.CreateFramePool()), + input: astiav.AllocFilterInOut(), + output: astiav.AllocFilterInOut(), + srcContextParams: astiav.AllocBuffersrcFilterContextParameters(), + ctx: ctx, + } + + // TODO: CHECK IF ALL ATTRIBUTES ARE ALLOCATED PROPERLY + + if filterSrc = astiav.FindFilterByName(filterConfig.Source.String()); filterSrc == nil { + return nil, ErrorNoFilterName + } + if filterSink = astiav.FindFilterByName(filterConfig.Sink.String()); filterSink == nil { + return nil, ErrorNoFilterName + } + + if filter.srcContext, err = filter.graph.NewBuffersrcFilterContext(filterSrc, "in"); err != nil { + return nil, ErrorAllocSrcContext + } + + if filter.sinkContext, err = filter.graph.NewBuffersinkFilterContext(filterSink, "out"); err != nil { + return nil, ErrorAllocSinkContext + } + + for _, option := range options { + if err = option(filter); err != nil { + // TODO: SET CONTENT HERE + return nil, err + } + } + + if err = filter.srcContext.SetParameters(filter.srcContextParams); err != nil { + return nil, ErrorSrcContextSetParameter + } + + if err = filter.srcContext.Initialize(nil); err != nil { + return nil, ErrorSrcContextInitialise + } + + filter.output.SetName("in") + filter.output.SetFilterContext(filter.srcContext.FilterContext()) + filter.output.SetPadIdx(0) + filter.output.SetNext(nil) + + filter.input.SetName("out") + filter.input.SetFilterContext(filter.sinkContext.FilterContext()) + filter.input.SetPadIdx(0) + filter.input.SetNext(nil) + + if filter.content == "" { + fmt.Println(WarnNoFilterContent) + } + + if err = filter.graph.Parse(filter.content, filter.input, filter.output); err != nil { + return nil, ErrorGraphParse + } + + if err = filter.graph.Configure(); err != nil { + return nil, ErrorGraphConfigure + } + + if filter.srcContextParams != nil { + filter.srcContextParams.Free() + } + return filter, nil +} + +func (filter *Filter) Start() { + go filter.loop() +} + +func (filter *Filter) loop() { + var ( + err error = nil + srcFrame *astiav.Frame + sinkFrame *astiav.Frame + ) + defer filter.close() + +loop1: + for { + select { + case <-filter.ctx.Done(): + return + case srcFrame = <-filter.decoder.WaitForFrame(): + if err = filter.srcContext.AddFrame(srcFrame, astiav.NewBuffersrcFlags(astiav.BuffersrcFlagKeepRef)); err != nil { + filter.buffer.PutBack(srcFrame) + continue loop1 + } + loop2: + for { + sinkFrame = filter.buffer.Generate() + if err = filter.sinkContext.GetFrame(sinkFrame, astiav.NewBuffersinkFlags()); err != nil { + filter.buffer.PutBack(sinkFrame) + break loop2 + } + + if err = filter.pushFrame(sinkFrame); err != nil { + filter.buffer.PutBack(sinkFrame) + continue loop2 + } + } + filter.decoder.PutBack(srcFrame) + } + } +} + +func (filter *Filter) pushFrame(frame *astiav.Frame) error { + ctx, cancel := context.WithTimeout(filter.ctx, time.Second) + defer cancel() + + return filter.buffer.Push(ctx, frame) +} + +func (filter *Filter) GetFrame() (*astiav.Frame, error) { + ctx, cancel := context.WithTimeout(filter.ctx, time.Second) + defer cancel() + + return filter.buffer.Pop(ctx) +} + +func (filter *Filter) PutBack(frame *astiav.Frame) { + filter.buffer.PutBack(frame) +} + +func (filter *Filter) WaitForFrame() chan *astiav.Frame { + return filter.buffer.GetChannel() +} + +func (filter *Filter) close() { + if filter.graph != nil { + filter.graph.Free() + } + if filter.input != nil { + filter.input.Free() + } + if filter.output != nil { + filter.output.Free() + } +} diff --git a/pkg/filter_options.go b/pkg/filter_options.go new file mode 100644 index 0000000..60c5080 --- /dev/null +++ b/pkg/filter_options.go @@ -0,0 +1,73 @@ +package pkg + +import ( + "fmt" + + "github.com/asticode/go-astiav" +) + +type ( + FilterOption func(*Filter) error + Name string +) + +func (f Name) String() string { + return string(f) +} + +type Config struct { + Source Name + Sink Name +} + +const ( + videoBufferFilterName Name = "buffer" + videoBufferSinkFilterName Name = "buffersink" +) + +var ( + VideoFilters = &Config{ + Source: videoBufferFilterName, + Sink: videoBufferSinkFilterName, + } +) + +func VideoSetFilterContextParameters(codecContext *astiav.CodecContext) func(*Filter) error { + return func(filter *Filter) error { + filter.srcContextParams.SetHeight(codecContext.Height()) + filter.srcContextParams.SetPixelFormat(codecContext.PixelFormat()) + filter.srcContextParams.SetSampleAspectRatio(codecContext.SampleAspectRatio()) + filter.srcContextParams.SetTimeBase(codecContext.TimeBase()) + filter.srcContextParams.SetWidth(codecContext.Width()) + return nil + } +} + +func WithDefaultVideoFilterContentOptions(filter *Filter) error { + if err := videoScaleFilterContent(filter); err != nil { + return err + } + if err := videoPixelFormatFilterContent(filter); err != nil { + return err + } + if err := videoFPSFilterContent(filter); err != nil { + return err + } + + return nil +} + +func videoScaleFilterContent(filter *Filter) error { + filter.content += fmt.Sprintf("scale=%d:%d,", DefaultVideoWidth, DefaultVideoHeight) + return nil +} + +func videoPixelFormatFilterContent(filter *Filter) error { + filter.content += fmt.Sprintf("format=pix_fmts=%s,", DefaultVideoPixFormat) + return nil +} + +func videoFPSFilterContent(filter *Filter) error { + filter.content += fmt.Sprintf("fps=%d,", DefaultVideoFPS) + return nil +}