Update streamer NewTrack function

This commit is contained in:
Alexey Khit
2023-03-04 06:17:04 +03:00
parent c2cdf60ffc
commit 5aa20f0845
17 changed files with 73 additions and 24 deletions

View File

@@ -376,7 +376,7 @@ func (c *Client) AddVideoTrack(mediaCode byte, payload []byte) {
} }
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
c.videoTrack = streamer.NewTrack(codec, media.Direction) c.videoTrack = streamer.NewTrack(media, codec)
} }
var sampleRates = []uint32{4000, 8000, 11025, 16000, 20000, 22050, 32000, 44100, 48000} var sampleRates = []uint32{4000, 8000, 11025, 16000, 20000, 22050, 32000, 44100, 48000}
@@ -410,7 +410,7 @@ func (c *Client) AddAudioTrack(mediaCode byte, sampleRate byte) {
} }
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
c.audioTrack = streamer.NewTrack(codec, media.Direction) c.audioTrack = streamer.NewTrack(media, codec)
} }
func SofiaHash(password string) string { func SofiaHash(password string) string {

View File

@@ -77,7 +77,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame
} }
} }
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
return track return track
} }

View File

@@ -17,7 +17,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame
} }
} }
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
track = track.Bind(func(packet *rtp.Packet) (err error) { track = track.Bind(func(packet *rtp.Packet) (err error) {
if c.conn != nil { if c.conn != nil {
c.send += len(packet.Payload) c.send += len(packet.Payload)

View File

@@ -232,7 +232,7 @@ func (c *Client) getTracks() error {
} }
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
track := streamer.NewTrack(codec, streamer.DirectionSendonly) track := streamer.NewTrack(media, codec)
c.tracks[msg.TrackID] = track c.tracks[msg.TrackID] = track
case "mp4a": // mp4a.40.2 case "mp4a": // mp4a.40.2

View File

@@ -25,7 +25,7 @@ func (c *Client) GetMedias() []*streamer.Media {
func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track {
if c.track == nil { if c.track == nil {
c.track = streamer.NewTrack(codec, streamer.DirectionSendonly) c.track = streamer.NewTrack(media, codec)
} }
return c.track return c.track
} }

View File

@@ -52,7 +52,7 @@ func (c *Client) Handle() error {
continue // unsupported codec continue // unsupported codec
} }
track = streamer.NewTrack2(media, nil) track = streamer.NewTrack(media, nil)
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
c.tracks[packet.PayloadType] = track c.tracks[packet.PayloadType] = track

View File

@@ -88,7 +88,7 @@ func (c *Client) Describe() (err error) {
} }
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
case av.AAC: case av.AAC:
@@ -111,7 +111,7 @@ func (c *Client) Describe() (err error) {
} }
c.medias = append(c.medias, media) c.medias = append(c.medias, media)
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
default: default:

View File

@@ -463,7 +463,7 @@ func (c *Conn) SetupMedia(media *streamer.Media, codec *streamer.Codec, first bo
return nil, err return nil, err
} }
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
switch track.Direction { switch track.Direction {
case streamer.DirectionSendonly: case streamer.DirectionSendonly:

View File

@@ -40,7 +40,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
switch c.state { switch c.state {
case StatePlay, StateHandle: case StatePlay, StateHandle:
go c.Close() go c.Close()
return streamer.NewTrack(codec, media.Direction) return streamer.NewTrack(media, codec)
} }
track, err := c.SetupMedia(media, codec, true) track, err := c.SetupMedia(media, codec, true)

View File

@@ -78,7 +78,7 @@ func (c *Conn) Accept() error {
// TODO: fix someday... // TODO: fix someday...
c.channels = map[byte]*streamer.Track{} c.channels = map[byte]*streamer.Track{}
for i, media := range c.Medias { for i, media := range c.Medias {
track := streamer.NewTrack(media.Codecs[0], media.Direction) track := streamer.NewTrack(media, nil)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
c.channels[byte(i<<1)] = track c.channels[byte(i<<1)] = track
} }

View File

@@ -78,8 +78,17 @@ func (m *Media) MarshalJSON() ([]byte, error) {
} }
func (m *Media) Clone() *Media { func (m *Media) Clone() *Media {
clone := *m clone := &Media{
return &clone Kind: m.Kind,
Direction: m.Direction,
Codecs: make([]*Codec, len(m.Codecs)),
MID: m.MID,
Control: m.Control,
}
for i, codec := range m.Codecs {
clone.Codecs[i] = codec.Clone()
}
return clone
} }
func (m *Media) AV() bool { func (m *Media) AV() bool {

View File

@@ -1,8 +1,10 @@
package streamer package streamer
import ( import (
"fmt"
"github.com/pion/sdp/v3" "github.com/pion/sdp/v3"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/url" "net/url"
"testing" "testing"
) )
@@ -40,3 +42,22 @@ func TestParseQuery(t *testing.T) {
}, medias) }, medias)
} }
} }
func TestClone(t *testing.T) {
media1 := &Media{
Kind: KindVideo,
Direction: DirectionRecvonly,
Codecs: []*Codec{
{Name: CodecPCMU, ClockRate: 8000},
},
}
media2 := media1.Clone()
p1 := fmt.Sprintf("%p", media1)
p2 := fmt.Sprintf("%p", media2)
require.NotEqualValues(t, p1, p2)
p3 := fmt.Sprintf("%p", media1.Codecs[0])
p4 := fmt.Sprintf("%p", media2.Codecs[0])
require.NotEqualValues(t, p3, p4)
}

View File

@@ -1,3 +1,22 @@
// Package streamer
//
// 1. Consumer.GetMedias - return list of Media, that Consumer can play/load/consume:
// - Media with DirectionRecvonly for audio/video
// - Media with DirectionSendonly for backchannel
//
// 2. Producer.GetMedias - return list of Media, that Producer can generate/create/produce
// - Media with DirectionSendonly for audio/video
// - Media with DirectionRecvonly for backchannel
//
// 3. Producer.GetTrack - get Media from Producer and Codec from that Media return Track from Producer:
// - Media with DirectionSendonly should Track.WriteRTP after Producer.Start
// - Media with DirectionRecvonly should Track.Bind and wait Track.WriteRTP from Consumer
//
// 4. Consumer.AddTrack - takes Media from Consumer and Track from Producer:
// - Media with DirectionRecvonly should Track.WriteRTP
// - Media with DirectionSendonly should Track.Bind
//
// 5. Producer.Start - run loop with reading rtp.Packet from source
package streamer package streamer
// States, Queries and Events // States, Queries and Events

View File

@@ -17,11 +17,7 @@ type Track struct {
sinkMu *sync.RWMutex sinkMu *sync.RWMutex
} }
func NewTrack(codec *Codec, direction string) *Track { func NewTrack(media *Media, codec *Codec) *Track {
return &Track{Codec: codec, Direction: direction, sinkMu: new(sync.RWMutex)}
}
func NewTrack2(media *Media, codec *Codec) *Track {
if codec == nil { if codec == nil {
codec = media.Codecs[0] codec = media.Codecs[0]
} }
@@ -55,6 +51,8 @@ func (t *Track) WriteRTP(p *rtp.Packet) error {
return nil return nil
} }
// Bind - attach WriterFunc (Consumer) for receiving rtp.Packet(s)
// and return new Track copy. Later you can run Unbind for new Track
func (t *Track) Bind(w WriterFunc) *Track { func (t *Track) Bind(w WriterFunc) *Track {
t.sinkMu.Lock() t.sinkMu.Lock()
@@ -70,6 +68,8 @@ func (t *Track) Bind(w WriterFunc) *Track {
return &clone return &clone
} }
// Unbind - detach WriterFunc that related to this Track from
// consuming track data
func (t *Track) Unbind() { func (t *Track) Unbind() {
t.sinkMu.Lock() t.sinkMu.Lock()
delete(t.sink, t) delete(t.sink, t)

View File

@@ -53,7 +53,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track *
return nil return nil
} }
track = streamer.NewTrack2(media, codec) track = streamer.NewTrack(media, codec)
c.tracks[mpegts.StreamTypeH264] = track c.tracks[mpegts.StreamTypeH264] = track
} else { } else {
if media.Direction == streamer.DirectionSendonly { if media.Direction == streamer.DirectionSendonly {
@@ -61,7 +61,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track *
return nil return nil
} }
track = streamer.NewTrack2(media, codec) track = streamer.NewTrack(media, codec)
c.tracks[mpegts.StreamTypePCMATapo] = track c.tracks[mpegts.StreamTypePCMATapo] = track
} else { } else {
if err := c.SetupBackchannel(); err != nil { if err := c.SetupBackchannel(); err != nil {
@@ -69,7 +69,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track *
} }
if w := c.backchannelWriter(); w != nil { if w := c.backchannelWriter(); w != nil {
track = streamer.NewTrack2(media, codec) track = streamer.NewTrack(media, codec)
track.Bind(w) track.Bind(w)
c.tracks[0] = track c.tracks[0] = track
} }

View File

@@ -113,7 +113,7 @@ func (c *Conn) getTrack(remote *webrtc.TrackRemote) *streamer.Track {
for _, media := range c.medias { for _, media := range c.medias {
for _, codec := range media.Codecs { for _, codec := range media.Codecs {
if codec.PayloadType == payloadType { if codec.PayloadType == payloadType {
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
return track return track
} }

View File

@@ -11,7 +11,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.
} }
} }
track := streamer.NewTrack(codec, media.Direction) track := streamer.NewTrack(media, codec)
c.tracks = append(c.tracks, track) c.tracks = append(c.tracks, track)
return track return track
} }