From 0e281e36d399f799206fbd32c2b46f9ea51e6779 Mon Sep 17 00:00:00 2001 From: Alexey Khit Date: Tue, 22 Nov 2022 20:03:36 +0300 Subject: [PATCH] Fix race (concurency) for Track --- pkg/fake/producer.go | 2 +- pkg/homekit/client.go | 2 +- pkg/ivideon/client.go | 5 +---- pkg/rtmp/client.go | 8 ++------ pkg/rtsp/conn.go | 8 ++------ pkg/streamer/track.go | 14 ++++++++------ 6 files changed, 15 insertions(+), 24 deletions(-) diff --git a/pkg/fake/producer.go b/pkg/fake/producer.go index 6c9f992e..1001ba37 100644 --- a/pkg/fake/producer.go +++ b/pkg/fake/producer.go @@ -24,7 +24,7 @@ func (p *Producer) GetTrack(media *streamer.Media, codec *streamer.Codec) *strea panic("you shall not pass!") } - track := &streamer.Track{Codec: codec, Direction: media.Direction} + track := streamer.NewTrack(codec, media.Direction) switch media.Direction { case streamer.DirectionSendonly: diff --git a/pkg/homekit/client.go b/pkg/homekit/client.go index 195bbf9a..ce532698 100644 --- a/pkg/homekit/client.go +++ b/pkg/homekit/client.go @@ -75,7 +75,7 @@ func (c *Client) GetTrack(media *streamer.Media, codec *streamer.Codec) *streame } } - track := &streamer.Track{Codec: codec, Direction: media.Direction} + track := streamer.NewTrack(codec, media.Direction) c.tracks = append(c.tracks, track) return track } diff --git a/pkg/ivideon/client.go b/pkg/ivideon/client.go index a43e8953..a49bf274 100644 --- a/pkg/ivideon/client.go +++ b/pkg/ivideon/client.go @@ -192,10 +192,7 @@ func (c *Client) getTracks() error { } c.medias = append(c.medias, media) - track := &streamer.Track{ - Direction: streamer.DirectionSendonly, - Codec: codec, - } + track := streamer.NewTrack(codec, streamer.DirectionSendonly) c.tracks[msg.TrackID] = track case "mp4a": // mp4a.40.2 diff --git a/pkg/rtmp/client.go b/pkg/rtmp/client.go index 4efe3115..0cedfbdc 100644 --- a/pkg/rtmp/client.go +++ b/pkg/rtmp/client.go @@ -83,9 +83,7 @@ func (c *Client) Dial() (err error) { } c.medias = append(c.medias, media) - track := &streamer.Track{ - Codec: codec, Direction: media.Direction, - } + track := streamer.NewTrack(codec, media.Direction) c.tracks = append(c.tracks, track) case av.AAC: @@ -108,9 +106,7 @@ func (c *Client) Dial() (err error) { } c.medias = append(c.medias, media) - track := &streamer.Track{ - Codec: codec, Direction: media.Direction, - } + track := streamer.NewTrack(codec, media.Direction) c.tracks = append(c.tracks, track) default: diff --git a/pkg/rtsp/conn.go b/pkg/rtsp/conn.go index 86fbe3a4..02dc597f 100644 --- a/pkg/rtsp/conn.go +++ b/pkg/rtsp/conn.go @@ -429,9 +429,7 @@ func (c *Conn) SetupMedia( return nil, err } - track := &streamer.Track{ - Codec: codec, Direction: media.Direction, - } + track := streamer.NewTrack(codec, media.Direction) switch track.Direction { case streamer.DirectionSendonly: @@ -519,9 +517,7 @@ func (c *Conn) Accept() error { // TODO: fix someday... c.channels = map[byte]*streamer.Track{} for i, media := range c.Medias { - track := &streamer.Track{ - Codec: media.Codecs[0], Direction: media.Direction, - } + track := streamer.NewTrack(media.Codecs[0], media.Direction) c.tracks = append(c.tracks, track) c.channels[byte(i<<1)] = track } diff --git a/pkg/streamer/track.go b/pkg/streamer/track.go index d2d81a1e..17e2c92a 100644 --- a/pkg/streamer/track.go +++ b/pkg/streamer/track.go @@ -13,7 +13,11 @@ type Track struct { Codec *Codec Direction string sink map[*Track]WriterFunc - sinkMu sync.RWMutex + sinkMu *sync.RWMutex +} + +func NewTrack(codec *Codec, direction string) *Track { + return &Track{Codec: codec, Direction: direction, sinkMu: new(sync.RWMutex)} } func (t *Track) String() string { @@ -40,14 +44,12 @@ func (t *Track) Bind(w WriterFunc) *Track { t.sink = map[*Track]WriterFunc{} } - clone := &Track{ - Codec: t.Codec, Direction: t.Direction, sink: t.sink, - } - t.sink[clone] = w + clone := *t + t.sink[&clone] = w t.sinkMu.Unlock() - return clone + return &clone } func (t *Track) Unbind() {