first commit

This commit is contained in:
harshabose
2025-02-19 23:24:36 +05:30
commit e2e3a59faf
9 changed files with 399 additions and 0 deletions

2
.gitignore vendored Normal file
View File

@@ -0,0 +1,2 @@
/third_party
.idea

34
go.mod Normal file
View File

@@ -0,0 +1,34 @@
module mediasource
go 1.23
require (
github.com/google/uuid v1.6.0 // indirect
github.com/pion/datachannel v1.5.10 // indirect
github.com/pion/dtls/v3 v3.0.4 // indirect
github.com/pion/ice/v4 v4.0.6 // indirect
github.com/pion/interceptor v0.1.37 // indirect
github.com/pion/logging v0.2.3 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/randutil v0.1.0 // indirect
github.com/pion/rtcp v1.2.15 // indirect
github.com/pion/rtp v1.8.11 // indirect
github.com/pion/sctp v1.8.35 // indirect
github.com/pion/sdp/v3 v3.0.10 // indirect
github.com/pion/srtp/v3 v3.0.4 // indirect
github.com/pion/stun/v3 v3.0.0 // indirect
github.com/pion/transport/v3 v3.0.7 // indirect
github.com/pion/turn/v4 v4.0.0 // indirect
github.com/pion/webrtc/v4 v4.0.10 // indirect
github.com/wlynxg/anet v0.0.5 // indirect
golang.org/x/crypto v0.32.0 // indirect
golang.org/x/net v0.34.0 // indirect
golang.org/x/sys v0.29.0 // indirect
github.com/harshabose/simple_webrtc_comm/transcode v0.0.0
github.com/harshabose/tools/buffer v0.0.0
)
replace (
github.com/harshabose/simple_webrtc_comm/transcode => ../transcode
github.com/harshabose/tools/buffer => ../tools/buffer
)

42
go.sum Normal file
View File

@@ -0,0 +1,42 @@
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o=
github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M=
github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U=
github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg=
github.com/pion/ice/v4 v4.0.6 h1:jmM9HwI9lfetQV/39uD0nY4y++XZNPhvzIPCb8EwxUM=
github.com/pion/ice/v4 v4.0.6/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw=
github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI=
github.com/pion/interceptor v0.1.37/go.mod h1:JzxbJ4umVTlZAf+/utHzNesY8tmRkM2lVmkS82TTj8Y=
github.com/pion/logging v0.2.3 h1:gHuf0zpoh1GW67Nr6Gj4cv5Z9ZscU7g/EaoC/Ke/igI=
github.com/pion/logging v0.2.3/go.mod h1:z8YfknkquMe1csOrxK5kc+5/ZPAzMxbKLX5aXpbpC90=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo=
github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0=
github.com/pion/rtp v1.8.11 h1:17xjnY5WO5hgO6SD3/NTIUPvSFw/PbLsIJyz1r1yNIk=
github.com/pion/rtp v1.8.11/go.mod h1:8uMBJj32Pa1wwx8Fuv/AsFhn8jsgw+3rUC2PfoBZ8p4=
github.com/pion/sctp v1.8.35 h1:qwtKvNK1Wc5tHMIYgTDJhfZk7vATGVHhXbUDfHbYwzA=
github.com/pion/sctp v1.8.35/go.mod h1:EcXP8zCYVTRy3W9xtOF7wJm1L1aXfKRQzaM33SjQlzg=
github.com/pion/sdp/v3 v3.0.10 h1:6MChLE/1xYB+CjumMw+gZ9ufp2DPApuVSnDT8t5MIgA=
github.com/pion/sdp/v3 v3.0.10/go.mod h1:88GMahN5xnScv1hIMTqLdu/cOcUkj6a9ytbncwMCq2E=
github.com/pion/srtp/v3 v3.0.4 h1:2Z6vDVxzrX3UHEgrUyIGM4rRouoC7v+NiF1IHtp9B5M=
github.com/pion/srtp/v3 v3.0.4/go.mod h1:1Jx3FwDoxpRaTh1oRV8A/6G1BnFL+QI82eK4ms8EEJQ=
github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw=
github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM=
github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA=
github.com/pion/webrtc/v4 v4.0.10 h1:Hq/JLjhqLxi+NmCtE8lnRPDr8H4LcNvwg8OxVcdv56Q=
github.com/pion/webrtc/v4 v4.0.10/go.mod h1:ViHLVaNpiuvaH8pdiuQxuA9awuE6KVzAXx3vVWilOck=
github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU=
github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
golang.org/x/crypto v0.32.0 h1:euUpcYgM8WcP71gNpTqQCn6rC2t6ULUPiOzfWaXVVfc=
golang.org/x/crypto v0.32.0/go.mod h1:ZnnJkOaASj8g0AjIduWNlq2NRxL0PlBrbKVyZ6V/Ugc=
golang.org/x/net v0.34.0 h1:Mb7Mrk043xzHgnRM88suvJFwzVrRfHEHJEl5/71CKw0=
golang.org/x/net v0.34.0/go.mod h1:di0qlW3YNM5oh6GqDGQr92MyTozJPmybPK4Ev/Gm31k=
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=

52
internal/sample_pool.go Normal file
View File

@@ -0,0 +1,52 @@
package internal
import (
"sync"
"github.com/harshabose/tools/buffer/pkg"
"github.com/pion/webrtc/v4/pkg/media"
)
type samplePool struct {
pool sync.Pool
}
func CreateSamplePool() buffer.Pool[media.Sample] {
return &samplePool{
pool: sync.Pool{
New: func() any {
return &media.Sample{}
},
},
}
}
func (pool *samplePool) Get() *media.Sample {
packet, ok := pool.pool.Get().(*media.Sample)
if packet == nil || !ok {
return &media.Sample{}
}
return packet
}
func (pool *samplePool) Put(sample *media.Sample) {
if sample == nil {
return
}
pool.pool.Put(sample)
}
func (pool *samplePool) Release() {
for {
sample, ok := pool.pool.Get().(*media.Sample)
if !ok {
continue
}
if sample == nil {
break
}
sample = nil
}
}

131
internal/stream.go Normal file
View File

@@ -0,0 +1,131 @@
package internal
import (
"context"
"fmt"
"time"
"github.com/asticode/go-astiav"
"github.com/harshabose/simple_webrtc_comm/transcode/pkg"
"github.com/harshabose/tools/buffer/pkg"
"github.com/pion/webrtc/v4/pkg/media"
)
type Options struct {
DemuxerOptions []transcode.DemuxerOption
DecoderOptions []transcode.DecoderOption
FilterOptions []transcode.FilterOption
EncoderOptions []transcode.EncoderOption
}
type Stream struct {
demuxer *transcode.Demuxer
decoder *transcode.Decoder
filter *transcode.Filter
encoder *transcode.Encoder
buffer buffer.BufferWithGenerator[media.Sample]
ctx context.Context
}
func CreateStream(ctx context.Context, containerAddress string, options *Options) (*Stream, error) {
var (
demuxer *transcode.Demuxer
decoder *transcode.Decoder
filter *transcode.Filter
encoder *transcode.Encoder
err error
)
if demuxer, err = transcode.CreateDemuxer(ctx, containerAddress, options.DemuxerOptions...); err != nil {
return nil, err
}
if decoder, err = transcode.CreateDecoder(ctx, demuxer, append([]transcode.DecoderOption{demuxer.GetDecoderContextOptions()}, options.DecoderOptions...)...); err != nil {
return nil, err
}
if filter, err = transcode.CreateFilter(ctx, decoder, transcode.VideoFilters, decoder.GetSrcFilterContextOptions(), transcode.WithDefaultVideoFilterContentOptions); err != nil {
return nil, err
}
if encoder, err = transcode.CreateEncoder(ctx, filter, transcode.WithLowLatencyVideoEncoderSetting); err != nil {
return nil, err
}
fmt.Println("started encoder with settings:")
return &Stream{
demuxer: demuxer,
decoder: decoder,
filter: filter,
encoder: encoder,
buffer: buffer.CreateChannelBuffer(ctx, encoder.GetFPS()*3, CreateSamplePool()),
ctx: ctx,
}, nil
}
func (stream *Stream) Start() {
stream.demuxer.Start()
stream.decoder.Start()
stream.filter.Start()
stream.encoder.Start()
go stream.loop()
}
func (stream *Stream) loop() {
var (
packet *astiav.Packet
err error
)
for {
select {
case <-stream.ctx.Done():
return
case packet = <-stream.encoder.WaitForPacket():
if err = stream.pushSample(stream.packetToSample(packet)); err != nil {
stream.encoder.PutBack(packet)
continue
}
stream.encoder.PutBack(packet)
}
}
}
func (stream *Stream) pushSample(sample *media.Sample) error {
ctx, cancel := context.WithTimeout(stream.ctx, time.Second)
defer cancel()
return stream.buffer.Push(ctx, sample)
}
func (stream *Stream) GetSample() (*media.Sample, error) {
ctx, cancel := context.WithTimeout(stream.ctx, time.Second)
defer cancel()
return stream.buffer.Pop(ctx)
}
func (stream *Stream) PutBack(sample *media.Sample) {
stream.buffer.PutBack(sample)
}
func (stream *Stream) WaitForSample() chan *media.Sample {
return stream.buffer.GetChannel()
}
func (stream *Stream) GetParameterSets() ([]byte, []byte) {
return stream.encoder.GetSPS(), stream.encoder.GetPPS()
}
func (stream *Stream) packetToSample(packet *astiav.Packet) *media.Sample {
sample := stream.buffer.Generate()
sample.Data = packet.Data()
sample.Timestamp = time.Now().UTC()
sample.Duration = time.Second / time.Duration(stream.encoder.GetFPS())
sample.PacketTimestamp = uint32(float64(packet.Pts()) * float64(stream.encoder.GetVideoTimeBase()) * float64(time.Second))
sample.PrevDroppedPackets = 0
sample.Metadata = nil
sample.RTPHeaders = nil
return sample
}

21
main.go Normal file
View File

@@ -0,0 +1,21 @@
package main
import (
"fmt"
)
// TIP <p>To run your code, right-click the code and select <b>Run</b>.</p> <p>Alternatively, click
// the <icon src="AllIcons.Actions.Execute"/> icon in the gutter and select the <b>Run</b> menu item from here.</p>
func main() {
// TIP <p>Press <shortcut actionId="ShowIntentionActions"/> when your caret is at the underlined text
// to see how GoLand suggests fixing the warning.</p><p>Alternatively, if available, click the lightbulb to view possible fixes.</p>
s := "gopher"
fmt.Printf("Hello and welcome, %s!\n", s)
for i := 1; i <= 5; i++ {
// TIP <p>To start your debugging session, right-click your code in the editor and select the Debug option.</p> <p>We have set one <icon src="AllIcons.Debugger.Db_set_breakpoint"/> breakpoint
// for you, but you can always add more by pressing <shortcut actionId="ToggleLineBreakpoint"/>.</p>
fmt.Println("i =", 100/i)
}
}

7
pkg/errors.go Normal file
View File

@@ -0,0 +1,7 @@
package mediasource
import "errors"
var (
ErrorFailedTypeInference = errors.New("error failed to infer type from pointer")
)

38
pkg/localtrack_options.go Normal file
View File

@@ -0,0 +1,38 @@
package mediasource
import "github.com/pion/webrtc/v4"
type Option = func(*Track) error
func WithH264(clockrate uint32) Option {
return func(track *Track) error {
var (
err error = nil
)
if track.track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeH264,
ClockRate: clockrate,
SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=420029",
}, "video", "webrtc"); err != nil {
return err
}
return nil
}
}
func WithOpus(samplerate uint32, channelLayout uint16) Option {
return func(track *Track) error {
var (
err error = nil
)
if track.track, err = webrtc.NewTrackLocalStaticSample(webrtc.RTPCodecCapability{
MimeType: webrtc.MimeTypeOpus,
ClockRate: samplerate,
Channels: channelLayout,
}, "audio", "webrtc"); err != nil {
return err
}
return nil
}
}

72
pkg/track.go Normal file
View File

@@ -0,0 +1,72 @@
package mediasource
import (
"context"
"fmt"
"time"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
"mediasource/internal"
)
// NO BUFFER IMPLEMENTATION
type Track struct {
track *webrtc.TrackLocalStaticSample
stream *internal.Stream
ctx context.Context
}
func CreateLocalTrack(ctx context.Context, stream *internal.Stream, options ...Option) (*Track, error) {
track := &Track{stream: stream, ctx: ctx}
for _, option := range options {
if err := option(track); err != nil {
return nil, err
}
}
return track, nil
}
func (track *Track) GetTrack() *webrtc.TrackLocalStaticSample {
return track.track
}
func (track *Track) Start() {
if track.track == nil {
fmt.Printf("no remote track set yet. Skipping...")
return
}
track.stream.Start()
go track.loop()
}
func (track *Track) loop() {
var (
sample *media.Sample = nil
err error = nil
ticker *time.Ticker = nil
)
ticker = time.NewTicker(time.Second)
defer ticker.Stop()
loop:
for {
select {
case <-track.ctx.Done():
return
case sample = <-track.stream.WaitForSample():
if err = track.track.WriteSample(*sample); err != nil {
fmt.Printf("Error pushing packet: %v\n", err)
track.stream.PutBack(sample)
continue loop
}
track.stream.PutBack(sample)
}
}
}