diff --git a/internal/core/hls_muxer.go b/internal/core/hls_muxer.go index 304159c9..58974524 100644 --- a/internal/core/hls_muxer.go +++ b/internal/core/hls_muxer.go @@ -256,6 +256,8 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) m.writer = asyncwriter.New(m.writeQueueSize, m) + defer res.stream.RemoveReader(m.writer) + var medias []*description.Media videoMedia, videoTrack := m.createVideoTrack(res.stream) @@ -268,8 +270,6 @@ func (m *hlsMuxer) runInner(innerCtx context.Context, innerReady chan struct{}) medias = append(medias, audioMedia) } - defer res.stream.RemoveReader(m.writer) - if medias == nil { return fmt.Errorf( "the stream doesn't contain any supported codec, which are currently H265, H264, Opus, MPEG-4 Audio") diff --git a/internal/core/rtmp_conn.go b/internal/core/rtmp_conn.go index 3f230c7f..968fed39 100644 --- a/internal/core/rtmp_conn.go +++ b/internal/core/rtmp_conn.go @@ -243,6 +243,8 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { writer := asyncwriter.New(c.writeQueueSize, c) + defer res.stream.RemoveReader(writer) + var medias []*description.Media var w *rtmp.Writer @@ -267,8 +269,6 @@ func (c *rtmpConn) runRead(conn *rtmp.Conn, u *url.URL) error { "the stream doesn't contain any supported codec, which are currently H264, MPEG-4 Audio, MPEG-1/2 Audio") } - defer res.stream.RemoveReader(writer) - c.Log(logger.Info, "is reading from path '%s', %s", res.path.name, sourceMediaInfo(medias)) diff --git a/internal/core/srt_conn.go b/internal/core/srt_conn.go index abd2d4ce..257c9db3 100644 --- a/internal/core/srt_conn.go +++ b/internal/core/srt_conn.go @@ -256,6 +256,25 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { var medi *description.Media switch tcodec := track.Codec.(type) { + case *mpegts.CodecH265: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AU: au, + }) + return nil + }) + case *mpegts.CodecH264: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -276,21 +295,22 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { return nil }) - case *mpegts.CodecH265: + case *mpegts.CodecOpus: medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{ PayloadTyp: 96, + IsStereo: (tcodec.ChannelCount == 2), }}, } - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ Base: unit.Base{ NTP: time.Now(), PTS: decodeTime(pts), }, - AU: au, + Packets: packets, }) return nil }) @@ -318,26 +338,6 @@ func (c *srtConn) runPublishReader(sconn srt.Conn, path *path) error { return nil }) - case *mpegts.CodecOpus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - IsStereo: (tcodec.ChannelCount == 2), - }}, - } - - r.OnDataOpus(track, func(pts int64, packets [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Packets: packets, - }) - return nil - }) - case *mpegts.CodecMPEG1Audio: medi = &description.Media{ Type: description.MediaTypeAudio, @@ -424,6 +424,8 @@ func (c *srtConn) runRead(req srtNewConnReq, pathName string, user string, pass writer := asyncwriter.New(c.writeQueueSize, c) + defer res.stream.RemoveReader(writer) + var w *mpegts.Writer var tracks []*mpegts.Track var medias []*description.Media diff --git a/internal/core/udp_source.go b/internal/core/udp_source.go index 8840f1ad..66628633 100644 --- a/internal/core/udp_source.go +++ b/internal/core/udp_source.go @@ -161,6 +161,25 @@ func (s *udpSource) runReader(pc net.PacketConn) error { var medi *description.Media switch tcodec := track.Codec.(type) { + case *mpegts.CodecH265: + medi = &description.Media{ + Type: description.MediaTypeVideo, + Formats: []format.Format{&format.H265{ + PayloadTyp: 96, + }}, + } + + r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + Base: unit.Base{ + NTP: time.Now(), + PTS: decodeTime(pts), + }, + AU: au, + }) + return nil + }) + case *mpegts.CodecH264: medi = &description.Media{ Type: description.MediaTypeVideo, @@ -181,21 +200,22 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return nil }) - case *mpegts.CodecH265: + case *mpegts.CodecOpus: medi = &description.Media{ - Type: description.MediaTypeVideo, - Formats: []format.Format{&format.H265{ + Type: description.MediaTypeAudio, + Formats: []format.Format{&format.Opus{ PayloadTyp: 96, + IsStereo: (tcodec.ChannelCount == 2), }}, } - r.OnDataH26x(track, func(pts int64, _ int64, au [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.H265{ + r.OnDataOpus(track, func(pts int64, packets [][]byte) error { + stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ Base: unit.Base{ NTP: time.Now(), PTS: decodeTime(pts), }, - AU: au, + Packets: packets, }) return nil }) @@ -223,26 +243,6 @@ func (s *udpSource) runReader(pc net.PacketConn) error { return nil }) - case *mpegts.CodecOpus: - medi = &description.Media{ - Type: description.MediaTypeAudio, - Formats: []format.Format{&format.Opus{ - PayloadTyp: 96, - IsStereo: (tcodec.ChannelCount == 2), - }}, - } - - r.OnDataOpus(track, func(pts int64, packets [][]byte) error { - stream.WriteUnit(medi, medi.Formats[0], &unit.Opus{ - Base: unit.Base{ - NTP: time.Now(), - PTS: decodeTime(pts), - }, - Packets: packets, - }) - return nil - }) - case *mpegts.CodecMPEG1Audio: medi = &description.Media{ Type: description.MediaTypeAudio, diff --git a/internal/core/webrtc_session.go b/internal/core/webrtc_session.go index ab988b34..3fd1b951 100644 --- a/internal/core/webrtc_session.go +++ b/internal/core/webrtc_session.go @@ -514,12 +514,12 @@ func (s *webRTCSession) runRead() (int, error) { writer := asyncwriter.New(s.writeQueueSize, s) + defer res.stream.RemoveReader(writer) + for _, track := range tracks { track.start(res.stream, writer) } - defer res.stream.RemoveReader(writer) - s.Log(logger.Info, "is reading from path '%s', %s", res.path.name, sourceMediaInfo(webrtcMediasOfOutgoingTracks(tracks)))