diff --git a/.gitignore b/.gitignore index 7a288b3..16ff45b 100644 --- a/.gitignore +++ b/.gitignore @@ -19,6 +19,7 @@ vendor/ # Build directories build/ +build_bak/ third_party/ # Environment files diff --git a/.gitmodules b/.gitmodules index 44a6ddc..f033f10 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,9 @@ [submodule "dependencies/mediapipe"] path = dependencies/mediapipe url = https://github.com/harshabose/mediapipe.git +[submodule "dependencies/tools/multierr"] + path = dependencies/tools/multierr + url = https://github.com/harshabose/multierr.git +[submodule "dependencies/services"] + path = dependencies/services + url = https://github.com/harshabose/services.git diff --git a/Dockerfile b/Dockerfile index 562d3dc..bb811f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -29,7 +29,7 @@ RUN make install-mavp2p # Will create complete.env file RUN make create-env-file -RUN bash -c 'make build-delivery-gcs' +RUN bash -c 'make build-fpv-gcs' # Run main application CMD bash -c 'make run-delivery-gcs' diff --git a/cmd/delivery/constants.go b/cmd/fpv/constants.go similarity index 96% rename from cmd/delivery/constants.go rename to cmd/fpv/constants.go index 8d6ad18..150d6e7 100644 --- a/cmd/delivery/constants.go +++ b/cmd/fpv/constants.go @@ -1,4 +1,4 @@ -package delivery +package fpv const ( DefaultVideoClockRate uint32 = 90000 diff --git a/cmd/delivery/constants_cgo.go b/cmd/fpv/constants_cgo.go similarity index 87% rename from cmd/delivery/constants_cgo.go rename to cmd/fpv/constants_cgo.go index c09878a..9bf8eb9 100644 --- a/cmd/delivery/constants_cgo.go +++ b/cmd/fpv/constants_cgo.go @@ -1,6 +1,6 @@ //go:build cgo_enabled -package delivery +package fpv import "github.com/asticode/go-astiav" diff --git a/cmd/delivery/drone/main.go b/cmd/fpv/drone/main.go similarity index 67% rename from cmd/delivery/drone/main.go rename to cmd/fpv/drone/main.go index df77364..bb27a6f 100644 --- a/cmd/delivery/drone/main.go +++ b/cmd/fpv/drone/main.go @@ -8,19 +8,20 @@ import ( "time" "github.com/asticode/go-astiav" - "github.com/harshabose/mediapipe" - "github.com/harshabose/mediapipe/pkg/dioreader" - "github.com/harshabose/mediapipe/pkg/diowriter" - "github.com/harshabose/mediapipe/pkg/loopback" "github.com/pion/interceptor" "github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4/pkg/media" - "github.com/harshabose/simple_webrtc_comm/client/pkg" + "github.com/harshabose/mediapipe" + "github.com/harshabose/mediapipe/pkg/consumers" + "github.com/harshabose/mediapipe/pkg/duplexers" + "github.com/harshabose/mediapipe/pkg/generators" + "github.com/harshabose/tools/pkg/buffer" + + "github.com/harshabose/simple_webrtc_comm/client" "github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource" "github.com/harshabose/simple_webrtc_comm/client/pkg/transcode" - "github.com/harshabose/simple_webrtc_comm/cmd/delivery" - "github.com/harshabose/tools/buffer/pkg" + "github.com/harshabose/simple_webrtc_comm/cmd/fpv" ) func main() { @@ -30,7 +31,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l, err := loopback.NewLoopBack(context.Background(), "127.0.0.1:14559") + l, err := duplexers.NewLoopBack(context.Background(), "127.0.0.1:14559") if err != nil { panic(err) } @@ -42,29 +43,29 @@ func main() { transcode.WithGeneralDemuxer(ctx, "0", transcode.WithAvFoundationInputFormatOption, - transcode.WithDemuxerBuffer(int(delivery.DefaultVideoFPS), pPool), + transcode.WithDemuxerBuffer(int(fpv.DefaultVideoFPS), pPool), ), transcode.WithGeneralDecoder(ctx, - transcode.WithDecoderBuffer(int(delivery.DefaultVideoFPS), fPool), + transcode.WithDecoderBuffer(int(fpv.DefaultVideoFPS), fPool), ), transcode.WithGeneralFilter(ctx, transcode.VideoFilters, - transcode.WithFilterBuffer(int(delivery.DefaultVideoFPS), fPool), - transcode.WithVideoScaleFilterContent(delivery.DefaultVideoWidth, delivery.DefaultVideoHeight), - transcode.WithVideoPixelFormatFilterContent(delivery.DefaultPixelFormat), - transcode.WithVideoFPSFilterContent(delivery.DefaultVideoFPS), + transcode.WithFilterBuffer(int(fpv.DefaultVideoFPS), fPool), + transcode.WithVideoScaleFilterContent(fpv.DefaultVideoWidth, fpv.DefaultVideoHeight), + transcode.WithVideoPixelFormatFilterContent(fpv.DefaultPixelFormat), + transcode.WithVideoFPSFilterContent(fpv.DefaultVideoFPS), ), transcode.WithGeneralEncoder( ctx, astiav.CodecIDH264, transcode.WithCodecSettings(transcode.LowLatencyBitrateControlled), - transcode.WithEncoderBufferSize(int(delivery.DefaultVideoFPS), pPool), + transcode.WithEncoderBufferSize(int(fpv.DefaultVideoFPS), pPool), ), // transcode.WithMultiEncoderBitrateControl(ctx, // astiav.CodecIDH264, - // transcode.NewMultiConfig(delivery.MinimumBitrate, delivery.MaximumBitrate, 10), + // transcode.NewMultiConfig(fpv.MinimumBitrate, fpv.MaximumBitrate, 10), // transcode.LowLatencyBitrateControlled, - // int(delivery.DefaultVideoFPS), buffer.CreatePacketPool(), + // int(fpv.DefaultVideoFPS), buffer.CreatePacketPool(), // ), ) if err != nil { @@ -77,8 +78,8 @@ func main() { drone, err := client.CreateClient( ctx, cancel, mediaEngine, registry, settings, - client.WithH264MediaEngine(delivery.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, delivery.DefaultSPSBase64, delivery.DefaultPPSBase64), - client.WithBandwidthControlInterceptor(delivery.InitialBitrate, delivery.MinimumBitrate, delivery.MaximumBitrate, time.Second), + client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), + client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second), client.WithTWCCHeaderExtensionSender(), client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency), client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency), @@ -107,7 +108,7 @@ func main() { } track, err := pc.CreateMediaSource("A8-MINI", - mediasource.WithH264Track(delivery.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31), + mediasource.WithH264Track(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31), mediasource.WithPriority(mediasource.Level5), ) if err != nil { @@ -132,11 +133,11 @@ func main() { rl := mediapipe.NewIdentityAnyReader[[]byte](l) wl := mediapipe.NewIdentityAnyWriter[[]byte](l) - ird, err := dioreader.NewDataChannel(datachannel.DataChannel(), math.MaxUint16) + ird, err := generators.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16) if err != nil { panic(err) } - iwd, err := diowriter.NewDataChannel(datachannel.DataChannel(), math.MaxUint16) + iwd, err := consumers.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16) if err != nil { panic(err) } @@ -149,14 +150,14 @@ func main() { s := media.Sample{ Data: make([]byte, packet.Size()), Timestamp: time.Now(), - Duration: time.Second / time.Duration(delivery.DefaultVideoFPS), + Duration: time.Second / time.Duration(fpv.DefaultVideoFPS), PacketTimestamp: uint32(packet.Pts()), PrevDroppedPackets: 0, Metadata: nil, RTPHeaders: nil, } copy(s.Data, packet.Data()) - transcoder.PutBack(packet) + pPool.Put(packet) return s, nil }) diff --git a/cmd/delivery/drone/skyline_sonata.delivery.drone.service b/cmd/fpv/drone/skyline_sonata.fpv.drone.service similarity index 100% rename from cmd/delivery/drone/skyline_sonata.delivery.drone.service rename to cmd/fpv/drone/skyline_sonata.fpv.drone.service diff --git a/cmd/delivery/gcs/main.go b/cmd/fpv/gcs/main.go similarity index 74% rename from cmd/delivery/gcs/main.go rename to cmd/fpv/gcs/main.go index 1691e6c..ee7df61 100644 --- a/cmd/delivery/gcs/main.go +++ b/cmd/fpv/gcs/main.go @@ -5,17 +5,17 @@ import ( "math" "time" - "github.com/harshabose/mediapipe" - "github.com/harshabose/mediapipe/pkg/dioreader" - "github.com/harshabose/mediapipe/pkg/diowriter" - "github.com/harshabose/mediapipe/pkg/loopback" - "github.com/harshabose/mediapipe/pkg/rtsp" + "github.com/harshabose/services/pkg/rtsp" "github.com/pion/interceptor" "github.com/pion/webrtc/v4" - "github.com/harshabose/simple_webrtc_comm/client/pkg" + "github.com/harshabose/mediapipe" + "github.com/harshabose/mediapipe/pkg/consumers" + "github.com/harshabose/mediapipe/pkg/duplexers" + "github.com/harshabose/mediapipe/pkg/generators" + "github.com/harshabose/simple_webrtc_comm/client" "github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink" - "github.com/harshabose/simple_webrtc_comm/cmd/delivery" + "github.com/harshabose/simple_webrtc_comm/cmd/fpv" ) func main() { @@ -25,7 +25,7 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - l, err := loopback.NewLoopBack(context.Background(), "127.0.0.1:0", loopback.WithLoopBackPort(14550)) + l, err := duplexers.NewLoopBack(context.Background(), "127.0.0.1:0", duplexers.WithLoopBackPort(14550)) if err != nil { panic(err) } @@ -36,7 +36,7 @@ func main() { gcs, err := client.CreateClient( ctx, cancel, mediaEngine, registry, settings, - client.WithH264MediaEngine(delivery.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, delivery.DefaultSPSBase64, delivery.DefaultPPSBase64), + client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), client.WithTWCCHeaderExtensionSender(), client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency), client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency), @@ -81,16 +81,16 @@ func main() { panic(err) } - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) rl := mediapipe.NewIdentityAnyReader[[]byte](l) wl := mediapipe.NewIdentityAnyWriter[[]byte](l) - ird, err := dioreader.NewDataChannel(datachannel.DataChannel(), math.MaxUint16) + ird, err := generators.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16) if err != nil { panic(err) } - iwd, err := diowriter.NewDataChannel(datachannel.DataChannel(), math.MaxUint16) + iwd, err := consumers.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16) if err != nil { panic(err) } diff --git a/cmd/delivery/gcs/skyline_sonata.delivery.gcs.service b/cmd/fpv/gcs/skyline_sonata.fpv.gcs.service similarity index 100% rename from cmd/delivery/gcs/skyline_sonata.delivery.gcs.service rename to cmd/fpv/gcs/skyline_sonata.fpv.gcs.service diff --git a/cmd/sample/client/main.go b/cmd/sample/client/main.go deleted file mode 100644 index f436cfe..0000000 --- a/cmd/sample/client/main.go +++ /dev/null @@ -1,8 +0,0 @@ -package main - -import "fmt" - -func main() { - // Here use the simple_skyline_sonata SDK - fmt.Println("Simple Skyline Sonata says Hi! 👋 ") -} diff --git a/cmd/sample/client/sample.client.service b/cmd/sample/client/sample.client.service deleted file mode 100644 index b4c2c63..0000000 --- a/cmd/sample/client/sample.client.service +++ /dev/null @@ -1,15 +0,0 @@ -[Unit] -Description=Skyline Sonata Sample Client Service -After=network.target - -[Service] -Type=simple -User=drone -WorkingDirectory=/path/to/build/directory/sample/client -ExecStart=/path/to/build/directory/sample/client/skyline_sonata.sample.client -Environment=/etc/skyline-sonata/sample/complete.env -Restart=on-failure -RestartSec=5s - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/dependencies/client b/dependencies/client index 195bf1a..1ec0dc6 160000 --- a/dependencies/client +++ b/dependencies/client @@ -1 +1 @@ -Subproject commit 195bf1a8308231a19b6195538857bf94efa7397c +Subproject commit 1ec0dc6cc62baa6850166205101cd1b1119990a9 diff --git a/dependencies/mediapipe b/dependencies/mediapipe index 8526825..45b24d4 160000 --- a/dependencies/mediapipe +++ b/dependencies/mediapipe @@ -1 +1 @@ -Subproject commit 8526825b2ca29bc80156cb9dc69bae26e02b64c9 +Subproject commit 45b24d44568bf78a60c27252b8819941476ae4f1 diff --git a/dependencies/services b/dependencies/services new file mode 160000 index 0000000..3fde5ed --- /dev/null +++ b/dependencies/services @@ -0,0 +1 @@ +Subproject commit 3fde5ed19ed95aded15cdec60b8105513cfe615a diff --git a/dependencies/tools/.gitignore b/dependencies/tools/.gitignore new file mode 100644 index 0000000..7a288b3 --- /dev/null +++ b/dependencies/tools/.gitignore @@ -0,0 +1,50 @@ +# Binaries for programs and plugins +*.exe +*.exe~ +*.dll +*.so +*.dylib + +# Test binary, built with `go test -c` +*.test + +# Output of the go coverage tool +*.out + +# Go workspace file +go.work + +# Dependency directories +vendor/ + +# Build directories +build/ +third_party/ + +# Environment files +*.env +!.env.example + +# macOS +.DS_Store +.DS_Store? +._* +.Spotlight-V100 +.Trashes + +# Windows +ehthumbs.db +Thumbs.db + +# Linux +*~ + +# IDE files +.vscode/ +.idea/ +*.swp +*.swo + +# Logs +*.log +logs/ \ No newline at end of file diff --git a/dependencies/tools/buffer b/dependencies/tools/buffer deleted file mode 160000 index ddd94b7..0000000 --- a/dependencies/tools/buffer +++ /dev/null @@ -1 +0,0 @@ -Subproject commit ddd94b7d330dea91b144b90221644d7ae30afb22 diff --git a/dependencies/tools/go.mod b/dependencies/tools/go.mod new file mode 100644 index 0000000..2d8e23a --- /dev/null +++ b/dependencies/tools/go.mod @@ -0,0 +1,7 @@ +module github.com/harshabose/tools + +go 1.24.1 + +require github.com/asticode/go-astiav v0.37.0 + +require github.com/asticode/go-astikit v0.42.0 // indirect diff --git a/dependencies/tools/go.sum b/dependencies/tools/go.sum new file mode 100644 index 0000000..cc69ada --- /dev/null +++ b/dependencies/tools/go.sum @@ -0,0 +1,12 @@ +github.com/asticode/go-astiav v0.37.0 h1:Ph4usW4lulotVvne8hqZ1JCOHX1f8ces6yVKdg+PnyQ= +github.com/asticode/go-astiav v0.37.0/go.mod h1:GI0pHw6K2/pl/o8upCtT49P/q4KCwhv/8nGLlCsZLdA= +github.com/asticode/go-astikit v0.42.0 h1:pnir/2KLUSr0527Tv908iAH6EGYYrYta132vvjXsH5w= +github.com/asticode/go-astikit v0.42.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/dependencies/tools/pkg/buffer/errors.go b/dependencies/tools/pkg/buffer/errors.go new file mode 100644 index 0000000..74507d5 --- /dev/null +++ b/dependencies/tools/pkg/buffer/errors.go @@ -0,0 +1,8 @@ +package buffer + +import "errors" + +var ( + ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage") + ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation") +) diff --git a/dependencies/tools/pkg/buffer/frame_pool.go b/dependencies/tools/pkg/buffer/frame_pool.go new file mode 100644 index 0000000..c1f6b4c --- /dev/null +++ b/dependencies/tools/pkg/buffer/frame_pool.go @@ -0,0 +1,55 @@ +//go:build cgo_enabled + +package buffer + +import ( + "sync" + + "github.com/asticode/go-astiav" +) + +type framePool struct { + pool sync.Pool +} + +func CreateFramePool() Pool[*astiav.Frame] { + return &framePool{ + pool: sync.Pool{ + New: func() any { + return astiav.AllocFrame() + }, + }, + } +} + +func (pool *framePool) Get() *astiav.Frame { + frame, ok := pool.pool.Get().(*astiav.Frame) + + if frame == nil || !ok { + return astiav.AllocFrame() + } + return frame +} + +func (pool *framePool) Put(frame *astiav.Frame) { + if frame == nil { + return + } + + frame.Unref() + pool.pool.Put(frame) +} + +func (pool *framePool) Release() { + for { + frame, ok := pool.pool.Get().(*astiav.Frame) + if frame == nil { + break + } + if !ok { + continue + } + + frame.Free() + } +} diff --git a/dependencies/tools/pkg/buffer/limit_buffer.go b/dependencies/tools/pkg/buffer/limit_buffer.go new file mode 100644 index 0000000..24f59c8 --- /dev/null +++ b/dependencies/tools/pkg/buffer/limit_buffer.go @@ -0,0 +1,144 @@ +package buffer + +import ( + "context" + "errors" + "fmt" + "sync" +) + +type ChannelBuffer[T any] struct { + pool Pool[T] + bufferChannel chan T + inputBuffer chan T + closed bool + mux sync.RWMutex + ctx context.Context +} + +func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] { + buffer := &ChannelBuffer[T]{ + pool: pool, + bufferChannel: make(chan T, size), + inputBuffer: make(chan T, size), + closed: false, + ctx: ctx, + } + go buffer.loop() + return buffer +} + +func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element T) error { + buffer.mux.RLock() + defer buffer.mux.RUnlock() + + if buffer.closed { + return errors.New("buffer closed") + } + select { + case buffer.inputBuffer <- element: + // WARN: LACKS CHECKS FOR CLOSED CHANNEL + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (T, error) { + buffer.mux.RLock() + defer buffer.mux.RUnlock() + + var zero T + + if buffer.closed { + return zero, errors.New("buffer closed") + } + select { + case <-ctx.Done(): + return zero, ctx.Err() + case data, ok := <-buffer.bufferChannel: + if !ok { + return zero, ErrorChannelBufferClose + } + // TODO: NIL CHECK IS REQUIRED BUT GENERIC CANNOT DO THIS. SOLVE THIS ASAP + // if data == nil { + // return zero, ErrorElementUnallocated + // } + return data, nil + } +} + +func (buffer *ChannelBuffer[T]) Generate() T { + return buffer.pool.Get() +} + +func (buffer *ChannelBuffer[T]) PutBack(element T) { + if buffer.pool != nil { + buffer.pool.Put(element) + } +} + +func (buffer *ChannelBuffer[T]) GetChannel() chan T { + return buffer.bufferChannel +} + +func (buffer *ChannelBuffer[T]) Size() int { + return len(buffer.bufferChannel) +} + +func (buffer *ChannelBuffer[T]) loop() { + defer buffer.close() +loop: + for { + select { + case <-buffer.ctx.Done(): + return + case element, ok := <-buffer.inputBuffer: + // if !ok || element == nil { + // continue loop + // } + if !ok { + continue loop + } + select { + case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED + continue loop + default: + select { + case oldElement := <-buffer.bufferChannel: + buffer.PutBack(oldElement) + select { + case buffer.bufferChannel <- element: + continue loop + default: + fmt.Println("unexpected buffer state. skipping the element..") + buffer.PutBack(element) + } + } + } + } + } +} + +func (buffer *ChannelBuffer[T]) close() { + buffer.mux.Lock() + buffer.closed = true + buffer.mux.Unlock() + +loop: + for { + select { + case element := <-buffer.bufferChannel: + if buffer.pool != nil { + buffer.pool.Put(element) + } + default: + close(buffer.bufferChannel) + close(buffer.inputBuffer) + break loop + } + } + if buffer.pool != nil { + buffer.pool.Release() + } +} diff --git a/dependencies/tools/pkg/buffer/package.go b/dependencies/tools/pkg/buffer/package.go new file mode 100644 index 0000000..8f4bdad --- /dev/null +++ b/dependencies/tools/pkg/buffer/package.go @@ -0,0 +1,26 @@ +// TODO: CLEAN THIS; THIS IS STUPID + +package buffer + +import "context" + +type Pool[T any] interface { + Get() T + Put(T) + Release() +} + +type Buffer[T any] interface { + Push(context.Context, T) error + Pop(ctx context.Context) (T, error) + Size() int +} + +type BufferWithGenerator[T any] interface { + Push(context.Context, T) error + Pop(ctx context.Context) (T, error) + Size() int + Generate() T + PutBack(T) + GetChannel() chan T +} diff --git a/dependencies/tools/pkg/buffer/packet_pool.go b/dependencies/tools/pkg/buffer/packet_pool.go new file mode 100644 index 0000000..a2861b7 --- /dev/null +++ b/dependencies/tools/pkg/buffer/packet_pool.go @@ -0,0 +1,55 @@ +//go:build cgo_enabled + +package buffer + +import ( + "sync" + + "github.com/asticode/go-astiav" +) + +type packetPool struct { + pool sync.Pool +} + +func CreatePacketPool() Pool[*astiav.Packet] { + return &packetPool{ + pool: sync.Pool{ + New: func() any { + return astiav.AllocPacket() + }, + }, + } +} + +func (pool *packetPool) Get() *astiav.Packet { + packet, ok := pool.pool.Get().(*astiav.Packet) + + if packet == nil || !ok { + return astiav.AllocPacket() + } + return packet +} + +func (pool *packetPool) Put(packet *astiav.Packet) { + if packet == nil { + return + } + + packet.Unref() + pool.pool.Put(packet) +} + +func (pool *packetPool) Release() { + for { + packet, ok := pool.pool.Get().(*astiav.Packet) + if packet == nil { + break + } + if !ok { + continue + } + // fmt.Printf("🗑️ Releasing packet: ptr=%p\n", packet) + packet.Free() + } +} diff --git a/dependencies/tools/pkg/multierr/multierr.go b/dependencies/tools/pkg/multierr/multierr.go new file mode 100644 index 0000000..a9d2290 --- /dev/null +++ b/dependencies/tools/pkg/multierr/multierr.go @@ -0,0 +1,116 @@ +// Package multierr provides utilities for combining multiple errors into a single error. +package multierr + +import ( + "errors" + "fmt" + "strings" +) + +// Combine takes a list of errors and combines them into a single error. +// If all errors are nil, it returns nil. +// If there's only one non-nil error, it returns that error. +// If there are multiple non-nil errors, it combines them into a single error. +func Combine(errs ...error) error { + var nonNilErrs []error + for _, err := range errs { + if err != nil { + nonNilErrs = append(nonNilErrs, err) + } + } + + if len(nonNilErrs) == 0 { + return nil + } + + if len(nonNilErrs) == 1 { + return nonNilErrs[0] + } + + return &multiError{ + errors: nonNilErrs, + } +} + +// Append combines the two errors into a single error. +// If both errors are nil, it returns nil. +// If one error is nil, it returns the non-nil error. +// If both errors are non-nil, it combines them into a single error. +func Append(err1, err2 error) error { + if err1 == nil { + return err2 + } + + if err2 == nil { + return err1 + } + + // If err1 is already a multiError, append err2 to it + if me, ok := err1.(*multiError); ok { + return &multiError{ + errors: append(me.errors, err2), + } + } + + // If err2 is already a multiError, prepend err1 to it + if me, ok := err2.(*multiError); ok { + return &multiError{ + errors: append([]error{err1}, me.errors...), + } + } + + // Neither error is a multiError, create a new one + return &multiError{ + errors: []error{err1, err2}, + } +} + +// multiError is an implementation of error that contains multiple errors. +type multiError struct { + errors []error +} + +// Error returns a string representation of all the errors. +func (m *multiError) Error() string { + if len(m.errors) == 0 { + return "" + } + + if len(m.errors) == 1 { + return m.errors[0].Error() + } + + var sb strings.Builder + sb.WriteString(fmt.Sprintf("%d errors occurred:\n", len(m.errors))) + for i, err := range m.errors { + sb.WriteString(fmt.Sprintf(" %d: %s\n", i+1, err.Error())) + } + return sb.String() +} + +// Unwrap returns the underlying errors. +// This allows errors.Is and errors.As to work with multiError. +func (m *multiError) Unwrap() []error { + return m.errors +} + +// Is reports whether any error in multiError matches target. +func (m *multiError) Is(target error) bool { + for _, err := range m.errors { + if errors.Is(err, target) { + return true + } + } + return false +} + +// As finds the first error in multiError that matches target, and if one is found, sets +// target to that error value and returns true. Otherwise, it returns false. +func (m *multiError) As(target interface{}) bool { + for _, err := range m.errors { + if errors.As(err, target) { + return true + } + } + return false +} diff --git a/go.mod b/go.mod index 49e000c..c222623 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,13 @@ module github.com/harshabose/simple_webrtc_comm -go 1.24 - -toolchain go1.24.1 +go 1.24.1 require ( github.com/asticode/go-astiav v0.37.0 github.com/coder/websocket v1.8.13 github.com/harshabose/mediapipe v0.0.0 github.com/harshabose/simple_webrtc_comm/client v0.0.0 - github.com/harshabose/tools/buffer v0.0.0 + github.com/harshabose/tools v0.0.0 github.com/pion/interceptor v0.1.40 github.com/pion/webrtc/v4 v4.1.2 ) @@ -74,7 +72,7 @@ require ( ) replace ( - github.com/harshabose/mediapipe => ../mediapipe + github.com/harshabose/mediapipe => ./dependencies/mediapipe github.com/harshabose/simple_webrtc_comm/client => ./dependencies/client - github.com/harshabose/tools/buffer => ./dependencies/tools/buffer + github.com/harshabose/tools => ./dependencies/tools ) diff --git a/pkg/message/session.go b/pkg/message/session.go index 64b1c4c..941bc7f 100644 --- a/pkg/message/session.go +++ b/pkg/message/session.go @@ -10,14 +10,14 @@ import ( "github.com/coder/websocket" - "github.com/harshabose/simple_webrtc_comm/client/pkg" + "github.com/harshabose/simple_webrtc_comm/client" "github.com/harshabose/simple_webrtc_comm/pkg/config" ) type Session struct { config config.Config Conn *websocket.Conn - PeerConnection *client.PeerConnection + PeerConnection *client.client mux sync.RWMutex }