commit e2e3a59faf18233db97d5ce9024ee75a6c037d75 Author: harshabose Date: Wed Feb 19 23:24:36 2025 +0530 first commit diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b694f27 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +/third_party +.idea \ No newline at end of file diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..ac84c69 --- /dev/null +++ b/go.mod @@ -0,0 +1,34 @@ +module mediasource + +go 1.23 + +require ( + github.com/google/uuid v1.6.0 // indirect + github.com/pion/datachannel v1.5.10 // indirect + github.com/pion/dtls/v3 v3.0.4 // indirect + github.com/pion/ice/v4 v4.0.6 // indirect + github.com/pion/interceptor v0.1.37 // indirect + github.com/pion/logging v0.2.3 // indirect + github.com/pion/mdns/v2 v2.0.7 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.15 // indirect + github.com/pion/rtp v1.8.11 // indirect + github.com/pion/sctp v1.8.35 // indirect + github.com/pion/sdp/v3 v3.0.10 // indirect + github.com/pion/srtp/v3 v3.0.4 // indirect + github.com/pion/stun/v3 v3.0.0 // indirect + github.com/pion/transport/v3 v3.0.7 // indirect + github.com/pion/turn/v4 v4.0.0 // indirect + github.com/pion/webrtc/v4 v4.0.10 // indirect + github.com/wlynxg/anet v0.0.5 // indirect + golang.org/x/crypto v0.32.0 // indirect + golang.org/x/net v0.34.0 // indirect + golang.org/x/sys v0.29.0 // indirect + github.com/harshabose/simple_webrtc_comm/transcode v0.0.0 + github.com/harshabose/tools/buffer v0.0.0 +) + +replace ( + github.com/harshabose/simple_webrtc_comm/transcode => ../transcode + github.com/harshabose/tools/buffer => ../tools/buffer +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..3d22b96 --- /dev/null +++ b/go.sum @@ -0,0 +1,42 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= +github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= +github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= +github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= +github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM= +github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= +github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= +github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y= +github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI= +github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90= +github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM= +github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= +github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= +github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk= +github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4= +github.com/pion/sctp v1.8.35 h1:qwtKvNK1Wc5tHMIYgTDJhfZk7vATGVHhXbUDfHbYwzA= +github.com/pion/sctp v1.8.35/go.mod h1:EcXP8zCYVTRy3W9xtOF7wJm1L1aXfKRQzaM33SjQlzg= +github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA= +github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E= +github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M= +github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ= +github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw= +github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU= +github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0= +github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= +github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= +github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= +github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q= +github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck= +github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= +github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc= +golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc= +golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0= +golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k= +golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= +golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= diff --git a/internal/sample_pool.go b/internal/sample_pool.go new file mode 100644 index 0000000..5b99825 --- /dev/null +++ b/internal/sample_pool.go @@ -0,0 +1,52 @@ +package internal + +import ( + "sync" + + "github.com/harshabose/tools/buffer/pkg" + "github.com/pion/webrtc/v4/pkg/media" +) + +type samplePool struct { + pool sync.Pool +} + +func CreateSamplePool() buffer.Pool[media.Sample] { + return &samplePool{ + pool: sync.Pool{ + New: func() any { + return &media.Sample{} + }, + }, + } +} + +func (pool *samplePool) Get() *media.Sample { + packet, ok := pool.pool.Get().(*media.Sample) + + if packet == nil || !ok { + return &media.Sample{} + } + return packet +} + +func (pool *samplePool) Put(sample *media.Sample) { + if sample == nil { + return + } + pool.pool.Put(sample) +} + +func (pool *samplePool) Release() { + for { + sample, ok := pool.pool.Get().(*media.Sample) + if !ok { + continue + } + if sample == nil { + break + } + + sample = nil + } +} diff --git a/internal/stream.go b/internal/stream.go new file mode 100644 index 0000000..758c6b0 --- /dev/null +++ b/internal/stream.go @@ -0,0 +1,131 @@ +package internal + +import ( + "context" + "fmt" + "time" + + "github.com/asticode/go-astiav" + "github.com/harshabose/simple_webrtc_comm/transcode/pkg" + "github.com/harshabose/tools/buffer/pkg" + "github.com/pion/webrtc/v4/pkg/media" +) + +type Options struct { + DemuxerOptions []transcode.DemuxerOption + DecoderOptions []transcode.DecoderOption + FilterOptions []transcode.FilterOption + EncoderOptions []transcode.EncoderOption +} + +type Stream struct { + demuxer *transcode.Demuxer + decoder *transcode.Decoder + filter *transcode.Filter + encoder *transcode.Encoder + buffer buffer.BufferWithGenerator[media.Sample] + ctx context.Context +} + +func CreateStream(ctx context.Context, containerAddress string, options *Options) (*Stream, error) { + var ( + demuxer *transcode.Demuxer + decoder *transcode.Decoder + filter *transcode.Filter + encoder *transcode.Encoder + err error + ) + + if demuxer, err = transcode.CreateDemuxer(ctx, containerAddress, options.DemuxerOptions...); err != nil { + return nil, err + } + if decoder, err = transcode.CreateDecoder(ctx, demuxer, append([]transcode.DecoderOption{demuxer.GetDecoderContextOptions()}, options.DecoderOptions...)...); err != nil { + return nil, err + } + if filter, err = transcode.CreateFilter(ctx, decoder, transcode.VideoFilters, decoder.GetSrcFilterContextOptions(), transcode.WithDefaultVideoFilterContentOptions); err != nil { + return nil, err + } + if encoder, err = transcode.CreateEncoder(ctx, filter, transcode.WithLowLatencyVideoEncoderSetting); err != nil { + return nil, err + } + + fmt.Println("started encoder with settings:") + + return &Stream{ + demuxer: demuxer, + decoder: decoder, + filter: filter, + encoder: encoder, + buffer: buffer.CreateChannelBuffer(ctx, encoder.GetFPS()*3, CreateSamplePool()), + ctx: ctx, + }, nil +} + +func (stream *Stream) Start() { + stream.demuxer.Start() + stream.decoder.Start() + stream.filter.Start() + stream.encoder.Start() + go stream.loop() +} + +func (stream *Stream) loop() { + var ( + packet *astiav.Packet + err error + ) + + for { + select { + case <-stream.ctx.Done(): + return + case packet = <-stream.encoder.WaitForPacket(): + if err = stream.pushSample(stream.packetToSample(packet)); err != nil { + stream.encoder.PutBack(packet) + continue + } + + stream.encoder.PutBack(packet) + } + } +} + +func (stream *Stream) pushSample(sample *media.Sample) error { + ctx, cancel := context.WithTimeout(stream.ctx, time.Second) + defer cancel() + + return stream.buffer.Push(ctx, sample) +} + +func (stream *Stream) GetSample() (*media.Sample, error) { + ctx, cancel := context.WithTimeout(stream.ctx, time.Second) + defer cancel() + + return stream.buffer.Pop(ctx) +} + +func (stream *Stream) PutBack(sample *media.Sample) { + stream.buffer.PutBack(sample) +} + +func (stream *Stream) WaitForSample() chan *media.Sample { + return stream.buffer.GetChannel() +} + +func (stream *Stream) GetParameterSets() ([]byte, []byte) { + return stream.encoder.GetSPS(), stream.encoder.GetPPS() +} + +func (stream *Stream) packetToSample(packet *astiav.Packet) *media.Sample { + sample := stream.buffer.Generate() + + sample.Data = packet.Data() + sample.Timestamp = time.Now().UTC() + sample.Duration = time.Second / time.Duration(stream.encoder.GetFPS()) + sample.PacketTimestamp = uint32(float64(packet.Pts()) * float64(stream.encoder.GetVideoTimeBase()) * float64(time.Second)) + sample.PrevDroppedPackets = 0 + sample.Metadata = nil + sample.RTPHeaders = nil + + return sample +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..46a652c --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" +) + +// TIP

To run your code, right-click the code and select Run.

Alternatively, click +// the icon in the gutter and select the Run menu item from here.

+ +func main() { + // TIP

Press when your caret is at the underlined text + // to see how GoLand suggests fixing the warning.

Alternatively, if available, click the lightbulb to view possible fixes.

+ s := "gopher" + fmt.Printf("Hello and welcome, %s!\n", s) + + for i := 1; i <= 5; i++ { + // TIP

To start your debugging session, right-click your code in the editor and select the Debug option.

We have set one breakpoint + // for you, but you can always add more by pressing .

+ fmt.Println("i =", 100/i) + } +} diff --git a/pkg/errors.go b/pkg/errors.go new file mode 100644 index 0000000..57ba1ed --- /dev/null +++ b/pkg/errors.go @@ -0,0 +1,7 @@ +package mediasource + +import "errors" + +var ( + ErrorFailedTypeInference = errors.New("error failed to infer type from pointer") +) diff --git a/pkg/localtrack_options.go b/pkg/localtrack_options.go new file mode 100644 index 0000000..684eebd --- /dev/null +++ b/pkg/localtrack_options.go @@ -0,0 +1,38 @@ +package mediasource + +import "github.com/pion/webrtc/v4" + +type Option = func(*Track) error + +func WithH264(clockrate uint32) Option { + return func(track *Track) error { + var ( + err error = nil + ) + if track.track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: clockrate, + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=420029", + }, "video", "webrtc"); err != nil { + return err + } + return nil + } +} + +func WithOpus(samplerate uint32, channelLayout uint16) Option { + return func(track *Track) error { + var ( + err error = nil + ) + + if track.track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeOpus, + ClockRate: samplerate, + Channels: channelLayout, + }, "audio", "webrtc"); err != nil { + return err + } + return nil + } +} diff --git a/pkg/track.go b/pkg/track.go new file mode 100644 index 0000000..0c24592 --- /dev/null +++ b/pkg/track.go @@ -0,0 +1,72 @@ +package mediasource + +import ( + "context" + "fmt" + "time" + + "github.com/pion/webrtc/v4" + "github.com/pion/webrtc/v4/pkg/media" + + "mediasource/internal" +) + +// NO BUFFER IMPLEMENTATION + +type Track struct { + track *webrtc.TrackLocalStaticSample + stream *internal.Stream + ctx context.Context +} + +func CreateLocalTrack(ctx context.Context, stream *internal.Stream, options ...Option) (*Track, error) { + track := &Track{stream: stream, ctx: ctx} + + for _, option := range options { + if err := option(track); err != nil { + return nil, err + } + } + + return track, nil +} + +func (track *Track) GetTrack() *webrtc.TrackLocalStaticSample { + return track.track +} + +func (track *Track) Start() { + if track.track == nil { + fmt.Printf("no remote track set yet. Skipping...") + return + } + + track.stream.Start() + go track.loop() +} + +func (track *Track) loop() { + var ( + sample *media.Sample = nil + err error = nil + ticker *time.Ticker = nil + ) + + ticker = time.NewTicker(time.Second) + defer ticker.Stop() + +loop: + for { + select { + case <-track.ctx.Done(): + return + case sample = <-track.stream.WaitForSample(): + if err = track.track.WriteSample(*sample); err != nil { + fmt.Printf("Error pushing packet: %v\n", err) + track.stream.PutBack(sample) + continue loop + } + track.stream.PutBack(sample) + } + } +}