diff --git a/cmd/delivery/drone/main.go b/cmd/delivery/drone/main.go index a9b6815..b1a29be 100644 --- a/cmd/delivery/drone/main.go +++ b/cmd/delivery/drone/main.go @@ -76,7 +76,7 @@ func main() { registry := &interceptor.Registry{} settings := &webrtc.SettingEngine{} - drone, err := client.CreateClient( + drone, err := client.NewClient( ctx, cancel, mediaEngine, registry, settings, client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), // client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second), diff --git a/cmd/delivery/gcs/main.go b/cmd/delivery/gcs/main.go index 2bd7a4b..d176607 100644 --- a/cmd/delivery/gcs/main.go +++ b/cmd/delivery/gcs/main.go @@ -33,7 +33,7 @@ func main() { registry := &interceptor.Registry{} settings := &webrtc.SettingEngine{} - gcs, err := client.CreateClient( + gcs, err := client.NewClient( ctx, cancel, mediaEngine, registry, settings, client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), client.WithTWCCHeaderExtensionSender(), @@ -62,7 +62,7 @@ func main() { panic(err) } - if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&duplexers.RTSPClientConfig{ + if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&consumers.RTSPClientConfig{ ServerAddr: "localhost", ServerPort: 8554, StreamPath: "DELIVERY/A8-MINI", diff --git a/cmd/fpv/constants.go b/cmd/fpv/constants.go index 150d6e7..bc3826c 100644 --- a/cmd/fpv/constants.go +++ b/cmd/fpv/constants.go @@ -11,10 +11,3 @@ const ( DefaultSPSBase64 = "AAAAAWdCwB/aA2D3m4QAAAMABAAAAwDLkYAMNQAYa4pIAeMGVA==" DefaultPPSBase64 = "AAAAAWjOPIA=" ) - -const ( - InitialBitrate int64 = 500_000 - MinimumBitrate int64 = 100_000 - MaximumBitrate int64 = 5_000_000 - CutVideoBelowMinimumBitrate = false -) diff --git a/cmd/fpv/drone/main.go b/cmd/fpv/drone/main.go index a9b6815..019dfce 100644 --- a/cmd/fpv/drone/main.go +++ b/cmd/fpv/drone/main.go @@ -61,12 +61,6 @@ func main() { transcode.WithCodecSettings(transcode.LowLatencyBitrateControlled), transcode.WithEncoderBufferSize(int(fpv.DefaultVideoFPS), pPool), ), - // transcode.WithMultiEncoderBitrateControl(ctx, - // astiav.CodecIDH264, - // transcode.NewMultiConfig(fpv.MinimumBitrate, fpv.MaximumBitrate, 10), - // transcode.LowLatencyBitrateControlled, - // int(fpv.DefaultVideoFPS), buffer.CreatePacketPool(), - // ), ) if err != nil { panic(err) @@ -76,15 +70,12 @@ func main() { registry := &interceptor.Registry{} settings := &webrtc.SettingEngine{} - drone, err := client.CreateClient( + drone, err := client.NewClient( ctx, cancel, mediaEngine, registry, settings, - client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), - // client.WithBandwidthControlInterceptor(fpv.InitialBitrate, fpv.MinimumBitrate, fpv.MaximumBitrate, time.Second), - // client.WithTWCCHeaderExtensionSender(), + client.WithH264MediaEngine(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency), client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency), client.WithSimulcastExtensionHeaders(), - // client.WithTWCCSenderInterceptor(client.TWCCIntervalLowLatency), ) if err != nil { panic(err) @@ -96,7 +87,6 @@ func main() { client.WithFirebaseOfferSignal, client.WithMediaSources(), client.WithDataChannels(), - // client.WithBandwidthControl(), ) if err != nil { panic(err) @@ -115,20 +105,11 @@ func main() { panic(err) } - // bwe, err := pc.GetBWEstimator() - // if err != nil { - // panic(err) - // } - // - // if err := bwe.Subscribe("A8-MINI", track.GetPriority(), transcoder.OnUpdateBitrate()); err != nil { - // panic(err) - // } - if err := pc.Connect("FPV"); err != nil { panic(err) } - time.Sleep(5 * time.Second) + time.Sleep(10 * time.Second) rl := mediapipe.NewIdentityAnyReader[[]byte](l) wl := mediapipe.NewIdentityAnyWriter[[]byte](l) @@ -145,7 +126,7 @@ func main() { rd := mediapipe.NewIdentityAnyReader[[]byte](ird) wd := mediapipe.NewIdentityAnyWriter[[]byte](iwd) - w := mediapipe.NewAnyWriter[media.Sample, *astiav.Packet](track, nil) + w := mediapipe.NewAnyWriter[media.Sample, *astiav.Packet](consumers.NewPionSampleConsumer(track), nil) r := mediapipe.NewAnyReader[media.Sample, *astiav.Packet](transcoder, func(packet *astiav.Packet) (media.Sample, error) { s := media.Sample{ Data: make([]byte, packet.Size()), @@ -166,7 +147,6 @@ func main() { mediapipe.NewAnyPipe(ctx, rl, wd).Start() mediapipe.NewAnyPipe(ctx, rd, wl).Start() mediapipe.NewAnyPipe(ctx, r, w).Start() - // bwe.Start() drone.WaitUntilClosed() }() diff --git a/cmd/fpv/gcs/main.go b/cmd/fpv/gcs/main.go index d813b6e..822ba2c 100644 --- a/cmd/fpv/gcs/main.go +++ b/cmd/fpv/gcs/main.go @@ -6,6 +6,7 @@ import ( "time" "github.com/pion/interceptor" + "github.com/pion/rtp" "github.com/pion/webrtc/v4" "github.com/harshabose/mediapipe" @@ -14,6 +15,7 @@ import ( "github.com/harshabose/mediapipe/pkg/generators" "github.com/harshabose/simple_webrtc_comm/client" "github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink" + "github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource" "github.com/harshabose/simple_webrtc_comm/cmd/fpv" ) @@ -29,13 +31,34 @@ func main() { panic(err) } + rtspClient, err := consumers.NewRTSPClient( + ctx, + &consumers.RTSPClientConfig{ + ServerAddr: "localhost", + ServerPort: 8554, + StreamPath: "fpv/main/a8-mini", + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + DialTimeout: 10 * time.Second, + ReconnectAttempts: 10, + ReconnectDelay: 2 * time.Second, + UserAgent: "GoRTSP-FPV/main", + }, + nil, + consumers.WithH264Options(consumers.PacketisationMode1, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), + ) + if err != nil { + panic(err) + } + rtspClient.Start() + mediaEngine := &webrtc.MediaEngine{} registry := &interceptor.Registry{} settings := &webrtc.SettingEngine{} - gcs, err := client.CreateClient( + gcs, err := client.NewClient( ctx, cancel, mediaEngine, registry, settings, - client.WithH264MediaEngine(fpv.DefaultVideoClockRate, client.PacketisationMode1, client.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), + client.WithH264MediaEngine(fpv.DefaultVideoClockRate, mediasource.PacketisationMode1, mediasource.ProfileLevelBaseline31, fpv.DefaultSPSBase64, fpv.DefaultPPSBase64), client.WithTWCCHeaderExtensionSender(), client.WithNACKInterceptor(client.NACKGeneratorLowLatency, client.NACKResponderLowLatency), client.WithRTCPReportsInterceptor(client.RTCPReportIntervalLowLatency), @@ -62,17 +85,8 @@ func main() { panic(err) } - if _, err := pc.CreateMediaSink("A8-MINI", mediasink.RTSPSink(&duplexers.RTSPClientConfig{ - ServerAddr: "localhost", - ServerPort: 8554, - StreamPath: "DELIVERY/A8-MINI", - ReadTimeout: 10 * time.Second, - WriteTimeout: 10 * time.Second, - DialTimeout: 10 * time.Second, - ReconnectAttempts: 10, - ReconnectDelay: 2 * time.Second, - UserAgent: "GoRTSP-Host/1.0", - })); err != nil { + sink, err := pc.CreateMediaSink("A8-MINI", mediasink.WithH264Track(fpv.DefaultVideoClockRate)) + if err != nil { panic(err) } @@ -97,8 +111,12 @@ func main() { rd := mediapipe.NewIdentityAnyReader[[]byte](ird) wd := mediapipe.NewIdentityAnyWriter[[]byte](iwd) + r := mediapipe.NewIdentityAnyReader[*rtp.Packet](generators.NewPionRTPGenerator(sink)) + w := mediapipe.NewIdentityAnyWriter[*rtp.Packet](rtspClient) + mediapipe.NewAnyPipe(ctx, rl, wd).Start() mediapipe.NewAnyPipe(ctx, rd, wl).Start() + mediapipe.NewAnyPipe(ctx, r, w).Start() gcs.WaitUntilClosed() }() diff --git a/dependencies/client b/dependencies/client index 6cdcd27..a8f9294 160000 --- a/dependencies/client +++ b/dependencies/client @@ -1 +1 @@ -Subproject commit 6cdcd271973d1e7897459bfd467edf166517a75e +Subproject commit a8f9294f8f143bdf50a4d78200b0cd23c140aeac diff --git a/dependencies/mediapipe b/dependencies/mediapipe index 26561eb..177cd11 160000 --- a/dependencies/mediapipe +++ b/dependencies/mediapipe @@ -1 +1 @@ -Subproject commit 26561ebccfc0d9770cb4a9d13d09b4873110ea2b +Subproject commit 177cd11d355af13686544573f81adb8c240f6f6e diff --git a/pkg/config/config.go b/pkg/config/config.go index 772d1a8..7ea2961 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -1,10 +1,14 @@ package config import ( + "context" "time" -) -// WARN: DO NOT COMPILE THIS FILE. THIS IS FOR FUTURE + "github.com/pion/interceptor" + "github.com/pion/webrtc/v4" + + "github.com/harshabose/simple_webrtc_comm/client" +) type DataChannelConfig struct { Label string `json:"label" yaml:"label"` @@ -16,8 +20,8 @@ type DataChannelConfig struct { } type MediaConfig struct { - Audio []AudioConfig `json:"audio" yaml:"audio"` - Video []VideoConfig `json:"video" yaml:"video"` + Audio []AudioConfig `json:"audio,omitempty" yaml:"audio,omitempty"` + Video []VideoConfig `json:"video,omitempty" yaml:"video,omitempty"` } type AudioConfig struct { @@ -54,10 +58,13 @@ type LoggingConfig struct { } type Config struct { - DataChannels []DataChannelConfig `json:"data_channels" yaml:"data_channels"` - Media MediaConfig `json:"media" yaml:"media"` + ClientConfig client.ClientConfig `json:"client_config" yaml:"client_config"` + PeerConnectionConfigs []client.PeerConnectionConfig `json:"peer_connection_configs" yaml:"peer_connection_configs"` + + MediaEngine webrtc.MediaEngine `json:"-"` + InterceptorRegistry interceptor.Registry `json:"-"` + SettingsEngine webrtc.SettingEngine `json:"-"` - // Security configuration // Security SecurityConfig `json:"security" yaml:"security"` Logging LoggingConfig `json:"logging" yaml:"logging"` @@ -65,3 +72,32 @@ type Config struct { UserID string `json:"user_id,omitempty" yaml:"user_id,omitempty"` RoomID string `json:"room_id,omitempty" yaml:"room_id,omitempty"` } + +func (c *Config) GenerateClient(ctx context.Context, cancel context.CancelFunc) (*client.Client, error) { + return client.NewClientFromConfig(ctx, cancel, &c.MediaEngine, &c.InterceptorRegistry, &c.SettingsEngine, &c.ClientConfig) +} + +func (c *Config) GeneratePeerConnection(client *client.Client, pcc client.PeerConnectionConfig) (*client.PeerConnection, error) { + pc, err := client.CreatePeerConnection(pcc.Name, pcc.RTCConfig, pcc.ToOptions()...) + if err != nil { + return nil, err + } + + if err := pcc.CreateDataChannels(pc); err != nil { + // TODO: REMOVE PEER CONNECTION + return nil, err + } + + if err := pcc.CreateMediaSources(pc); err != nil { + // TODO: REMOVE PEER CONNECTION + return nil, err + } + + if err := pcc.CreateMediaSinks(pc); err != nil { + // TODO: REMOVE PEER CONNECTION + return nil, err + } + + return pc, nil + +} diff --git a/pkg/message/ident.go b/pkg/message/ident.go index 2171483..3ab7cc7 100644 --- a/pkg/message/ident.go +++ b/pkg/message/ident.go @@ -8,7 +8,7 @@ const IdentProtocol Protocol = "ident" type Ident struct { BaseMessage - Config config.Config `json:"config"` + Config *config.Config `json:"config"` } func (m *Ident) GetProtocol() Protocol { diff --git a/pkg/message/session.go b/pkg/message/session.go index 84ce97f..38b028f 100644 --- a/pkg/message/session.go +++ b/pkg/message/session.go @@ -15,10 +15,13 @@ import ( ) type Session struct { - config config.Config - Conn *websocket.Conn - PeerConnection *client.Client - mux sync.RWMutex + config *config.Config + Conn *websocket.Conn + Client *client.Client + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + mux sync.RWMutex } func (s *Session) Write(ctx context.Context, msg Message) error { @@ -34,10 +37,13 @@ func (s *Session) Write(ctx context.Context, msg Message) error { return nil } -func NewSessionWithIdent(ident *Ident, conn *websocket.Conn, client *client.Client) *Session { +func NewSessionWithIdent(ctx context.Context, ident *Ident, conn *websocket.Conn, client *client.Client) *Session { + ctx2, cancel2 := context.WithCancel(ctx) return &Session{ - config: ident.Config, - Conn: conn, - PeerConnection: client, + config: ident.Config, + Conn: conn, + Client: client, + ctx: ctx2, + cancel: cancel2, } }