removed delivery code; added fpv code

This commit is contained in:
harshabose
2025-07-11 10:40:44 +05:30
parent 9bec0ffe18
commit 3af2baf2e4
26 changed files with 529 additions and 73 deletions

1
.gitignore vendored
View File

@@ -19,6 +19,7 @@ vendor/
# Build directories
build/
build_bak/
third_party/
# Environment files

6
.gitmodules vendored
View File

@@ -7,3 +7,9 @@
[submodule "dependencies/mediapipe"]
path = dependencies/mediapipe
url = https://github.com/harshabose/mediapipe.git
[submodule "dependencies/tools/multierr"]
path = dependencies/tools/multierr
url = https://github.com/harshabose/multierr.git
[submodule "dependencies/services"]
path = dependencies/services
url = https://github.com/harshabose/services.git

View File

@@ -29,7 +29,7 @@ RUN make install-mavp2p
# Will create complete.env file
RUN make create-env-file
RUN bash -c 'make build-delivery-gcs'
RUN bash -c 'make build-fpv-gcs'
# Run main application
CMD bash -c 'make run-delivery-gcs'

View File

@@ -1,4 +1,4 @@
package delivery
package fpv
const (
DefaultVideoClockRate uint32 = 90000

View File

@@ -1,6 +1,6 @@
//go:build cgo_enabled
package delivery
package fpv
import "github.com/asticode/go-astiav"

View File

@@ -8,19 +8,20 @@ import (
"time"
"github.com/asticode/go-astiav"
"github.com/harshabose/mediapipe"
"github.com/harshabose/mediapipe/pkg/dioreader"
"github.com/harshabose/mediapipe/pkg/diowriter"
"github.com/harshabose/mediapipe/pkg/loopback"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
"github.com/harshabose/simple_webrtc_comm/client/pkg"
"github.com/harshabose/mediapipe"
"github.com/harshabose/mediapipe/pkg/consumers"
"github.com/harshabose/mediapipe/pkg/duplexers"
"github.com/harshabose/mediapipe/pkg/generators"
"github.com/harshabose/tools/pkg/buffer"
"github.com/harshabose/simple_webrtc_comm/client"
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource"
"github.com/harshabose/simple_webrtc_comm/client/pkg/transcode"
"github.com/harshabose/simple_webrtc_comm/cmd/delivery"
"github.com/harshabose/tools/buffer/pkg"
"github.com/harshabose/simple_webrtc_comm/cmd/fpv"
)
func main() {
@@ -30,7 +31,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l, err := loopback.NewLoopBack(context.Background(), "127.0.0.1:14559")
l, err := duplexers.NewLoopBack(context.Background(), "127.0.0.1:14559")
if err != nil {
panic(err)
}
@@ -42,29 +43,29 @@ func main() {
transcode.WithGeneralDemuxer(ctx,
"0",
transcode.WithAvFoundationInputFormatOption,
transcode.WithDemuxerBuffer(int(delivery.DefaultVideoFPS), pPool),
transcode.WithDemuxerBuffer(int(fpv.DefaultVideoFPS), pPool),
),
transcode.WithGeneralDecoder(ctx,
transcode.WithDecoderBuffer(int(delivery.DefaultVideoFPS), fPool),
transcode.WithDecoderBuffer(int(fpv.DefaultVideoFPS), fPool),
),
transcode.WithGeneralFilter(ctx,
transcode.VideoFilters,
transcode.WithFilterBuffer(int(delivery.DefaultVideoFPS), fPool),
transcode.WithVideoScaleFilterContent(delivery.DefaultVideoWidth, delivery.DefaultVideoHeight),
transcode.WithVideoPixelFormatFilterContent(delivery.DefaultPixelFormat),
transcode.WithVideoFPSFilterContent(delivery.DefaultVideoFPS),
transcode.WithFilterBuffer(int(fpv.DefaultVideoFPS), fPool),
transcode.WithVideoScaleFilterContent(fpv.DefaultVideoWidth, fpv.DefaultVideoHeight),
transcode.WithVideoPixelFormatFilterContent(fpv.DefaultPixelFormat),
transcode.WithVideoFPSFilterContent(fpv.DefaultVideoFPS),
),
transcode.WithGeneralEncoder(
ctx,
astiav.CodecIDH264,
transcode.WithCodecSettings(transcode.LowLatencyBitrateControlled),
transcode.WithEncoderBufferSize(int(delivery.DefaultVideoFPS), pPool),
transcode.WithEncoderBufferSize(int(fpv.DefaultVideoFPS), pPool),
),
// transcode.WithMultiEncoderBitrateControl(ctx,
// astiav.CodecIDH264,
// transcode.NewMultiConfig(delivery.MinimumBitrate, delivery.MaximumBitrate, 10),
// transcode.NewMultiConfig(fpv.MinimumBitrate, fpv.MaximumBitrate, 10),
// transcode.LowLatencyBitrateControlled,
// int(delivery.DefaultVideoFPS), buffer.CreatePacketPool(),
// int(fpv.DefaultVideoFPS), buffer.CreatePacketPool(),
// ),
)
if err != nil {
@@ -77,8 +78,8 @@ func main() {
drone, err := client.CreateClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(delivery.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, delivery.DefaultSPSBase64, delivery.DefaultPPSBase64),
client.WithBandwidthControlInterceptor(delivery.InitialBitrate, delivery.MinimumBitrate, delivery.MaximumBitrate, time.Second),
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second),
client.WithTWCCHeaderExtensionSender(),
client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency),
client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency),
@@ -107,7 +108,7 @@ func main() {
}
track, err := pc.CreateMediaSource("A8-MINI",
mediasource.WithH264Track(delivery.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31),
mediasource.WithH264Track(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31),
mediasource.WithPriority(mediasource.Level5),
)
if err != nil {
@@ -132,11 +133,11 @@ func main() {
rl := mediapipe.NewIdentityAnyReader[[]byte](l)
wl := mediapipe.NewIdentityAnyWriter[[]byte](l)
ird, err := dioreader.NewDataChannel(datachannel.DataChannel(), math.MaxUint16)
ird, err := generators.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16)
if err != nil {
panic(err)
}
iwd, err := diowriter.NewDataChannel(datachannel.DataChannel(), math.MaxUint16)
iwd, err := consumers.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16)
if err != nil {
panic(err)
}
@@ -149,14 +150,14 @@ func main() {
s := media.Sample{
Data: make([]byte, packet.Size()),
Timestamp: time.Now(),
Duration: time.Second / time.Duration(delivery.DefaultVideoFPS),
Duration: time.Second / time.Duration(fpv.DefaultVideoFPS),
PacketTimestamp: uint32(packet.Pts()),
PrevDroppedPackets: 0,
Metadata: nil,
RTPHeaders: nil,
}
copy(s.Data, packet.Data())
transcoder.PutBack(packet)
pPool.Put(packet)
return s, nil
})

View File

@@ -5,17 +5,17 @@ import (
"math"
"time"
"github.com/harshabose/mediapipe"
"github.com/harshabose/mediapipe/pkg/dioreader"
"github.com/harshabose/mediapipe/pkg/diowriter"
"github.com/harshabose/mediapipe/pkg/loopback"
"github.com/harshabose/mediapipe/pkg/rtsp"
"github.com/harshabose/services/pkg/rtsp"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
"github.com/harshabose/simple_webrtc_comm/client/pkg"
"github.com/harshabose/mediapipe"
"github.com/harshabose/mediapipe/pkg/consumers"
"github.com/harshabose/mediapipe/pkg/duplexers"
"github.com/harshabose/mediapipe/pkg/generators"
"github.com/harshabose/simple_webrtc_comm/client"
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink"
"github.com/harshabose/simple_webrtc_comm/cmd/delivery"
"github.com/harshabose/simple_webrtc_comm/cmd/fpv"
)
func main() {
@@ -25,7 +25,7 @@ func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
l, err := loopback.NewLoopBack(context.Background(), "127.0.0.1:0", loopback.WithLoopBackPort(14550))
l, err := duplexers.NewLoopBack(context.Background(), "127.0.0.1:0", duplexers.WithLoopBackPort(14550))
if err != nil {
panic(err)
}
@@ -36,7 +36,7 @@ func main() {
gcs, err := client.CreateClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(delivery.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, delivery.DefaultSPSBase64, delivery.DefaultPPSBase64),
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithTWCCHeaderExtensionSender(),
client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency),
client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency),
@@ -81,16 +81,16 @@ func main() {
panic(err)
}
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
rl := mediapipe.NewIdentityAnyReader[[]byte](l)
wl := mediapipe.NewIdentityAnyWriter[[]byte](l)
ird, err := dioreader.NewDataChannel(datachannel.DataChannel(), math.MaxUint16)
ird, err := generators.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16)
if err != nil {
panic(err)
}
iwd, err := diowriter.NewDataChannel(datachannel.DataChannel(), math.MaxUint16)
iwd, err := consumers.NewIODataChannel(datachannel.DataChannel(), math.MaxUint16)
if err != nil {
panic(err)
}

View File

@@ -1,8 +0,0 @@
package main
import "fmt"
func main() {
// Here use the simple_skyline_sonata SDK
fmt.Println("Simple Skyline Sonata says Hi! 👋 ")
}

View File

@@ -1,15 +0,0 @@
[Unit]
Description=Skyline Sonata Sample Client Service
After=network.target
[Service]
Type=simple
User=drone
WorkingDirectory=/path/to/build/directory/sample/client
ExecStart=/path/to/build/directory/sample/client/skyline_sonata.sample.client
Environment=/etc/skyline-sonata/sample/complete.env
Restart=on-failure
RestartSec=5s
[Install]
WantedBy=multi-user.target

1
dependencies/services vendored Submodule

Submodule dependencies/services added at 3fde5ed19e

50
dependencies/tools/.gitignore vendored Normal file
View File

@@ -0,0 +1,50 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool
*.out
# Go workspace file
go.work
# Dependency directories
vendor/
# Build directories
build/
third_party/
# Environment files
*.env
!.env.example
# macOS
.DS_Store
.DS_Store?
._*
.Spotlight-V100
.Trashes
# Windows
ehthumbs.db
Thumbs.db
# Linux
*~
# IDE files
.vscode/
.idea/
*.swp
*.swo
# Logs
*.log
logs/

7
dependencies/tools/go.mod vendored Normal file
View File

@@ -0,0 +1,7 @@
module github.com/harshabose/tools
go 1.24.1
require github.com/asticode/go-astiav v0.37.0
require github.com/asticode/go-astikit v0.42.0 // indirect

12
dependencies/tools/go.sum vendored Normal file
View File

@@ -0,0 +1,12 @@
github.com/asticode/go-astiav v0.37.0 h1:Ph4usW4lulotVvne8hqZ1JCOHX1f8ces6yVKdg+PnyQ=
github.com/asticode/go-astiav v0.37.0/go.mod h1:GI0pHw6K2/pl/o8upCtT49P/q4KCwhv/8nGLlCsZLdA=
github.com/asticode/go-astikit v0.42.0 h1:pnir/2KLUSr0527Tv908iAH6EGYYrYta132vvjXsH5w=
github.com/asticode/go-astikit v0.42.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@@ -0,0 +1,8 @@
package buffer
import "errors"
var (
ErrorElementUnallocated = errors.New("encountered nil in the buffer. this should not happen. check usage")
ErrorChannelBufferClose = errors.New("channel buffer has be closed. cannot perform this operation")
)

View File

@@ -0,0 +1,55 @@
//go:build cgo_enabled
package buffer
import (
"sync"
"github.com/asticode/go-astiav"
)
type framePool struct {
pool sync.Pool
}
func CreateFramePool() Pool[*astiav.Frame] {
return &framePool{
pool: sync.Pool{
New: func() any {
return astiav.AllocFrame()
},
},
}
}
func (pool *framePool) Get() *astiav.Frame {
frame, ok := pool.pool.Get().(*astiav.Frame)
if frame == nil || !ok {
return astiav.AllocFrame()
}
return frame
}
func (pool *framePool) Put(frame *astiav.Frame) {
if frame == nil {
return
}
frame.Unref()
pool.pool.Put(frame)
}
func (pool *framePool) Release() {
for {
frame, ok := pool.pool.Get().(*astiav.Frame)
if frame == nil {
break
}
if !ok {
continue
}
frame.Free()
}
}

View File

@@ -0,0 +1,144 @@
package buffer
import (
"context"
"errors"
"fmt"
"sync"
)
type ChannelBuffer[T any] struct {
pool Pool[T]
bufferChannel chan T
inputBuffer chan T
closed bool
mux sync.RWMutex
ctx context.Context
}
func CreateChannelBuffer[T any](ctx context.Context, size int, pool Pool[T]) *ChannelBuffer[T] {
buffer := &ChannelBuffer[T]{
pool: pool,
bufferChannel: make(chan T, size),
inputBuffer: make(chan T, size),
closed: false,
ctx: ctx,
}
go buffer.loop()
return buffer
}
func (buffer *ChannelBuffer[T]) Push(ctx context.Context, element T) error {
buffer.mux.RLock()
defer buffer.mux.RUnlock()
if buffer.closed {
return errors.New("buffer closed")
}
select {
case buffer.inputBuffer <- element:
// WARN: LACKS CHECKS FOR CLOSED CHANNEL
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func (buffer *ChannelBuffer[T]) Pop(ctx context.Context) (T, error) {
buffer.mux.RLock()
defer buffer.mux.RUnlock()
var zero T
if buffer.closed {
return zero, errors.New("buffer closed")
}
select {
case <-ctx.Done():
return zero, ctx.Err()
case data, ok := <-buffer.bufferChannel:
if !ok {
return zero, ErrorChannelBufferClose
}
// TODO: NIL CHECK IS REQUIRED BUT GENERIC CANNOT DO THIS. SOLVE THIS ASAP
// if data == nil {
// return zero, ErrorElementUnallocated
// }
return data, nil
}
}
func (buffer *ChannelBuffer[T]) Generate() T {
return buffer.pool.Get()
}
func (buffer *ChannelBuffer[T]) PutBack(element T) {
if buffer.pool != nil {
buffer.pool.Put(element)
}
}
func (buffer *ChannelBuffer[T]) GetChannel() chan T {
return buffer.bufferChannel
}
func (buffer *ChannelBuffer[T]) Size() int {
return len(buffer.bufferChannel)
}
func (buffer *ChannelBuffer[T]) loop() {
defer buffer.close()
loop:
for {
select {
case <-buffer.ctx.Done():
return
case element, ok := <-buffer.inputBuffer:
// if !ok || element == nil {
// continue loop
// }
if !ok {
continue loop
}
select {
case buffer.bufferChannel <- element: // SUCCESSFULLY BUFFERED
continue loop
default:
select {
case oldElement := <-buffer.bufferChannel:
buffer.PutBack(oldElement)
select {
case buffer.bufferChannel <- element:
continue loop
default:
fmt.Println("unexpected buffer state. skipping the element..")
buffer.PutBack(element)
}
}
}
}
}
}
func (buffer *ChannelBuffer[T]) close() {
buffer.mux.Lock()
buffer.closed = true
buffer.mux.Unlock()
loop:
for {
select {
case element := <-buffer.bufferChannel:
if buffer.pool != nil {
buffer.pool.Put(element)
}
default:
close(buffer.bufferChannel)
close(buffer.inputBuffer)
break loop
}
}
if buffer.pool != nil {
buffer.pool.Release()
}
}

View File

@@ -0,0 +1,26 @@
// TODO: CLEAN THIS; THIS IS STUPID
package buffer
import "context"
type Pool[T any] interface {
Get() T
Put(T)
Release()
}
type Buffer[T any] interface {
Push(context.Context, T) error
Pop(ctx context.Context) (T, error)
Size() int
}
type BufferWithGenerator[T any] interface {
Push(context.Context, T) error
Pop(ctx context.Context) (T, error)
Size() int
Generate() T
PutBack(T)
GetChannel() chan T
}

View File

@@ -0,0 +1,55 @@
//go:build cgo_enabled
package buffer
import (
"sync"
"github.com/asticode/go-astiav"
)
type packetPool struct {
pool sync.Pool
}
func CreatePacketPool() Pool[*astiav.Packet] {
return &packetPool{
pool: sync.Pool{
New: func() any {
return astiav.AllocPacket()
},
},
}
}
func (pool *packetPool) Get() *astiav.Packet {
packet, ok := pool.pool.Get().(*astiav.Packet)
if packet == nil || !ok {
return astiav.AllocPacket()
}
return packet
}
func (pool *packetPool) Put(packet *astiav.Packet) {
if packet == nil {
return
}
packet.Unref()
pool.pool.Put(packet)
}
func (pool *packetPool) Release() {
for {
packet, ok := pool.pool.Get().(*astiav.Packet)
if packet == nil {
break
}
if !ok {
continue
}
// fmt.Printf("🗑️ Releasing packet: ptr=%p\n", packet)
packet.Free()
}
}

View File

@@ -0,0 +1,116 @@
// Package multierr provides utilities for combining multiple errors into a single error.
package multierr
import (
"errors"
"fmt"
"strings"
)
// Combine takes a list of errors and combines them into a single error.
// If all errors are nil, it returns nil.
// If there's only one non-nil error, it returns that error.
// If there are multiple non-nil errors, it combines them into a single error.
func Combine(errs ...error) error {
var nonNilErrs []error
for _, err := range errs {
if err != nil {
nonNilErrs = append(nonNilErrs, err)
}
}
if len(nonNilErrs) == 0 {
return nil
}
if len(nonNilErrs) == 1 {
return nonNilErrs[0]
}
return &multiError{
errors: nonNilErrs,
}
}
// Append combines the two errors into a single error.
// If both errors are nil, it returns nil.
// If one error is nil, it returns the non-nil error.
// If both errors are non-nil, it combines them into a single error.
func Append(err1, err2 error) error {
if err1 == nil {
return err2
}
if err2 == nil {
return err1
}
// If err1 is already a multiError, append err2 to it
if me, ok := err1.(*multiError); ok {
return &multiError{
errors: append(me.errors, err2),
}
}
// If err2 is already a multiError, prepend err1 to it
if me, ok := err2.(*multiError); ok {
return &multiError{
errors: append([]error{err1}, me.errors...),
}
}
// Neither error is a multiError, create a new one
return &multiError{
errors: []error{err1, err2},
}
}
// multiError is an implementation of error that contains multiple errors.
type multiError struct {
errors []error
}
// Error returns a string representation of all the errors.
func (m *multiError) Error() string {
if len(m.errors) == 0 {
return ""
}
if len(m.errors) == 1 {
return m.errors[0].Error()
}
var sb strings.Builder
sb.WriteString(fmt.Sprintf("%d errors occurred:\n", len(m.errors)))
for i, err := range m.errors {
sb.WriteString(fmt.Sprintf(" %d: %s\n", i+1, err.Error()))
}
return sb.String()
}
// Unwrap returns the underlying errors.
// This allows errors.Is and errors.As to work with multiError.
func (m *multiError) Unwrap() []error {
return m.errors
}
// Is reports whether any error in multiError matches target.
func (m *multiError) Is(target error) bool {
for _, err := range m.errors {
if errors.Is(err, target) {
return true
}
}
return false
}
// As finds the first error in multiError that matches target, and if one is found, sets
// target to that error value and returns true. Otherwise, it returns false.
func (m *multiError) As(target interface{}) bool {
for _, err := range m.errors {
if errors.As(err, target) {
return true
}
}
return false
}

10
go.mod
View File

@@ -1,15 +1,13 @@
module github.com/harshabose/simple_webrtc_comm
go 1.24
toolchain go1.24.1
go 1.24.1
require (
github.com/asticode/go-astiav v0.37.0
github.com/coder/websocket v1.8.13
github.com/harshabose/mediapipe v0.0.0
github.com/harshabose/simple_webrtc_comm/client v0.0.0
github.com/harshabose/tools/buffer v0.0.0
github.com/harshabose/tools v0.0.0
github.com/pion/interceptor v0.1.40
github.com/pion/webrtc/v4 v4.1.2
)
@@ -74,7 +72,7 @@ require (
)
replace (
github.com/harshabose/mediapipe => ../mediapipe
github.com/harshabose/mediapipe => ./dependencies/mediapipe
github.com/harshabose/simple_webrtc_comm/client => ./dependencies/client
github.com/harshabose/tools/buffer => ./dependencies/tools/buffer
github.com/harshabose/tools => ./dependencies/tools
)

View File

@@ -10,14 +10,14 @@ import (
"github.com/coder/websocket"
"github.com/harshabose/simple_webrtc_comm/client/pkg"
"github.com/harshabose/simple_webrtc_comm/client"
"github.com/harshabose/simple_webrtc_comm/pkg/config"
)
type Session struct {
config config.Config
Conn *websocket.Conn
PeerConnection *client.PeerConnection
PeerConnection *client.client
mux sync.RWMutex
}