refactored to use mediapipe consumers in mediasink

This commit is contained in:
harshabose
2025-07-12 17:21:43 +05:30
parent 6ec3a458a8
commit a25dcbda61
10 changed files with 98 additions and 65 deletions

View File

@@ -76,7 +76,7 @@ func main() {
registry := &interceptor.Registry{}
settings := &webrtc.SettingEngine{}
drone, err := client.CreateClient(
drone, err := client.NewClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
// client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second),

View File

@@ -33,7 +33,7 @@ func main() {
registry := &interceptor.Registry{}
settings := &webrtc.SettingEngine{}
gcs, err := client.CreateClient(
gcs, err := client.NewClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithTWCCHeaderExtensionSender(),
@@ -62,7 +62,7 @@ func main() {
panic(err)
}
if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&duplexers.RTSPClientConfig{
if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&consumers.RTSPClientConfig{
ServerAddr: "localhost",
ServerPort: 8554,
StreamPath: "DELIVERY/A8-MINI",

View File

@@ -11,10 +11,3 @@ const (
DefaultSPSBase64 = "AAAAAWdCwB/aA2D3m4QAAAMABAAAAwDLkYAMNQAYa4pIAeMGVA=="
DefaultPPSBase64 = "AAAAAWjOPIA="
)
const (
InitialBitrate int64 = 500_000
MinimumBitrate int64 = 100_000
MaximumBitrate int64 = 5_000_000
CutVideoBelowMinimumBitrate = false
)

View File

@@ -61,12 +61,6 @@ func main() {
transcode.WithCodecSettings(transcode.LowLatencyBitrateControlled),
transcode.WithEncoderBufferSize(int(fpv.DefaultVideoFPS), pPool),
),
// transcode.WithMultiEncoderBitrateControl(ctx,
// astiav.CodecIDH264,
// transcode.NewMultiConfig(fpv.MinimumBitrate, fpv.MaximumBitrate, 10),
// transcode.LowLatencyBitrateControlled,
// int(fpv.DefaultVideoFPS), buffer.CreatePacketPool(),
// ),
)
if err != nil {
panic(err)
@@ -76,15 +70,12 @@ func main() {
registry := &interceptor.Registry{}
settings := &webrtc.SettingEngine{}
drone, err := client.CreateClient(
drone, err := client.NewClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
// client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second),
// client.WithTWCCHeaderExtensionSender(),
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency),
client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency),
client.WithSimulcastExtensionHeaders(),
// client.WithTWCCSenderInterceptor(client.TWCCIntervalLowLatency),
)
if err != nil {
panic(err)
@@ -96,7 +87,6 @@ func main() {
client.WithFirebaseOfferSignal,
client.WithMediaSources(),
client.WithDataChannels(),
// client.WithBandwidthControl(),
)
if err != nil {
panic(err)
@@ -115,20 +105,11 @@ func main() {
panic(err)
}
// bwe, err := pc.GetBWEstimator()
// if err != nil {
// panic(err)
// }
//
// if err := bwe.Subscribe("A8-MINI", track.GetPriority(), transcoder.OnUpdateBitrate()); err != nil {
// panic(err)
// }
if err := pc.Connect("FPV"); err != nil {
panic(err)
}
time.Sleep(5 * time.Second)
time.Sleep(10 * time.Second)
rl := mediapipe.NewIdentityAnyReader[[]byte](l)
wl := mediapipe.NewIdentityAnyWriter[[]byte](l)
@@ -145,7 +126,7 @@ func main() {
rd := mediapipe.NewIdentityAnyReader[[]byte](ird)
wd := mediapipe.NewIdentityAnyWriter[[]byte](iwd)
w := mediapipe.NewAnyWriter[media.Sample, *astiav.Packet](track, nil)
w := mediapipe.NewAnyWriter[media.Sample, *astiav.Packet](consumers.NewPionSampleConsumer(track), nil)
r := mediapipe.NewAnyReader[media.Sample, *astiav.Packet](transcoder, func(packet *astiav.Packet) (media.Sample, error) {
s := media.Sample{
Data: make([]byte, packet.Size()),
@@ -166,7 +147,6 @@ func main() {
mediapipe.NewAnyPipe(ctx, rl, wd).Start()
mediapipe.NewAnyPipe(ctx, rd, wl).Start()
mediapipe.NewAnyPipe(ctx, r, w).Start()
// bwe.Start()
drone.WaitUntilClosed()
}()

View File

@@ -6,6 +6,7 @@ import (
"time"
"github.com/pion/interceptor"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
"github.com/harshabose/mediapipe"
@@ -14,6 +15,7 @@ import (
"github.com/harshabose/mediapipe/pkg/generators"
"github.com/harshabose/simple_webrtc_comm/client"
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink"
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource"
"github.com/harshabose/simple_webrtc_comm/cmd/fpv"
)
@@ -29,13 +31,34 @@ func main() {
panic(err)
}
rtspClient, err := consumers.NewRTSPClient(
ctx,
&consumers.RTSPClientConfig{
ServerAddr: "localhost",
ServerPort: 8554,
StreamPath: "fpv/main/a8-mini",
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
DialTimeout: 10 * time.Second,
ReconnectAttempts: 10,
ReconnectDelay: 2 * time.Second,
UserAgent: "GoRTSP-FPV/main",
},
nil,
consumers.WithH264Options(consumers.PacketisationMode1, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
)
if err != nil {
panic(err)
}
rtspClient.Start()
mediaEngine := &webrtc.MediaEngine{}
registry := &interceptor.Registry{}
settings := &webrtc.SettingEngine{}
gcs, err := client.CreateClient(
gcs, err := client.NewClient(
ctx, cancel, mediaEngine, registry, settings,
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithH264MediaEngine(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64),
client.WithTWCCHeaderExtensionSender(),
client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency),
client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency),
@@ -62,17 +85,8 @@ func main() {
panic(err)
}
if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&duplexers.RTSPClientConfig{
ServerAddr: "localhost",
ServerPort: 8554,
StreamPath: "DELIVERY/A8-MINI",
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
DialTimeout: 10 * time.Second,
ReconnectAttempts: 10,
ReconnectDelay: 2 * time.Second,
UserAgent: "GoRTSP-Host/1.0",
})); err != nil {
sink, err := pc.CreateMediaSink("A8-MINI", mediasink.WithH264Track(fpv.DefaultVideoClockRate))
if err != nil {
panic(err)
}
@@ -97,8 +111,12 @@ func main() {
rd := mediapipe.NewIdentityAnyReader[[]byte](ird)
wd := mediapipe.NewIdentityAnyWriter[[]byte](iwd)
r := mediapipe.NewIdentityAnyReader[*rtp.Packet](generators.NewPionRTPGenerator(sink))
w := mediapipe.NewIdentityAnyWriter[*rtp.Packet](rtspClient)
mediapipe.NewAnyPipe(ctx, rl, wd).Start()
mediapipe.NewAnyPipe(ctx, rd, wl).Start()
mediapipe.NewAnyPipe(ctx, r, w).Start()
gcs.WaitUntilClosed()
}()

View File

@@ -1,10 +1,14 @@
package config
import (
"context"
"time"
)
// WARN: DO NOT COMPILE THIS FILE. THIS IS FOR FUTURE
"github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
"github.com/harshabose/simple_webrtc_comm/client"
)
type DataChannelConfig struct {
Label string `json:"label" yaml:"label"`
@@ -16,8 +20,8 @@ type DataChannelConfig struct {
}
type MediaConfig struct {
Audio []AudioConfig `json:"audio" yaml:"audio"`
Video []VideoConfig `json:"video" yaml:"video"`
Audio []AudioConfig `json:"audio,omitempty" yaml:"audio,omitempty"`
Video []VideoConfig `json:"video,omitempty" yaml:"video,omitempty"`
}
type AudioConfig struct {
@@ -54,10 +58,13 @@ type LoggingConfig struct {
}
type Config struct {
DataChannels []DataChannelConfig `json:"data_channels" yaml:"data_channels"`
Media MediaConfig `json:"media" yaml:"media"`
ClientConfig client.ClientConfig `json:"client_config" yaml:"client_config"`
PeerConnectionConfigs []client.PeerConnectionConfig `json:"peer_connection_configs" yaml:"peer_connection_configs"`
MediaEngine webrtc.MediaEngine `json:"-"`
InterceptorRegistry interceptor.Registry `json:"-"`
SettingsEngine webrtc.SettingEngine `json:"-"`
// Security configuration
// Security SecurityConfig `json:"security" yaml:"security"`
Logging LoggingConfig `json:"logging" yaml:"logging"`
@@ -65,3 +72,32 @@ type Config struct {
UserID string `json:"user_id,omitempty" yaml:"user_id,omitempty"`
RoomID string `json:"room_id,omitempty" yaml:"room_id,omitempty"`
}
func (c *Config) GenerateClient(ctx context.Context, cancel context.CancelFunc) (*client.Client, error) {
return client.NewClientFromConfig(ctx, cancel, &c.MediaEngine, &c.InterceptorRegistry, &c.SettingsEngine, &c.ClientConfig)
}
func (c *Config) GeneratePeerConnection(client *client.Client, pcc client.PeerConnectionConfig) (*client.PeerConnection, error) {
pc, err := client.CreatePeerConnection(pcc.Name, pcc.RTCConfig, pcc.ToOptions()...)
if err != nil {
return nil, err
}
if err := pcc.CreateDataChannels(pc); err != nil {
// TODO: REMOVE PEER CONNECTION
return nil, err
}
if err := pcc.CreateMediaSources(pc); err != nil {
// TODO: REMOVE PEER CONNECTION
return nil, err
}
if err := pcc.CreateMediaSinks(pc); err != nil {
// TODO: REMOVE PEER CONNECTION
return nil, err
}
return pc, nil
}

View File

@@ -8,7 +8,7 @@ const IdentProtocol Protocol = "ident"
type Ident struct {
BaseMessage
Config config.Config `json:"config"`
Config *config.Config `json:"config"`
}
func (m *Ident) GetProtocol() Protocol {

View File

@@ -15,10 +15,13 @@ import (
)
type Session struct {
config config.Config
Conn *websocket.Conn
PeerConnection *client.Client
mux sync.RWMutex
config *config.Config
Conn *websocket.Conn
Client *client.Client
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mux sync.RWMutex
}
func (s *Session) Write(ctx context.Context, msg Message) error {
@@ -34,10 +37,13 @@ func (s *Session) Write(ctx context.Context, msg Message) error {
return nil
}
func NewSessionWithIdent(ident *Ident, conn *websocket.Conn, client *client.Client) *Session {
func NewSessionWithIdent(ctx context.Context, ident *Ident, conn *websocket.Conn, client *client.Client) *Session {
ctx2, cancel2 := context.WithCancel(ctx)
return &Session{
config: ident.Config,
Conn: conn,
PeerConnection: client,
config: ident.Config,
Conn: conn,
Client: client,
ctx: ctx2,
cancel: cancel2,
}
}