diff --git a/pkg/dvrip/client.go b/pkg/dvrip/client.go index 77b060f4..9b8f6251 100644 --- a/pkg/dvrip/client.go +++ b/pkg/dvrip/client.go @@ -376,7 +376,7 @@ func (c *Client) AddVideoTrack(mediaCode byte, payload []byte) { } c.medias = append(c.medias, media) - c.videoTrack = streamer.NewTrack(codec, media.Direction) + c.videoTrack = streamer.NewTrack(media, codec) } var sampleRates = []uint32{4000, 8000, 11025, 16000, 20000, 22050, 32000, 44100, 48000} @@ -410,7 +410,7 @@ func (c *Client) AddAudioTrack(mediaCode byte, sampleRate byte) { } c.medias = append(c.medias, media) - c.audioTrack = streamer.NewTrack(codec, media.Direction) + c.audioTrack = streamer.NewTrack(media, codec) } func SofiaHash(password string) string { diff --git a/pkg/homekit/client.go b/pkg/homekit/client.go index 5100865f..72f95666 100644 --- a/pkg/homekit/client.go +++ b/pkg/homekit/client.go @@ -77,7 +77,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame } } - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) c.tracks = append(c.tracks, track) return track } diff --git a/pkg/isapi/producer.go b/pkg/isapi/producer.go index e9cd371a..305aa61f 100644 --- a/pkg/isapi/producer.go +++ b/pkg/isapi/producer.go @@ -17,7 +17,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame } } - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) track = track.Bind(func(packet *rtp.Packet) (err error) { if c.conn != nil { c.send += len(packet.Payload) diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go index 96121fba..e07496e1 100644 --- a/pkg/ivideon/client.go +++ b/pkg/ivideon/client.go @@ -232,7 +232,7 @@ func (c *Client) getTracks() error { } c.medias = append(c.medias, media) - track := streamer.NewTrack(codec, streamer.DirectionSendonly) + track := streamer.NewTrack(media, codec) c.tracks[msg.TrackID] = track case "mp4a": // mp4a.40.2 diff --git a/pkg/mjpeg/producer.go b/pkg/mjpeg/producer.go index 333d4f2c..b23cfcac 100644 --- a/pkg/mjpeg/producer.go +++ b/pkg/mjpeg/producer.go @@ -25,7 +25,7 @@ func (c *Client) GetMedias() []*streamer.Media { func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer.Track { if c.track == nil { - c.track = streamer.NewTrack(codec, streamer.DirectionSendonly) + c.track = streamer.NewTrack(media, codec) } return c.track } diff --git a/pkg/mpegts/client.go b/pkg/mpegts/client.go index 0b515a43..f194f3b1 100644 --- a/pkg/mpegts/client.go +++ b/pkg/mpegts/client.go @@ -52,7 +52,7 @@ func (c *Client) Handle() error { continue // unsupported codec } - track = streamer.NewTrack2(media, nil) + track = streamer.NewTrack(media, nil) c.medias = append(c.medias, media) c.tracks[packet.PayloadType] = track diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index 8d0eb85f..8ee885d8 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -88,7 +88,7 @@ func (c *Client) Describe() (err error) { } c.medias = append(c.medias, media) - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) c.tracks = append(c.tracks, track) case av.AAC: @@ -111,7 +111,7 @@ func (c *Client) Describe() (err error) { } c.medias = append(c.medias, media) - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) c.tracks = append(c.tracks, track) default: diff --git a/pkg/rtsp/client.go b/pkg/rtsp/client.go index c0c30bcf..f720278f 100644 --- a/pkg/rtsp/client.go +++ b/pkg/rtsp/client.go @@ -463,7 +463,7 @@ func (c *Conn) SetupMedia(media *streamer.Media, codec *streamer.Codec, first bo return nil, err } - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) switch track.Direction { case streamer.DirectionSendonly: diff --git a/pkg/rtsp/producer.go b/pkg/rtsp/producer.go index 7a6139b3..398d7baf 100644 --- a/pkg/rtsp/producer.go +++ b/pkg/rtsp/producer.go @@ -40,7 +40,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. switch c.state { case StatePlay, StateHandle: go c.Close() - return streamer.NewTrack(codec, media.Direction) + return streamer.NewTrack(media, codec) } track, err := c.SetupMedia(media, codec, true) diff --git a/pkg/rtsp/server.go b/pkg/rtsp/server.go index 59b2484b..8a1875bf 100644 --- a/pkg/rtsp/server.go +++ b/pkg/rtsp/server.go @@ -78,7 +78,7 @@ func (c *Conn) Accept() error { // TODO: fix someday... c.channels = map[byte]*streamer.Track{} for i, media := range c.Medias { - track := streamer.NewTrack(media.Codecs[0], media.Direction) + track := streamer.NewTrack(media, nil) c.tracks = append(c.tracks, track) c.channels[byte(i<<1)] = track } diff --git a/pkg/streamer/media.go b/pkg/streamer/media.go index ecb1f43a..9a806995 100644 --- a/pkg/streamer/media.go +++ b/pkg/streamer/media.go @@ -78,8 +78,17 @@ func (m *Media) MarshalJSON() ([]byte, error) { } func (m *Media) Clone() *Media { - clone := *m - return &clone + clone := &Media{ + Kind: m.Kind, + Direction: m.Direction, + Codecs: make([]*Codec, len(m.Codecs)), + MID: m.MID, + Control: m.Control, + } + for i, codec := range m.Codecs { + clone.Codecs[i] = codec.Clone() + } + return clone } func (m *Media) AV() bool { diff --git a/pkg/streamer/media_test.go b/pkg/streamer/media_test.go index c098f414..f283882d 100644 --- a/pkg/streamer/media_test.go +++ b/pkg/streamer/media_test.go @@ -1,8 +1,10 @@ package streamer import ( + "fmt" "github.com/pion/sdp/v3" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "net/url" "testing" ) @@ -40,3 +42,22 @@ func TestParseQuery(t *testing.T) { }, medias) } } + +func TestClone(t *testing.T) { + media1 := &Media{ + Kind: KindVideo, + Direction: DirectionRecvonly, + Codecs: []*Codec{ + {Name: CodecPCMU, ClockRate: 8000}, + }, + } + media2 := media1.Clone() + + p1 := fmt.Sprintf("%p", media1) + p2 := fmt.Sprintf("%p", media2) + require.NotEqualValues(t, p1, p2) + + p3 := fmt.Sprintf("%p", media1.Codecs[0]) + p4 := fmt.Sprintf("%p", media2.Codecs[0]) + require.NotEqualValues(t, p3, p4) +} diff --git a/pkg/streamer/streamer.go b/pkg/streamer/streamer.go index 9db88998..aa599d2d 100644 --- a/pkg/streamer/streamer.go +++ b/pkg/streamer/streamer.go @@ -1,3 +1,22 @@ +// Package streamer +// +// 1. Consumer.GetMedias - return list of Media, that Consumer can play/load/consume: +// - Media with DirectionRecvonly for audio/video +// - Media with DirectionSendonly for backchannel +// +// 2. Producer.GetMedias - return list of Media, that Producer can generate/create/produce +// - Media with DirectionSendonly for audio/video +// - Media with DirectionRecvonly for backchannel +// +// 3. Producer.GetTrack - get Media from Producer and Codec from that Media return Track from Producer: +// - Media with DirectionSendonly should Track.WriteRTP after Producer.Start +// - Media with DirectionRecvonly should Track.Bind and wait Track.WriteRTP from Consumer +// +// 4. Consumer.AddTrack - takes Media from Consumer and Track from Producer: +// - Media with DirectionRecvonly should Track.WriteRTP +// - Media with DirectionSendonly should Track.Bind +// +// 5. Producer.Start - run loop with reading rtp.Packet from source package streamer // States, Queries and Events diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index 188d4439..0eded691 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -17,11 +17,7 @@ type Track struct { sinkMu *sync.RWMutex } -func NewTrack(codec *Codec, direction string) *Track { - return &Track{Codec: codec, Direction: direction, sinkMu: new(sync.RWMutex)} -} - -func NewTrack2(media *Media, codec *Codec) *Track { +func NewTrack(media *Media, codec *Codec) *Track { if codec == nil { codec = media.Codecs[0] } @@ -55,6 +51,8 @@ func (t *Track) WriteRTP(p *rtp.Packet) error { return nil } +// Bind - attach WriterFunc (Consumer) for receiving rtp.Packet(s) +// and return new Track copy. Later you can run Unbind for new Track func (t *Track) Bind(w WriterFunc) *Track { t.sinkMu.Lock() @@ -70,6 +68,8 @@ func (t *Track) Bind(w WriterFunc) *Track { return &clone } +// Unbind - detach WriterFunc that related to this Track from +// consuming track data func (t *Track) Unbind() { t.sinkMu.Lock() delete(t.sink, t) diff --git a/pkg/tapo/producer.go b/pkg/tapo/producer.go index bcb9aa3d..18227244 100644 --- a/pkg/tapo/producer.go +++ b/pkg/tapo/producer.go @@ -53,7 +53,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track * return nil } - track = streamer.NewTrack2(media, codec) + track = streamer.NewTrack(media, codec) c.tracks[mpegts.StreamTypeH264] = track } else { if media.Direction == streamer.DirectionSendonly { @@ -61,7 +61,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track * return nil } - track = streamer.NewTrack2(media, codec) + track = streamer.NewTrack(media, codec) c.tracks[mpegts.StreamTypePCMATapo] = track } else { if err := c.SetupBackchannel(); err != nil { @@ -69,7 +69,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) (track * } if w := c.backchannelWriter(); w != nil { - track = streamer.NewTrack2(media, codec) + track = streamer.NewTrack(media, codec) track.Bind(w) c.tracks[0] = track } diff --git a/pkg/webrtc/conn.go b/pkg/webrtc/conn.go index 78e6a082..59753376 100644 --- a/pkg/webrtc/conn.go +++ b/pkg/webrtc/conn.go @@ -113,7 +113,7 @@ func (c *Conn) getTrack(remote *webrtc.TrackRemote) *streamer.Track { for _, media := range c.medias { for _, codec := range media.Codecs { if codec.PayloadType == payloadType { - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) c.tracks = append(c.tracks, track) return track } diff --git a/pkg/webrtc/producer.go b/pkg/webrtc/producer.go index 0a7a913b..c382e061 100644 --- a/pkg/webrtc/producer.go +++ b/pkg/webrtc/producer.go @@ -11,7 +11,7 @@ func (c *Conn) GetTrack(media *streamer.Media, codec *streamer.Codec) *streamer. } } - track := streamer.NewTrack(codec, media.Direction) + track := streamer.NewTrack(media, codec) c.tracks = append(c.tracks, track) return track }