mirror of
https://github.com/harshabose/client.git
synced 2025-09-26 19:31:20 +08:00
Compare commits
2 Commits
6cdcd27197
...
fbb0c5526f
Author | SHA1 | Date | |
---|---|---|---|
![]() |
fbb0c5526f | ||
![]() |
a8f9294f8f |
17
client.go
17
client.go
@@ -20,7 +20,11 @@ type Client struct {
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
func CreateClient(ctx context.Context, cancel context.CancelFunc, mediaEngine *webrtc.MediaEngine, interceptorRegistry *interceptor.Registry, settings *webrtc.SettingEngine, options ...ClientOption) (*Client, error) {
|
||||
func NewClient(
|
||||
ctx context.Context, cancel context.CancelFunc,
|
||||
mediaEngine *webrtc.MediaEngine, interceptorRegistry *interceptor.Registry,
|
||||
settings *webrtc.SettingEngine, options ...ClientOption,
|
||||
) (*Client, error) {
|
||||
if mediaEngine == nil {
|
||||
mediaEngine = &webrtc.MediaEngine{}
|
||||
}
|
||||
@@ -54,6 +58,17 @@ func CreateClient(ctx context.Context, cancel context.CancelFunc, mediaEngine *w
|
||||
return peerConnections, nil
|
||||
}
|
||||
|
||||
func NewClientFromConfig(
|
||||
ctx context.Context, cancel context.CancelFunc,
|
||||
mediaEngine *webrtc.MediaEngine, interceptorRegistry *interceptor.Registry,
|
||||
settings *webrtc.SettingEngine, config *ClientConfig,
|
||||
) (*Client, error) {
|
||||
return NewClient(
|
||||
ctx, cancel, mediaEngine, interceptorRegistry, settings,
|
||||
config.ToOptions()..., // The magic spread operator!
|
||||
)
|
||||
}
|
||||
|
||||
func (client *Client) CreatePeerConnection(label string, config webrtc.Configuration, options ...PeerConnectionOption) (*PeerConnection, error) {
|
||||
var err error
|
||||
|
||||
|
52
client_constants.go
Normal file
52
client_constants.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/interceptor/pkg/nack"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
const (
|
||||
H264PayloadType webrtc.PayloadType = 102
|
||||
H264RTXPayloadType webrtc.PayloadType = 103
|
||||
VP8PayloadType webrtc.PayloadType = 96
|
||||
VP8RTXPayloadType webrtc.PayloadType = 97
|
||||
OpusPayloadType webrtc.PayloadType = 111
|
||||
)
|
||||
|
||||
type NACKGeneratorOptions []nack.GeneratorOption
|
||||
|
||||
var (
|
||||
NACKGeneratorLowLatency NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(256), nack.GeneratorSkipLastN(2), nack.GeneratorMaxNacksPerPacket(1), nack.GeneratorInterval(10 * time.Millisecond)}
|
||||
NACKGeneratorDefault NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(512), nack.GeneratorSkipLastN(5), nack.GeneratorMaxNacksPerPacket(2), nack.GeneratorInterval(50 * time.Millisecond)}
|
||||
NACKGeneratorHighQuality NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(4096), nack.GeneratorSkipLastN(10), nack.GeneratorMaxNacksPerPacket(5), nack.GeneratorInterval(100 * time.Millisecond)}
|
||||
NACKGeneratorLowBandwidth NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(256), nack.GeneratorSkipLastN(15), nack.GeneratorMaxNacksPerPacket(1), nack.GeneratorInterval(200 * time.Millisecond)}
|
||||
)
|
||||
|
||||
type NACKResponderOptions []nack.ResponderOption
|
||||
|
||||
var (
|
||||
NACKResponderLowLatency NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(256)}
|
||||
NACKResponderDefault NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(1024)}
|
||||
NACKResponderHighQuality NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(4096)}
|
||||
NACKResponderLowBandwidth NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(256)}
|
||||
)
|
||||
|
||||
type TWCCSenderInterval time.Duration
|
||||
|
||||
const (
|
||||
TWCCIntervalLowLatency = TWCCSenderInterval(100 * time.Millisecond)
|
||||
TWCCIntervalDefault = TWCCSenderInterval(200 * time.Millisecond)
|
||||
TWCCIntervalHighQuality = TWCCSenderInterval(300 * time.Millisecond)
|
||||
TWCCIntervalLowBandwidth = TWCCSenderInterval(500 * time.Millisecond)
|
||||
)
|
||||
|
||||
type RTCPReportInterval time.Duration
|
||||
|
||||
const (
|
||||
RTCPReportIntervalLowLatency = RTCPReportInterval(1 * time.Second)
|
||||
RTCPReportIntervalDefault = RTCPReportInterval(3 * time.Second)
|
||||
RTCPReportIntervalHighQuality = RTCPReportInterval(2 * time.Second)
|
||||
RTCPReportIntervalLowBandwidth = RTCPReportInterval(10 * time.Second)
|
||||
)
|
@@ -13,46 +13,13 @@ import (
|
||||
"github.com/pion/interceptor/pkg/twcc"
|
||||
"github.com/pion/sdp/v3"
|
||||
"github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource"
|
||||
)
|
||||
|
||||
type ClientOption = func(*Client) error
|
||||
|
||||
type PacketisationMode uint8
|
||||
|
||||
const (
|
||||
H264PayloadType webrtc.PayloadType = 102
|
||||
H264RTXPayloadType webrtc.PayloadType = 103
|
||||
VP8PayloadType webrtc.PayloadType = 96
|
||||
VP8RTXPayloadType webrtc.PayloadType = 97
|
||||
OpusPayloadType webrtc.PayloadType = 111
|
||||
)
|
||||
|
||||
const (
|
||||
PacketisationMode0 PacketisationMode = 0
|
||||
PacketisationMode1 PacketisationMode = 1
|
||||
PacketisationMode2 PacketisationMode = 2
|
||||
)
|
||||
|
||||
type ProfileLevel string
|
||||
|
||||
const (
|
||||
ProfileLevelBaseline21 ProfileLevel = "420015" // Level 2.1 (480p)
|
||||
ProfileLevelBaseline31 ProfileLevel = "42001f" // Level 3.1 (720p)
|
||||
ProfileLevelBaseline41 ProfileLevel = "420029" // Level 4.1 (1080p)
|
||||
ProfileLevelBaseline42 ProfileLevel = "42002a" // Level 4.2 (2K)
|
||||
|
||||
ProfileLevelMain21 ProfileLevel = "4D0015" // Level 2.1
|
||||
ProfileLevelMain31 ProfileLevel = "4D001f" // Level 3.1
|
||||
ProfileLevelMain41 ProfileLevel = "4D0029" // Level 4.1
|
||||
ProfileLevelMain42 ProfileLevel = "4D002a" // Level 4.2
|
||||
|
||||
ProfileLevelHigh21 ProfileLevel = "640015" // Level 2.1
|
||||
ProfileLevelHigh31 ProfileLevel = "64001f" // Level 3.1
|
||||
ProfileLevelHigh41 ProfileLevel = "640029" // Level 4.1
|
||||
ProfileLevelHigh42 ProfileLevel = "64002a" // Level 4.2
|
||||
)
|
||||
|
||||
func WithH264MediaEngine(clockrate uint32, packetisationMode PacketisationMode, profileLevelID ProfileLevel, sps, pps string) ClientOption {
|
||||
func WithH264MediaEngine(clockrate uint32, packetisationMode mediasource.PacketisationMode, profileLevelID mediasource.ProfileLevel, sps, pps string) ClientOption {
|
||||
return func(client *Client) error {
|
||||
RTCPFeedback := []webrtc.RTCPFeedback{{Type: webrtc.TypeRTCPFBGoogREMB}, {Type: webrtc.TypeRTCPFBCCM, Parameter: "fir"}, {Type: webrtc.TypeRTCPFBNACK}, {Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"}}
|
||||
if err := client.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
|
||||
@@ -87,8 +54,6 @@ func WithH264MediaEngine(clockrate uint32, packetisationMode PacketisationMode,
|
||||
|
||||
func WithVP8MediaEngine(clockrate uint32) ClientOption {
|
||||
return func(client *Client) error {
|
||||
fmt.Println("setting up VP8 media codec in media engine ..")
|
||||
fmt.Println("setting up VP8 media codec in media engine with REMB, CCM, NACK AND NACK-PLI RTCP feedback ...")
|
||||
RTCPFeedback := []webrtc.RTCPFeedback{{Type: webrtc.TypeRTCPFBGoogREMB}, {Type: webrtc.TypeRTCPFBCCM, Parameter: "fir"}, {Type: webrtc.TypeRTCPFBNACK}, {Type: webrtc.TypeRTCPFBNACK, Parameter: "pli"}}
|
||||
if err := client.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
|
||||
RTPCodecCapability: webrtc.RTPCodecCapability{
|
||||
@@ -102,7 +67,6 @@ func WithVP8MediaEngine(clockrate uint32) ClientOption {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("setting up VP8 media codec RTX in media engine ...")
|
||||
if err := client.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
|
||||
RTPCodecCapability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeRTX,
|
||||
@@ -115,7 +79,6 @@ func WithVP8MediaEngine(clockrate uint32) ClientOption {
|
||||
return err
|
||||
}
|
||||
|
||||
fmt.Println("... done setting VP8 media codec in media enginer")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@@ -138,55 +101,32 @@ func WithDefaultInterceptorRegistry() ClientOption {
|
||||
}
|
||||
}
|
||||
|
||||
type StereoType uint8
|
||||
|
||||
const (
|
||||
Mono StereoType = 0
|
||||
Dual StereoType = 1
|
||||
)
|
||||
|
||||
func WithOpusMediaEngine(samplerate uint32, channelLayout uint16, stereo StereoType) ClientOption {
|
||||
func WithOpusMediaEngine(samplerate uint32, channelLayout uint16, stereo mediasource.StereoType) ClientOption {
|
||||
return func(client *Client) error {
|
||||
if err := client.mediaEngine.RegisterCodec(webrtc.RTPCodecParameters{
|
||||
RTPCodecCapability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeOpus,
|
||||
ClockRate: samplerate,
|
||||
Channels: channelLayout,
|
||||
SDPFmtpLine: fmt.Sprintf("minptime=10;useinbandfec=1;stereo=%d", stereo),
|
||||
MimeType: webrtc.MimeTypeOpus,
|
||||
ClockRate: samplerate,
|
||||
Channels: channelLayout,
|
||||
RTCPFeedback: nil,
|
||||
SDPFmtpLine: fmt.Sprintf("minptime=10;useinbandfec=1;stereo=%d", stereo),
|
||||
},
|
||||
PayloadType: 111,
|
||||
PayloadType: OpusPayloadType,
|
||||
}, webrtc.RTPCodecTypeAudio); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type NACKGeneratorOptions []nack.GeneratorOption
|
||||
|
||||
var (
|
||||
NACKGeneratorLowLatency NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(256), nack.GeneratorSkipLastN(2), nack.GeneratorMaxNacksPerPacket(1), nack.GeneratorInterval(50 * time.Millisecond)}
|
||||
NACKGeneratorDefault NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(512), nack.GeneratorSkipLastN(5), nack.GeneratorMaxNacksPerPacket(2), nack.GeneratorInterval(100 * time.Millisecond)}
|
||||
NACKGeneratorHighQuality NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(2048), nack.GeneratorSkipLastN(10), nack.GeneratorMaxNacksPerPacket(3), nack.GeneratorInterval(200 * time.Millisecond)}
|
||||
NACKGeneratorLowBandwidth NACKGeneratorOptions = []nack.GeneratorOption{nack.GeneratorSize(4096), nack.GeneratorSkipLastN(15), nack.GeneratorMaxNacksPerPacket(4), nack.GeneratorInterval(150 * time.Millisecond)}
|
||||
)
|
||||
|
||||
type NACKResponderOptions []nack.ResponderOption
|
||||
|
||||
var (
|
||||
NACKResponderLowLatency NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(256), nack.DisableCopy()}
|
||||
NACKResponderDefault NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(1024)}
|
||||
NACKResponderHighQuality NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(2048)}
|
||||
NACKResponderLowBandwidth NACKResponderOptions = []nack.ResponderOption{nack.ResponderSize(4096)}
|
||||
)
|
||||
|
||||
func WithNACKInterceptor(generatorOptions NACKGeneratorOptions, responderOptions NACKResponderOptions) ClientOption {
|
||||
return func(client *Client) error {
|
||||
generator, err := nack.NewGeneratorInterceptor()
|
||||
generator, err := nack.NewGeneratorInterceptor(generatorOptions...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
responder, err := nack.NewResponderInterceptor()
|
||||
responder, err := nack.NewResponderInterceptor(responderOptions...)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -200,15 +140,6 @@ func WithNACKInterceptor(generatorOptions NACKGeneratorOptions, responderOptions
|
||||
}
|
||||
}
|
||||
|
||||
type TWCCSenderInterval time.Duration
|
||||
|
||||
const (
|
||||
TWCCIntervalLowLatency = TWCCSenderInterval(200 * time.Millisecond)
|
||||
TWCCIntervalDefault = TWCCSenderInterval(100 * time.Millisecond)
|
||||
TWCCIntervalHighQuality = TWCCSenderInterval(200 * time.Millisecond)
|
||||
TWCCIntervalLowBandwidth = TWCCSenderInterval(500 * time.Millisecond)
|
||||
)
|
||||
|
||||
func WithTWCCSenderInterceptor(interval TWCCSenderInterval) ClientOption {
|
||||
return func(client *Client) error {
|
||||
client.mediaEngine.RegisterFeedback(webrtc.RTCPFeedback{Type: webrtc.TypeRTCPFBTransportCC}, webrtc.RTPCodecTypeVideo)
|
||||
@@ -248,15 +179,6 @@ func WithJitterBufferInterceptor() ClientOption {
|
||||
}
|
||||
}
|
||||
|
||||
type RTCPReportInterval time.Duration
|
||||
|
||||
const (
|
||||
RTCPReportIntervalLowLatency = RTCPReportInterval(1 * time.Second)
|
||||
RTCPReportIntervalDefault = RTCPReportInterval(1 * time.Second)
|
||||
RTCPReportIntervalHighQuality = RTCPReportInterval(1500 * time.Millisecond)
|
||||
RTCPReportIntervalLowBandwidth = RTCPReportInterval(2 * time.Second)
|
||||
)
|
||||
|
||||
func WithRTCPReportsInterceptor(interval RTCPReportInterval) ClientOption {
|
||||
return func(client *Client) error {
|
||||
receiver, err := report.NewReceiverInterceptor(report.ReceiverInterval(time.Duration(interval)))
|
||||
@@ -300,7 +222,7 @@ func WithSimulcastExtensionHeaders() ClientOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithBandwidthControlInterceptor(initialBitrate, minimumBitrate, maximumBitrate int64, interval time.Duration) ClientOption {
|
||||
func WithBandwidthControlInterceptor(initialBitrate, minimumBitrate, maximumBitrate uint64, interval time.Duration) ClientOption {
|
||||
return func(client *Client) error {
|
||||
congestionController, err := cc.NewInterceptor(func() (cc.BandwidthEstimator, error) {
|
||||
return gcc.NewSendSideBWE(gcc.SendSideBWEInitialBitrate(int(initialBitrate)), gcc.SendSideBWEMinBitrate(int(minimumBitrate)), gcc.SendSideBWEMaxBitrate(int(maximumBitrate)))
|
||||
|
468
config.go
Normal file
468
config.go
Normal file
@@ -0,0 +1,468 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/datachannel"
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink"
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource"
|
||||
)
|
||||
|
||||
type ClientConfig struct {
|
||||
// Media configuration
|
||||
H264 *H264Config `json:"h264,omitempty"`
|
||||
VP8 *VP8Config `json:"vp8,omitempty"`
|
||||
Opus *OpusConfig `json:"opus,omitempty"`
|
||||
|
||||
// Interceptor configurations
|
||||
NACK *NACKPreset `json:"nack,omitempty"`
|
||||
RTCPReports *RTCPReportsPreset `json:"rtcp_reports,omitempty"`
|
||||
TWCC *TWCCPreset `json:"twcc,omitempty"`
|
||||
Bandwidth *BandwidthConfig `json:"bandwidth,omitempty"`
|
||||
|
||||
// Feature flags
|
||||
SimulcastExtensions bool `json:"simulcast_extensions,omitempty"`
|
||||
TWCCHeaderExtension bool `json:"twcc_header_extension,omitempty"`
|
||||
}
|
||||
|
||||
type H264Config struct {
|
||||
ClockRate uint32 `json:"clock_rate"`
|
||||
PacketisationMode mediasource.PacketisationMode `json:"packetisation_mode"`
|
||||
ProfileLevel mediasource.ProfileLevel `json:"profile_level"`
|
||||
SPSBase64 string `json:"sps_base64"`
|
||||
PPSBase64 string `json:"pps_base64"`
|
||||
}
|
||||
|
||||
type VP8Config struct {
|
||||
ClockRate uint32 `json:"clock_rate"`
|
||||
}
|
||||
|
||||
type OpusConfig struct {
|
||||
SampleRate uint32 `json:"sample_rate"`
|
||||
ChannelLayout uint16 `json:"channel_layout"`
|
||||
Stereo mediasource.StereoType `json:"stereo"`
|
||||
}
|
||||
|
||||
type NACKPreset string
|
||||
type RTCPReportsPreset string
|
||||
type TWCCPreset string
|
||||
|
||||
const (
|
||||
NACKLowLatency NACKPreset = "low_latency"
|
||||
NACKDefault NACKPreset = "default"
|
||||
NACKHighQuality NACKPreset = "high_quality"
|
||||
NACKLowBandwidth NACKPreset = "low_bandwidth"
|
||||
|
||||
RTCPReportsLowLatency RTCPReportsPreset = "low_latency"
|
||||
RTCPReportsDefault RTCPReportsPreset = "default"
|
||||
RTCPReportsHighQuality RTCPReportsPreset = "high_quality"
|
||||
RTCPReportsLowBandwidth RTCPReportsPreset = "low_bandwidth"
|
||||
|
||||
TWCCLowLatency TWCCPreset = "low_latency"
|
||||
TWCCDefault TWCCPreset = "default"
|
||||
TWCCHighQuality TWCCPreset = "high_quality"
|
||||
TWCCLowBandwidth TWCCPreset = "low_bandwidth"
|
||||
)
|
||||
|
||||
type BandwidthConfig struct {
|
||||
Initial uint64 `json:"initial"`
|
||||
Minimum uint64 `json:"minimum"`
|
||||
Maximum uint64 `json:"maximum"`
|
||||
Interval time.Duration `json:"interval"`
|
||||
}
|
||||
|
||||
type optionBuilder struct {
|
||||
options []ClientOption
|
||||
}
|
||||
|
||||
func (ob *optionBuilder) add(option ClientOption) *optionBuilder {
|
||||
if option != nil {
|
||||
ob.options = append(ob.options, option)
|
||||
}
|
||||
return ob
|
||||
}
|
||||
|
||||
var (
|
||||
nackGeneratorPresets = map[NACKPreset]NACKGeneratorOptions{
|
||||
NACKLowLatency: NACKGeneratorLowLatency,
|
||||
NACKDefault: NACKGeneratorDefault,
|
||||
NACKHighQuality: NACKGeneratorHighQuality,
|
||||
NACKLowBandwidth: NACKGeneratorLowBandwidth,
|
||||
}
|
||||
|
||||
nackResponderPresets = map[NACKPreset]NACKResponderOptions{
|
||||
NACKLowLatency: NACKResponderLowLatency,
|
||||
NACKDefault: NACKResponderDefault,
|
||||
NACKHighQuality: NACKResponderHighQuality,
|
||||
NACKLowBandwidth: NACKResponderLowBandwidth,
|
||||
}
|
||||
|
||||
rtcpReportsPresets = map[RTCPReportsPreset]RTCPReportInterval{
|
||||
RTCPReportsLowLatency: RTCPReportIntervalLowLatency,
|
||||
RTCPReportsDefault: RTCPReportIntervalDefault,
|
||||
RTCPReportsHighQuality: RTCPReportIntervalHighQuality,
|
||||
RTCPReportsLowBandwidth: RTCPReportIntervalLowBandwidth,
|
||||
}
|
||||
|
||||
twccPresets = map[TWCCPreset]TWCCSenderInterval{
|
||||
TWCCLowLatency: TWCCIntervalLowLatency,
|
||||
TWCCDefault: TWCCIntervalDefault,
|
||||
TWCCHighQuality: TWCCIntervalHighQuality,
|
||||
TWCCLowBandwidth: TWCCIntervalLowBandwidth,
|
||||
}
|
||||
)
|
||||
|
||||
func (c *ClientConfig) ToOptions() []ClientOption {
|
||||
builder := &optionBuilder{}
|
||||
|
||||
return builder.
|
||||
add(c.h264Option()).
|
||||
add(c.vp8Option()).
|
||||
add(c.opusOption()).
|
||||
add(c.nackOption()).
|
||||
add(c.rtcpReportsOption()).
|
||||
add(c.twccOption()).
|
||||
add(c.bandwidthOption()).
|
||||
add(c.simulcastOption()).
|
||||
add(c.twccHeaderOption()).
|
||||
options
|
||||
}
|
||||
|
||||
func (c *ClientConfig) h264Option() ClientOption {
|
||||
if c.H264 == nil {
|
||||
return nil
|
||||
}
|
||||
return WithH264MediaEngine(
|
||||
c.H264.ClockRate,
|
||||
c.H264.PacketisationMode,
|
||||
c.H264.ProfileLevel,
|
||||
c.H264.SPSBase64,
|
||||
c.H264.PPSBase64,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) vp8Option() ClientOption {
|
||||
if c.VP8 == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return WithVP8MediaEngine(c.VP8.ClockRate)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) opusOption() ClientOption {
|
||||
if c.Opus == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return WithOpusMediaEngine(
|
||||
c.Opus.SampleRate,
|
||||
c.Opus.ChannelLayout,
|
||||
c.Opus.Stereo,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) nackOption() ClientOption {
|
||||
if c.NACK == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
generator, generatorExists := nackGeneratorPresets[*c.NACK]
|
||||
responder, responderExists := nackResponderPresets[*c.NACK]
|
||||
|
||||
if !generatorExists || !responderExists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return WithNACKInterceptor(generator, responder)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) rtcpReportsOption() ClientOption {
|
||||
if c.RTCPReports == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
interval, exists := rtcpReportsPresets[*c.RTCPReports]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return WithRTCPReportsInterceptor(interval)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) twccOption() ClientOption {
|
||||
if c.TWCC == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
interval, exists := twccPresets[*c.TWCC]
|
||||
if !exists {
|
||||
return nil
|
||||
}
|
||||
|
||||
return WithTWCCSenderInterceptor(interval)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) bandwidthOption() ClientOption {
|
||||
if c.Bandwidth == nil {
|
||||
return nil
|
||||
}
|
||||
return WithBandwidthControlInterceptor(
|
||||
c.Bandwidth.Initial,
|
||||
c.Bandwidth.Minimum,
|
||||
c.Bandwidth.Maximum,
|
||||
c.Bandwidth.Interval,
|
||||
)
|
||||
}
|
||||
|
||||
func (c *ClientConfig) simulcastOption() ClientOption {
|
||||
if !c.SimulcastExtensions {
|
||||
return nil
|
||||
}
|
||||
return WithSimulcastExtensionHeaders()
|
||||
}
|
||||
|
||||
func (c *ClientConfig) twccHeaderOption() ClientOption {
|
||||
if !c.TWCCHeaderExtension {
|
||||
return nil
|
||||
}
|
||||
return WithTWCCHeaderExtensionSender()
|
||||
}
|
||||
|
||||
type PeerConnectionConfig struct {
|
||||
// Basic settings
|
||||
Name string `json:"name"`
|
||||
|
||||
// RTC and Singnaling Control
|
||||
FirebaseOfferSignal *bool `json:"firebase_offer_signal,omitempty"`
|
||||
FirebaseOfferAnswer *bool `json:"firebase_offer_answer"`
|
||||
RTCConfig webrtc.Configuration `json:"rtc_config"`
|
||||
|
||||
// Declarative resource definitions
|
||||
DataChannels []DataChannelSpec `json:"data_channels_specs,omitempty"`
|
||||
MediaSources []MediaSourceSpec `json:"media_sources_specs,omitempty"`
|
||||
MediaSinks []MediaSinkSpec `json:"media_sinks_specs,omitempty"`
|
||||
}
|
||||
|
||||
type DataChannelSpec struct {
|
||||
Label string `json:"label"`
|
||||
ID *uint16 `json:"id,omitempty"`
|
||||
Ordered *bool `json:"ordered,omitempty"`
|
||||
Protocol *string `json:"protocol,omitempty"`
|
||||
Negotiated *bool `json:"negotiated,omitempty"`
|
||||
MaxPacketLifeTime *uint16 `json:"max_packet_life_time,omitempty"`
|
||||
MaxRetransmits *uint16 `json:"max_retransmits,omitempty"`
|
||||
}
|
||||
|
||||
type MediaSourceSpec struct {
|
||||
Name string `json:"name"`
|
||||
H264 *H264TrackConfig `json:"h264,omitempty"`
|
||||
VP8 *VP8TrackConfig `json:"vp8,omitempty"`
|
||||
Opus *OpusTrackConfig `json:"opus,omitempty"`
|
||||
Priority *mediasource.Priority `json:"priority,omitempty"`
|
||||
}
|
||||
|
||||
type trackOptionBuilder struct {
|
||||
options []mediasource.TrackOption
|
||||
}
|
||||
|
||||
func (ob *trackOptionBuilder) add(option mediasource.TrackOption) *trackOptionBuilder {
|
||||
if option != nil {
|
||||
ob.options = append(ob.options, option)
|
||||
}
|
||||
return ob
|
||||
}
|
||||
|
||||
func (c *MediaSourceSpec) withTrackOption() mediasource.TrackOption {
|
||||
if c.H264 != nil {
|
||||
return mediasource.WithH264Track(c.H264.ClockRate, c.H264.PacketisationMode, c.H264.ProfileLevel)
|
||||
}
|
||||
|
||||
if c.VP8 != nil {
|
||||
return mediasource.WithVP8Track(c.VP8.ClockRate)
|
||||
}
|
||||
|
||||
if c.Opus != nil {
|
||||
return mediasource.WithOpusTrack(c.Opus.Samplerate, c.Opus.ChannelLayout, c.Opus.Stereo)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MediaSourceSpec) ToOptions() []mediasource.TrackOption {
|
||||
builder := trackOptionBuilder{}
|
||||
|
||||
return builder.add(c.withTrackOption()).add(c.withPriority()).options
|
||||
}
|
||||
|
||||
func (c *MediaSourceSpec) withPriority() mediasource.TrackOption {
|
||||
if c.Priority == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return mediasource.WithPriority(*c.Priority)
|
||||
}
|
||||
|
||||
// MediaSinkSpec defines a media sink to create
|
||||
type MediaSinkSpec struct {
|
||||
Name string `json:"name"`
|
||||
H264 *H264TrackConfig `json:"h264,omitempty"`
|
||||
VP8 *VP8TrackConfig `json:"vp8,omitempty"`
|
||||
Opus *OpusTrackConfig `json:"opus,omitempty"`
|
||||
}
|
||||
|
||||
type sinkOptionBuilder struct {
|
||||
options []mediasink.SinkOption
|
||||
}
|
||||
|
||||
func (ob *sinkOptionBuilder) add(option mediasink.SinkOption) *sinkOptionBuilder {
|
||||
if option != nil {
|
||||
ob.options = append(ob.options, option)
|
||||
}
|
||||
return ob
|
||||
}
|
||||
|
||||
func (c *MediaSinkSpec) withTrackOption() mediasink.SinkOption {
|
||||
if c.H264 != nil {
|
||||
return mediasink.WithH264Track(c.H264.ClockRate)
|
||||
}
|
||||
|
||||
if c.VP8 != nil {
|
||||
return mediasink.WithVP8Track(c.VP8.ClockRate)
|
||||
}
|
||||
|
||||
if c.Opus != nil {
|
||||
return mediasink.WithOpusTrack(c.Opus.Samplerate, c.Opus.ChannelLayout)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *MediaSinkSpec) ToOptions() []mediasink.SinkOption {
|
||||
builder := sinkOptionBuilder{}
|
||||
|
||||
return builder.add(c.withTrackOption()).options
|
||||
}
|
||||
|
||||
type H264TrackConfig struct {
|
||||
ClockRate uint32 `json:"clock_rate"`
|
||||
PacketisationMode mediasource.PacketisationMode `json:"packetisation_mode"`
|
||||
ProfileLevel mediasource.ProfileLevel `json:"profile_level"`
|
||||
}
|
||||
|
||||
type VP8TrackConfig struct {
|
||||
ClockRate uint32 `json:"clock_rate"`
|
||||
}
|
||||
|
||||
type OpusTrackConfig struct {
|
||||
Samplerate uint32 `json:"samplerate"`
|
||||
ChannelLayout uint16 `json:"channel_layout"`
|
||||
Stereo mediasource.StereoType `json:"stereo"`
|
||||
}
|
||||
|
||||
type pcOptionBuilder struct {
|
||||
options []PeerConnectionOption
|
||||
}
|
||||
|
||||
func (ob *pcOptionBuilder) add(option PeerConnectionOption) *pcOptionBuilder {
|
||||
if option != nil {
|
||||
ob.options = append(ob.options, option)
|
||||
}
|
||||
return ob
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) ToOptions() []PeerConnectionOption {
|
||||
builder := pcOptionBuilder{}
|
||||
|
||||
return builder.
|
||||
add(c.withFirebaseOfferSignal()).
|
||||
add(c.withFirebaseOfferAnswer()).
|
||||
add(c.withDataChannels()).
|
||||
add(c.withMediaSource()).
|
||||
add(c.withMediaSinks()).
|
||||
options
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) withFirebaseOfferSignal() PeerConnectionOption {
|
||||
if c.FirebaseOfferSignal == nil {
|
||||
return nil
|
||||
}
|
||||
return WithFirebaseOfferSignal
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) withFirebaseOfferAnswer() PeerConnectionOption {
|
||||
if c.FirebaseOfferAnswer == nil {
|
||||
return nil
|
||||
}
|
||||
return WithFirebaseAnswerSignal
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) withMediaSource() PeerConnectionOption {
|
||||
if len(c.MediaSources) == 0 {
|
||||
return nil
|
||||
}
|
||||
return WithMediaSources()
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) withMediaSinks() PeerConnectionOption {
|
||||
if len(c.MediaSinks) == 0 {
|
||||
return nil
|
||||
}
|
||||
return WithMediaSinks()
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) withDataChannels() PeerConnectionOption {
|
||||
if len(c.DataChannels) == 0 {
|
||||
return nil
|
||||
}
|
||||
return WithDataChannels()
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) CreateDataChannels(pc *PeerConnection) error {
|
||||
if len(c.DataChannels) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, config := range c.DataChannels {
|
||||
if _, err := pc.CreateDataChannel(config.Label, datachannel.WithDataChannelInit(&webrtc.DataChannelInit{
|
||||
Ordered: config.Ordered,
|
||||
MaxPacketLifeTime: config.MaxPacketLifeTime,
|
||||
MaxRetransmits: config.MaxRetransmits,
|
||||
Protocol: config.Protocol,
|
||||
Negotiated: config.Negotiated,
|
||||
ID: config.ID,
|
||||
})); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) CreateMediaSources(pc *PeerConnection) error {
|
||||
if len(c.MediaSources) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, config := range c.MediaSources {
|
||||
if _, err := pc.CreateMediaSource(c.Name, config.ToOptions()...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *PeerConnectionConfig) CreateMediaSinks(pc *PeerConnection) error {
|
||||
if len(c.MediaSinks) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, config := range c.MediaSinks {
|
||||
if _, err := pc.CreateMediaSink(c.Name, config.ToOptions()...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
18
config_cgo.go
Normal file
18
config_cgo.go
Normal file
@@ -0,0 +1,18 @@
|
||||
//go:build cgo_enabled
|
||||
|
||||
package client
|
||||
|
||||
// func (c *PeerConnectionConfig) withBandwidthControl() PeerConnectionOption {
|
||||
// if len(c.BWSubscriptions) == 0 {
|
||||
// return nil
|
||||
// }
|
||||
// return WithBandwidthControl()
|
||||
// }
|
||||
|
||||
// func (c *PeerConnectionConfig) ToOptions() []PeerConnectionOption {
|
||||
// builder := pcOptionBuilder{}
|
||||
//
|
||||
// return builder.
|
||||
// add(c.withFirebaseOfferSignal()).
|
||||
// options
|
||||
// }
|
@@ -25,16 +25,17 @@ type PeerConnection struct {
|
||||
}
|
||||
|
||||
func CreatePeerConnection(ctx context.Context, cancel context.CancelFunc, label string, api *webrtc.API, config webrtc.Configuration, options ...PeerConnectionOption) (*PeerConnection, error) {
|
||||
var err error
|
||||
pc := &PeerConnection{
|
||||
label: label,
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
|
||||
if pc.peerConnection, err = api.NewPeerConnection(config); err != nil {
|
||||
peerConnection, err := api.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pc.peerConnection = peerConnection
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(pc); err != nil {
|
||||
@@ -42,6 +43,10 @@ func CreatePeerConnection(ctx context.Context, cancel context.CancelFunc, label
|
||||
}
|
||||
}
|
||||
|
||||
if pc.signal == nil {
|
||||
return nil, errors.New("signaling protocol not provided")
|
||||
}
|
||||
|
||||
return pc.onConnectionStateChangeEvent().onICEConnectionStateChange().onICEGatheringStateChange().onICECandidate(), err
|
||||
}
|
||||
|
||||
@@ -94,11 +99,11 @@ func (pc *PeerConnection) onICECandidate() *PeerConnection {
|
||||
return pc
|
||||
}
|
||||
|
||||
func (pc *PeerConnection) CreateDataChannel(label string) (*datachannel.DataChannel, error) {
|
||||
func (pc *PeerConnection) CreateDataChannel(label string, options ...datachannel.Option) (*datachannel.DataChannel, error) {
|
||||
if pc.dataChannels == nil {
|
||||
return nil, errors.New("data channels are not enabled")
|
||||
}
|
||||
channel, err := pc.dataChannels.CreateDataChannel(label, pc.peerConnection)
|
||||
channel, err := pc.dataChannels.CreateDataChannel(label, pc.peerConnection, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -119,12 +124,12 @@ func (pc *PeerConnection) CreateMediaSource(label string, options ...mediasource
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (pc *PeerConnection) CreateMediaSink(label string, operator func(context.Context, *webrtc.TrackRemote) error) (*mediasink.Sink, error) {
|
||||
func (pc *PeerConnection) CreateMediaSink(label string, options ...mediasink.SinkOption) (*mediasink.Sink, error) {
|
||||
if pc.sinks == nil {
|
||||
return nil, errors.New("media sinks are not enabled")
|
||||
}
|
||||
|
||||
sink, err := pc.sinks.CreateSink(label, operator)
|
||||
sink, err := pc.sinks.CreateSink(label, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -133,6 +138,9 @@ func (pc *PeerConnection) CreateMediaSink(label string, operator func(context.Co
|
||||
}
|
||||
|
||||
func (pc *PeerConnection) Connect(category string) error {
|
||||
if pc.signal == nil {
|
||||
return errors.New("no signaling protocol provided")
|
||||
}
|
||||
if err := pc.signal.Connect(category, pc.label); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@@ -1,6 +1,8 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/datachannel"
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasink"
|
||||
"github.com/harshabose/simple_webrtc_comm/client/pkg/mediasource"
|
||||
@@ -9,17 +11,26 @@ import (
|
||||
type PeerConnectionOption = func(*PeerConnection) error
|
||||
|
||||
func WithFirebaseOfferSignal(connection *PeerConnection) error {
|
||||
if connection.signal != nil {
|
||||
return errors.New("multiple options for signaling were provided. this is not supported")
|
||||
}
|
||||
connection.signal = CreateFirebaseOfferSignal(connection.ctx, connection)
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithFirebaseAnswerSignal(connection *PeerConnection) error {
|
||||
if connection.signal != nil {
|
||||
return errors.New("multiple options for signaling were provided. this is not supported")
|
||||
}
|
||||
connection.signal = CreateFirebaseAnswerSignal(connection.ctx, connection)
|
||||
return nil
|
||||
}
|
||||
|
||||
func WithFileOfferSignal(offerPath, answerPath string) PeerConnectionOption {
|
||||
return func(connection *PeerConnection) error {
|
||||
if connection.signal != nil {
|
||||
return errors.New("multiple options for signaling were provided. this is not supported")
|
||||
}
|
||||
connection.signal = CreateFileOfferSignal(connection.ctx, connection, offerPath, answerPath)
|
||||
return nil
|
||||
}
|
||||
@@ -27,6 +38,9 @@ func WithFileOfferSignal(offerPath, answerPath string) PeerConnectionOption {
|
||||
|
||||
func WithFileAnswerSignal(offerPath, answerPath string) PeerConnectionOption {
|
||||
return func(connection *PeerConnection) error {
|
||||
if connection.signal != nil {
|
||||
return errors.New("multiple options for signaling were provided. this is not supported")
|
||||
}
|
||||
connection.signal = CreateFileAnswerSignal(connection.ctx, connection, offerPath, answerPath)
|
||||
return nil
|
||||
}
|
||||
|
@@ -11,32 +11,31 @@ import (
|
||||
type DataChannel struct {
|
||||
label string
|
||||
datachannel *webrtc.DataChannel
|
||||
init *webrtc.DataChannelInit
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func CreateDataChannel(ctx context.Context, label string, peerConnection *webrtc.PeerConnection) (*DataChannel, error) {
|
||||
datachannel := &DataChannel{
|
||||
func CreateDataChannel(ctx context.Context, label string, peerConnection *webrtc.PeerConnection, options ...Option) (*DataChannel, error) {
|
||||
dc := &DataChannel{
|
||||
label: label,
|
||||
datachannel: nil,
|
||||
ctx: ctx,
|
||||
}
|
||||
var (
|
||||
dataChannelNegotiated = true
|
||||
dataChannelProtocol = "binary"
|
||||
dataChannelOrdered = true
|
||||
dataChannelInit = webrtc.DataChannelInit{
|
||||
Negotiated: &dataChannelNegotiated,
|
||||
Protocol: &dataChannelProtocol,
|
||||
Ordered: &dataChannelOrdered,
|
||||
}
|
||||
err error
|
||||
)
|
||||
|
||||
if datachannel.datachannel, err = peerConnection.CreateDataChannel(label, &dataChannelInit); err != nil {
|
||||
for _, option := range options {
|
||||
if err := option(dc); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
datachannel, err := peerConnection.CreateDataChannel(label, dc.init)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return datachannel.onOpen().onClose(), nil
|
||||
dc.datachannel = datachannel
|
||||
|
||||
return dc.onOpen().onClose(), nil
|
||||
}
|
||||
|
||||
func CreateRawDataChannel(ctx context.Context, channel *webrtc.DataChannel) (*DataChannel, error) {
|
||||
@@ -93,12 +92,12 @@ func CreateDataChannels(ctx context.Context) *DataChannels {
|
||||
}
|
||||
}
|
||||
|
||||
func (dataChannels *DataChannels) CreateDataChannel(label string, peerConnection *webrtc.PeerConnection) (*DataChannel, error) {
|
||||
func (dataChannels *DataChannels) CreateDataChannel(label string, peerConnection *webrtc.PeerConnection, options ...Option) (*DataChannel, error) {
|
||||
if _, exits := dataChannels.datachannel[label]; exits {
|
||||
return nil, fmt.Errorf("datachannel with id = '%s' already exists", label)
|
||||
}
|
||||
|
||||
channel, err := CreateDataChannel(dataChannels.ctx, label, peerConnection)
|
||||
channel, err := CreateDataChannel(dataChannels.ctx, label, peerConnection, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
12
pkg/datachannel/options.go
Normal file
12
pkg/datachannel/options.go
Normal file
@@ -0,0 +1,12 @@
|
||||
package datachannel
|
||||
|
||||
import "github.com/pion/webrtc/v4"
|
||||
|
||||
type Option = func(*DataChannel) error
|
||||
|
||||
func WithDataChannelInit(init *webrtc.DataChannelInit) Option {
|
||||
return func(channel *DataChannel) error {
|
||||
channel.init = init
|
||||
return nil
|
||||
}
|
||||
}
|
56
pkg/mediasink/options.go
Normal file
56
pkg/mediasink/options.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package mediasink
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type SinkOption = func(*Sink) error
|
||||
|
||||
// TODO: CLOCKRATE, STEREO, PROFILE etc ARE IN MEDIA SOURCE. MAYBE BE INCLUDE THEM HERE?
|
||||
|
||||
func WithH264Track(clockrate uint32) SinkOption {
|
||||
return func(track *Sink) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecParameters{}
|
||||
track.codecCapability.PayloadType = webrtc.PayloadType(102)
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeH264
|
||||
track.codecCapability.ClockRate = clockrate
|
||||
track.codecCapability.Channels = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithVP8Track(clockrate uint32) SinkOption {
|
||||
return func(track *Sink) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecParameters{}
|
||||
track.codecCapability.PayloadType = webrtc.PayloadType(96)
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeVP8
|
||||
track.codecCapability.ClockRate = clockrate
|
||||
track.codecCapability.Channels = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpusTrack(samplerate uint32, channelLayout uint16) SinkOption {
|
||||
return func(track *Sink) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecParameters{}
|
||||
track.codecCapability.PayloadType = webrtc.PayloadType(111)
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeOpus
|
||||
track.codecCapability.ClockRate = samplerate
|
||||
track.codecCapability.Channels = channelLayout
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
@@ -1,31 +0,0 @@
|
||||
package mediasink
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/harshabose/mediapipe"
|
||||
"github.com/harshabose/mediapipe/pkg/duplexers"
|
||||
"github.com/harshabose/mediapipe/pkg/generators"
|
||||
)
|
||||
|
||||
func RTSPSink(config *duplexers.RTSPClientConfig) func(context.Context, *webrtc.TrackRemote) error {
|
||||
return func(ctx context.Context, remote *webrtc.TrackRemote) error {
|
||||
client, err := duplexers.NewRTSPClient(ctx, config, nil, duplexers.WithOptionsFromRemote(remote))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
client.Start()
|
||||
time.Sleep(5 * time.Second)
|
||||
|
||||
r := mediapipe.NewIdentityAnyReader(generators.NewPionRTPGenerator(remote))
|
||||
w := mediapipe.NewIdentityAnyWriter[*rtp.Packet](client)
|
||||
|
||||
mediapipe.NewAnyPipe(ctx, r, w).Start()
|
||||
return nil
|
||||
}
|
||||
}
|
@@ -2,18 +2,88 @@ package mediasink
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
|
||||
"github.com/harshabose/mediapipe/pkg/generators"
|
||||
)
|
||||
|
||||
type Sink struct {
|
||||
operator func(context.Context, *webrtc.TrackRemote) error
|
||||
generator generators.CanGeneratePionRTPPacket
|
||||
codecCapability *webrtc.RTPCodecParameters
|
||||
rtpReceiver *webrtc.RTPReceiver
|
||||
mux sync.RWMutex
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (s *Sink) CreateSink(operator func(context.Context, *webrtc.TrackRemote) error) {
|
||||
s.operator = operator
|
||||
func CreateSink(ctx context.Context, options ...SinkOption) (*Sink, error) {
|
||||
sink := &Sink{ctx: ctx}
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(sink); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if sink.codecCapability == nil {
|
||||
return nil, errors.New("no sink capabilities given")
|
||||
}
|
||||
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
func (s *Sink) setGenerator(generator generators.CanGeneratePionRTPPacket) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
s.generator = generator
|
||||
}
|
||||
|
||||
func (s *Sink) setRTPReceiver(receiver *webrtc.RTPReceiver) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
s.rtpReceiver = receiver
|
||||
}
|
||||
|
||||
func (s *Sink) readRTPReceiver(rtcpBuf []byte) {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
|
||||
if s.rtpReceiver == nil {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
return
|
||||
}
|
||||
|
||||
if _, _, err := s.rtpReceiver.Read(rtcpBuf); err != nil {
|
||||
fmt.Printf("error while reading rtcp packets")
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Sink) rtpReceiverLoop() {
|
||||
// THIS IS NEEDED AS interceptors (pion) do not work
|
||||
for {
|
||||
rtcpBuf := make([]byte, 1500)
|
||||
s.readRTPReceiver(rtcpBuf)
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Sink) ReadRTP() (*rtp.Packet, interceptor.Attributes, error) {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
|
||||
if s.generator == nil {
|
||||
return nil, interceptor.Attributes{}, nil
|
||||
}
|
||||
|
||||
return s.generator.ReadRTP()
|
||||
}
|
||||
|
||||
type Sinks struct {
|
||||
@@ -34,39 +104,84 @@ func CreateSinks(ctx context.Context, pc *webrtc.PeerConnection) *Sinks {
|
||||
|
||||
func (s *Sinks) onTrack(pc *webrtc.PeerConnection) {
|
||||
pc.OnTrack(func(remote *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
|
||||
fmt.Println("triggered on track")
|
||||
s.mux.RLock()
|
||||
sink, exists := s.sinks[remote.ID()]
|
||||
if !exists {
|
||||
s.mux.RUnlock()
|
||||
fmt.Printf("ERROR: no sink set for track with id %s; ignoring track...\n", remote.ID())
|
||||
sink, err := s.GetSink(remote.ID())
|
||||
if err != nil {
|
||||
fmt.Println(err.Error())
|
||||
// TODO: MAYBE SET A DEFAULT SINK?
|
||||
return
|
||||
}
|
||||
s.mux.RUnlock()
|
||||
|
||||
if err := sink.operator(s.ctx, remote); err != nil {
|
||||
fmt.Printf("ERROR: failed to operate on track (id=%s); err: %v\n", remote.ID(), err)
|
||||
if !CompareRTPCodecParameters(remote.Codec(), *(sink.codecCapability)) {
|
||||
fmt.Println("sink registered codec did not match. skipping...")
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
for {
|
||||
rtcpBuf := make([]byte, 1500)
|
||||
if _, _, err := receiver.Read(rtcpBuf); err != nil {
|
||||
fmt.Printf("error while reading rtcp packets")
|
||||
}
|
||||
}
|
||||
}()
|
||||
sink.setRTPReceiver(receiver)
|
||||
sink.setGenerator(remote)
|
||||
|
||||
go sink.rtpReceiverLoop()
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Sinks) CreateSink(label string, operator func(context.Context, *webrtc.TrackRemote) error) (*Sink, error) {
|
||||
func (s *Sinks) CreateSink(label string, options ...SinkOption) (*Sink, error) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
if _, exists := s.sinks[label]; exists {
|
||||
return nil, fmt.Errorf("sink with id = '%s' already exists", label)
|
||||
return nil, fmt.Errorf("sink with id='%s' already exists", label)
|
||||
}
|
||||
|
||||
s.sinks[label] = &Sink{operator: operator}
|
||||
return s.sinks[label], nil
|
||||
sink, err := CreateSink(s.ctx, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s.sinks[label] = sink
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
func (s *Sinks) GetSink(label string) (*Sink, error) {
|
||||
s.mux.RLock()
|
||||
defer s.mux.RUnlock()
|
||||
|
||||
sink, exists := s.sinks[label]
|
||||
if !exists {
|
||||
return nil, fmt.Errorf("ERROR: no sink set for track with id %s; ignoring track...\n", label)
|
||||
}
|
||||
|
||||
return sink, nil
|
||||
}
|
||||
|
||||
func CompareRTPCodecParameters(a, b webrtc.RTPCodecParameters) bool {
|
||||
identical := true
|
||||
|
||||
if a.PayloadType != b.PayloadType {
|
||||
fmt.Printf("PayloadType differs: %v != %v\n", a.PayloadType, b.PayloadType)
|
||||
identical = false
|
||||
}
|
||||
|
||||
if a.MimeType != b.MimeType {
|
||||
fmt.Printf("MimeType differs: %s != %s\n", a.MimeType, b.MimeType)
|
||||
identical = false
|
||||
}
|
||||
|
||||
if a.ClockRate != b.ClockRate {
|
||||
fmt.Printf("ClockRate differs: %d != %d\n", a.ClockRate, b.ClockRate)
|
||||
identical = false
|
||||
}
|
||||
|
||||
if a.Channels != b.Channels {
|
||||
fmt.Printf("Channels differs: %d != %d\n", a.Channels, b.Channels)
|
||||
identical = false
|
||||
}
|
||||
|
||||
if a.SDPFmtpLine != b.SDPFmtpLine {
|
||||
fmt.Printf("SDPFmtpLine differs (ignored): %s != %s\n", a.SDPFmtpLine, b.SDPFmtpLine)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(a.RTCPFeedback, b.RTCPFeedback) {
|
||||
fmt.Printf("RTCPFeedback differs (ignored): %v != %v\n", a.RTCPFeedback, b.RTCPFeedback)
|
||||
}
|
||||
|
||||
return identical
|
||||
}
|
||||
|
46
pkg/mediasource/constants.go
Normal file
46
pkg/mediasource/constants.go
Normal file
@@ -0,0 +1,46 @@
|
||||
package mediasource
|
||||
|
||||
type PacketisationMode uint8
|
||||
|
||||
const (
|
||||
PacketisationMode0 PacketisationMode = 0
|
||||
PacketisationMode1 PacketisationMode = 1
|
||||
PacketisationMode2 PacketisationMode = 2
|
||||
)
|
||||
|
||||
type ProfileLevel string
|
||||
|
||||
const (
|
||||
ProfileLevelBaseline21 ProfileLevel = "420015" // Level 2.1 (480p)
|
||||
ProfileLevelBaseline31 ProfileLevel = "42001f" // Level 3.1 (720p)
|
||||
ProfileLevelBaseline41 ProfileLevel = "420029" // Level 4.1 (1080p)
|
||||
ProfileLevelBaseline42 ProfileLevel = "42002a" // Level 4.2 (2K)
|
||||
|
||||
ProfileLevelMain21 ProfileLevel = "4D0015" // Level 2.1
|
||||
ProfileLevelMain31 ProfileLevel = "4D001f" // Level 3.1
|
||||
ProfileLevelMain41 ProfileLevel = "4D0029" // Level 4.1
|
||||
ProfileLevelMain42 ProfileLevel = "4D002a" // Level 4.2
|
||||
|
||||
ProfileLevelHigh21 ProfileLevel = "640015" // Level 2.1
|
||||
ProfileLevelHigh31 ProfileLevel = "64001f" // Level 3.1
|
||||
ProfileLevelHigh41 ProfileLevel = "640029" // Level 4.1
|
||||
ProfileLevelHigh42 ProfileLevel = "64002a" // Level 4.2
|
||||
)
|
||||
|
||||
type StereoType uint8
|
||||
|
||||
const (
|
||||
StereoMono = StereoType(0)
|
||||
StereoDual = StereoType(1)
|
||||
)
|
||||
|
||||
type Priority uint8
|
||||
|
||||
const (
|
||||
Level0 Priority = 0
|
||||
Level1 Priority = 1
|
||||
Level2 Priority = 2
|
||||
Level3 Priority = 3
|
||||
Level4 Priority = 4
|
||||
Level5 Priority = 5
|
||||
)
|
58
pkg/mediasource/options.go
Normal file
58
pkg/mediasource/options.go
Normal file
@@ -0,0 +1,58 @@
|
||||
package mediasource
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type TrackOption = func(*Track) error
|
||||
|
||||
func WithH264Track(clockrate uint32, packetisationMode PacketisationMode, profileLevel ProfileLevel) TrackOption {
|
||||
return func(track *Track) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecCapability{}
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeH264
|
||||
track.codecCapability.ClockRate = clockrate
|
||||
track.codecCapability.Channels = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithVP8Track(clockrate uint32) TrackOption {
|
||||
return func(track *Track) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecCapability{}
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeVP8
|
||||
track.codecCapability.ClockRate = clockrate
|
||||
track.codecCapability.Channels = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithOpusTrack(samplerate uint32, channelLayout uint16, stereo StereoType) TrackOption {
|
||||
return func(track *Track) error {
|
||||
if track.codecCapability != nil {
|
||||
return errors.New("multiple tracks are not supported on single media source")
|
||||
}
|
||||
track.codecCapability = &webrtc.RTPCodecCapability{}
|
||||
track.codecCapability.MimeType = webrtc.MimeTypeOpus
|
||||
track.codecCapability.ClockRate = samplerate
|
||||
track.codecCapability.Channels = channelLayout
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithPriority(level Priority) TrackOption {
|
||||
return func(track *Track) error {
|
||||
track.priority = level
|
||||
return nil
|
||||
}
|
||||
}
|
@@ -2,27 +2,28 @@ package mediasource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
"github.com/pion/webrtc/v4/pkg/media"
|
||||
|
||||
_ "github.com/harshabose/mediapipe"
|
||||
"github.com/harshabose/mediapipe/pkg/consumers"
|
||||
)
|
||||
|
||||
// NO BUFFER IMPLEMENTATION
|
||||
|
||||
type Track struct {
|
||||
track *webrtc.TrackLocalStaticSample
|
||||
rtcCapability *webrtc.RTPCodecCapability
|
||||
rtpSender *webrtc.RTPSender
|
||||
priority Priority
|
||||
ctx context.Context
|
||||
consumer consumers.CanConsumePionSamplePacket
|
||||
codecCapability *webrtc.RTPCodecCapability
|
||||
rtpSender *webrtc.RTPSender
|
||||
priority Priority
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func CreateTrack(ctx context.Context, label string, peerConnection *webrtc.PeerConnection, options ...TrackOption) (*Track, error) {
|
||||
var err error
|
||||
track := &Track{ctx: ctx, rtcCapability: &webrtc.RTPCodecCapability{}}
|
||||
track := &Track{ctx: ctx}
|
||||
|
||||
for _, option := range options {
|
||||
if err := option(track); err != nil {
|
||||
@@ -30,21 +31,25 @@ func CreateTrack(ctx context.Context, label string, peerConnection *webrtc.PeerC
|
||||
}
|
||||
}
|
||||
|
||||
if track.track, err = webrtc.NewTrackLocalStaticSample(*track.rtcCapability, label, "webrtc"); err != nil {
|
||||
if track.codecCapability == nil {
|
||||
return nil, errors.New("no track capabilities given")
|
||||
}
|
||||
|
||||
consumer, err := webrtc.NewTrackLocalStaticSample(*track.codecCapability, label, "webrtc")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
track.consumer = consumer
|
||||
|
||||
if track.rtpSender, err = peerConnection.AddTrack(consumer); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if track.rtpSender, err = peerConnection.AddTrack(track.track); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
go track.rtpSenderLoop()
|
||||
|
||||
return track, nil
|
||||
}
|
||||
|
||||
func (track *Track) GetTrack() *webrtc.TrackLocalStaticSample {
|
||||
return track.track
|
||||
}
|
||||
|
||||
func (track *Track) GetPriority() Priority {
|
||||
return track.priority
|
||||
}
|
||||
@@ -59,8 +64,8 @@ func (track *Track) rtpSenderLoop() {
|
||||
}
|
||||
}
|
||||
|
||||
func (track *Track) Consume(sample media.Sample) error {
|
||||
if err := track.track.WriteSample(sample); err != nil {
|
||||
func (track *Track) WriteSample(sample media.Sample) error {
|
||||
if err := track.consumer.WriteSample(sample); err != nil {
|
||||
fmt.Printf("error while writing samples to track (id: ); err; %v. Continuing...", err)
|
||||
}
|
||||
|
||||
|
@@ -1,95 +0,0 @@
|
||||
package mediasource
|
||||
|
||||
import (
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type PacketisationMode uint8
|
||||
|
||||
const (
|
||||
PacketisationMode0 PacketisationMode = 0
|
||||
PacketisationMode1 PacketisationMode = 1
|
||||
PacketisationMode2 PacketisationMode = 2
|
||||
)
|
||||
|
||||
type ProfileLevel string
|
||||
|
||||
const (
|
||||
ProfileLevelBaseline21 ProfileLevel = "420015" // Level 2.1 (480p)
|
||||
ProfileLevelBaseline31 ProfileLevel = "42001f" // Level 3.1 (720p)
|
||||
ProfileLevelBaseline41 ProfileLevel = "420029" // Level 4.1 (1080p)
|
||||
ProfileLevelBaseline42 ProfileLevel = "42002a" // Level 4.2 (2K)
|
||||
|
||||
ProfileLevelMain21 ProfileLevel = "4D0015" // Level 2.1
|
||||
ProfileLevelMain31 ProfileLevel = "4D001f" // Level 3.1
|
||||
ProfileLevelMain41 ProfileLevel = "4D0029" // Level 4.1
|
||||
ProfileLevelMain42 ProfileLevel = "4D002a" // Level 4.2
|
||||
|
||||
ProfileLevelHigh21 ProfileLevel = "640015" // Level 2.1
|
||||
ProfileLevelHigh31 ProfileLevel = "64001f" // Level 3.1
|
||||
ProfileLevelHigh41 ProfileLevel = "640029" // Level 4.1
|
||||
ProfileLevelHigh42 ProfileLevel = "64002a" // Level 4.2
|
||||
)
|
||||
|
||||
type TrackOption = func(*Track) error
|
||||
|
||||
func WithH264Track(clockrate uint32, packetisationMode PacketisationMode, profileLevel ProfileLevel) TrackOption {
|
||||
return func(track *Track) error {
|
||||
|
||||
track.rtcCapability.MimeType = webrtc.MimeTypeH264
|
||||
track.rtcCapability.ClockRate = clockrate
|
||||
track.rtcCapability.Channels = 0
|
||||
// NOTE: NOT NEEDED
|
||||
// track.rtcCapability.SDPFmtpLine = track.rtcCapability.SDPFmtpLine + fmt.Sprintf("level-asymmetry-allowed=1;packetization-mode=%d;profile-level-id=%s", packetisationMode, profileLevel)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithVP8Track(clockrate uint32) TrackOption {
|
||||
return func(track *Track) error {
|
||||
track.rtcCapability.MimeType = webrtc.MimeTypeVP8
|
||||
track.rtcCapability.ClockRate = clockrate
|
||||
track.rtcCapability.Channels = 0
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type StereoType uint8
|
||||
|
||||
const (
|
||||
StereoMono = StereoType(0)
|
||||
StereoDual = StereoType(1)
|
||||
)
|
||||
|
||||
func WithOpusTrack(samplerate uint32, channelLayout uint16, stereo StereoType) TrackOption {
|
||||
return func(track *Track) error {
|
||||
|
||||
track.rtcCapability.MimeType = webrtc.MimeTypeOpus
|
||||
track.rtcCapability.ClockRate = samplerate
|
||||
track.rtcCapability.Channels = channelLayout
|
||||
// NOTE: NOT NEEDED
|
||||
// track.rtcCapability.SDPFmtpLine = track.rtcCapability.SDPFmtpLine + fmt.Sprintf("minptime=10;useinbandfec=1;stereo=%d", stereo)
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithPriority(level Priority) TrackOption {
|
||||
return func(track *Track) error {
|
||||
track.priority = level
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
type Priority uint8
|
||||
|
||||
const (
|
||||
Level0 Priority = 0
|
||||
Level1 Priority = 1
|
||||
Level2 Priority = 2
|
||||
Level3 Priority = 3
|
||||
Level4 Priority = 4
|
||||
Level5 Priority = 5
|
||||
)
|
@@ -122,9 +122,9 @@ var LowLatencyBitrateControlled = &X264Options{
|
||||
|
||||
X264AdvancedOptions: &X264AdvancedOptions{
|
||||
Bitrate: "800", // 800kbps
|
||||
VBVMaxBitrate: "800",
|
||||
VBVBuffer: "400",
|
||||
RateTolerance: "1", // 1% rate tolerance
|
||||
VBVMaxBitrate: "800", // Equal to Bitrate
|
||||
VBVBuffer: "400", // Half of Bitrate
|
||||
RateTolerance: "1", // 1% rate tolerance
|
||||
MaxGOP: "25",
|
||||
MinGOP: "13",
|
||||
// MaxQP: "80",
|
||||
|
Reference in New Issue
Block a user