From 14b297827e88de2834a531e77be6d96c39aa883c Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Sun, 10 Mar 2024 14:32:00 +0800 Subject: [PATCH] =?UTF-8?q?hls=E5=88=A0=E9=99=A4=E8=B6=85=E5=87=BA?= =?UTF-8?q?=E8=8C=83=E5=9B=B4=E7=9A=84ts=E5=88=87=E7=89=87?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hls/hls_sink.go | 15 ++++++++++++- hls/hls_stream.go | 56 +++++++++++++++++++++++++++-------------------- hls/m3u8.go | 35 ++++++++++++++++++++++++++--- 3 files changed, 78 insertions(+), 28 deletions(-) diff --git a/hls/hls_sink.go b/hls/hls_sink.go index 1a8afd1..fb78720 100644 --- a/hls/hls_sink.go +++ b/hls/hls_sink.go @@ -11,7 +11,7 @@ type sink struct { } func NewSink(id stream.SinkId, sourceId string, w http.ResponseWriter) stream.ISink { - return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId}, w} + return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, w} } func (s *sink) Input(data []byte) error { @@ -23,3 +23,16 @@ func (s *sink) Input(data []byte) error { return nil } + +type m3u8Sink struct { + stream.SinkImpl +} + +func (s *m3u8Sink) Input(data []byte) error { + + return nil +} + +func NewM3U8Sink(id stream.SinkId, sourceId string, w http.ResponseWriter) stream.ISink { + return &m3u8Sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}} +} diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 3c24bff..feecf9a 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -19,18 +19,19 @@ type tsContext struct { file *os.File } -type Stream struct { +type transStream struct { stream.TransStreamImpl muxer libmpeg.TSMuxer context *tsContext - m3u8 M3U8Writer - url string - m3u8Name string - tsFormat string - dir string - duration int - m3u8File *os.File + m3u8 M3U8Writer + url string + m3u8Name string + tsFormat string + dir string + duration int + m3u8File *os.File + playlistLength int } // NewTransStream 创建HLS传输流 @@ -52,12 +53,13 @@ func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playli return nil, err } - stream_ := &Stream{ - url: url, - m3u8Name: m3u8Name, - tsFormat: tsFormat, - dir: dir, - duration: segmentDuration, + stream_ := &transStream{ + url: url, + m3u8Name: m3u8Name, + tsFormat: tsFormat, + dir: dir, + duration: segmentDuration, + playlistLength: playlistLength, } muxer := libmpeg.NewTSMuxer() @@ -76,7 +78,7 @@ func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playli return stream_, nil } -func (t *Stream) Input(packet utils.AVPacket) error { +func (t *transStream) Input(packet utils.AVPacket) error { if packet.Index() >= t.muxer.TrackCount() { return fmt.Errorf("track not available") } @@ -95,7 +97,7 @@ func (t *Stream) Input(packet utils.AVPacket) error { } } -func (t *Stream) AddTrack(stream utils.AVStream) error { +func (t *transStream) AddTrack(stream utils.AVStream) error { err := t.TransStreamImpl.AddTrack(stream) if err != nil { return err @@ -114,15 +116,15 @@ func (t *Stream) AddTrack(stream utils.AVStream) error { return err } -func (t *Stream) WriteHeader() error { +func (t *transStream) WriteHeader() error { return t.createSegment() } -func (t *Stream) onTSWrite(data []byte) { +func (t *transStream) onTSWrite(data []byte) { t.context.writeBufferSize += len(data) } -func (t *Stream) onTSAlloc(size int) []byte { +func (t *transStream) onTSAlloc(size int) []byte { n := len(t.context.writeBuffer) - t.context.writeBufferSize if n < size { _, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize]) @@ -132,7 +134,7 @@ func (t *Stream) onTSAlloc(size int) []byte { return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size] } -func (t *Stream) flushSegment() error { +func (t *transStream) flushSegment() error { //将剩余数据写入缓冲区 if t.context.writeBufferSize > 0 { _, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize]) @@ -143,10 +145,15 @@ func (t *Stream) flushSegment() error { return err } - duration := float32(t.muxer.Duration()) / 90000 - t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq) + //删除多余的ts切片文件 + if t.m3u8.Size() >= t.playlistLength { + _ = os.Remove(t.m3u8.Head().path) + } //更新m3u8 + duration := float32(t.muxer.Duration()) / 90000 + t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path) + if _, err := t.m3u8File.Seek(0, 0); err != nil { return err } @@ -162,13 +169,14 @@ func (t *Stream) flushSegment() error { return nil } -func (t *Stream) createSegment() error { +func (t *transStream) createSegment() error { if t.context.file != nil { err := t.flushSegment() t.context.segmentSeq++ if err != nil { return err } + } tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq) @@ -185,7 +193,7 @@ func (t *Stream) createSegment() error { return err } -func (t *Stream) Close() error { +func (t *transStream) Close() error { var err error if t.context.file != nil { diff --git a/hls/m3u8.go b/hls/m3u8.go index 62fca69..322860c 100644 --- a/hls/m3u8.go +++ b/hls/m3u8.go @@ -40,9 +40,13 @@ const ( //无BOM type M3U8Writer interface { - AddSegment(duration float32, url string, sequence int) + AddSegment(duration float32, url string, sequence int, path string) ToString() string + + Size() int + + Head() Segment } func NewM3U8Writer(len int) M3U8Writer { @@ -56,6 +60,7 @@ type Segment struct { duration float32 url string sequence int + path string } type m3u8Writer struct { @@ -63,12 +68,12 @@ type m3u8Writer struct { playlist *stream.Queue } -func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int) { +func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) { if m.playlist.IsFull() { m.playlist.Pop() } - m.playlist.Push(Segment{duration: duration, url: url, sequence: sequence}) + m.playlist.Push(Segment{duration: duration, url: url, sequence: sequence, path: path}) } func (m *m3u8Writer) targetDuration() int { @@ -134,3 +139,27 @@ func (m *m3u8Writer) ToString() string { return m.stringBuffer.String() } + +func (m *m3u8Writer) Size() int { + var size int + head, tail := m.playlist.Data() + + if head != nil { + size += len(head) + } + + if tail != nil { + size += len(tail) + } + + return size +} + +func (m *m3u8Writer) Head() Segment { + head, _ := m.playlist.Data() + if head != nil { + return head[0].(Segment) + } + + return Segment{} +}