hls删除超出范围的ts切片

This commit is contained in:
yangjiechina
2024-03-10 14:32:00 +08:00
parent 596502d215
commit 14b297827e
3 changed files with 78 additions and 28 deletions

View File

@@ -11,7 +11,7 @@ type sink struct {
}
func NewSink(id stream.SinkId, sourceId string, w http.ResponseWriter) stream.ISink {
return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId}, w}
return &sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}, w}
}
func (s *sink) Input(data []byte) error {
@@ -23,3 +23,16 @@ func (s *sink) Input(data []byte) error {
return nil
}
type m3u8Sink struct {
stream.SinkImpl
}
func (s *m3u8Sink) Input(data []byte) error {
return nil
}
func NewM3U8Sink(id stream.SinkId, sourceId string, w http.ResponseWriter) stream.ISink {
return &m3u8Sink{stream.SinkImpl{Id_: id, SourceId_: sourceId, Protocol_: stream.ProtocolHls}}
}

View File

@@ -19,18 +19,19 @@ type tsContext struct {
file *os.File
}
type Stream struct {
type transStream struct {
stream.TransStreamImpl
muxer libmpeg.TSMuxer
context *tsContext
m3u8 M3U8Writer
url string
m3u8Name string
tsFormat string
dir string
duration int
m3u8File *os.File
m3u8 M3U8Writer
url string
m3u8Name string
tsFormat string
dir string
duration int
m3u8File *os.File
playlistLength int
}
// NewTransStream 创建HLS传输流
@@ -52,12 +53,13 @@ func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playli
return nil, err
}
stream_ := &Stream{
url: url,
m3u8Name: m3u8Name,
tsFormat: tsFormat,
dir: dir,
duration: segmentDuration,
stream_ := &transStream{
url: url,
m3u8Name: m3u8Name,
tsFormat: tsFormat,
dir: dir,
duration: segmentDuration,
playlistLength: playlistLength,
}
muxer := libmpeg.NewTSMuxer()
@@ -76,7 +78,7 @@ func NewTransStream(url, m3u8Name, tsFormat, dir string, segmentDuration, playli
return stream_, nil
}
func (t *Stream) Input(packet utils.AVPacket) error {
func (t *transStream) Input(packet utils.AVPacket) error {
if packet.Index() >= t.muxer.TrackCount() {
return fmt.Errorf("track not available")
}
@@ -95,7 +97,7 @@ func (t *Stream) Input(packet utils.AVPacket) error {
}
}
func (t *Stream) AddTrack(stream utils.AVStream) error {
func (t *transStream) AddTrack(stream utils.AVStream) error {
err := t.TransStreamImpl.AddTrack(stream)
if err != nil {
return err
@@ -114,15 +116,15 @@ func (t *Stream) AddTrack(stream utils.AVStream) error {
return err
}
func (t *Stream) WriteHeader() error {
func (t *transStream) WriteHeader() error {
return t.createSegment()
}
func (t *Stream) onTSWrite(data []byte) {
func (t *transStream) onTSWrite(data []byte) {
t.context.writeBufferSize += len(data)
}
func (t *Stream) onTSAlloc(size int) []byte {
func (t *transStream) onTSAlloc(size int) []byte {
n := len(t.context.writeBuffer) - t.context.writeBufferSize
if n < size {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
@@ -132,7 +134,7 @@ func (t *Stream) onTSAlloc(size int) []byte {
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
}
func (t *Stream) flushSegment() error {
func (t *transStream) flushSegment() error {
//将剩余数据写入缓冲区
if t.context.writeBufferSize > 0 {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
@@ -143,10 +145,15 @@ func (t *Stream) flushSegment() error {
return err
}
duration := float32(t.muxer.Duration()) / 90000
t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq)
//删除多余的ts切片文件
if t.m3u8.Size() >= t.playlistLength {
_ = os.Remove(t.m3u8.Head().path)
}
//更新m3u8
duration := float32(t.muxer.Duration()) / 90000
t.m3u8.AddSegment(duration, t.context.url, t.context.segmentSeq, t.context.path)
if _, err := t.m3u8File.Seek(0, 0); err != nil {
return err
}
@@ -162,13 +169,14 @@ func (t *Stream) flushSegment() error {
return nil
}
func (t *Stream) createSegment() error {
func (t *transStream) createSegment() error {
if t.context.file != nil {
err := t.flushSegment()
t.context.segmentSeq++
if err != nil {
return err
}
}
tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq)
@@ -185,7 +193,7 @@ func (t *Stream) createSegment() error {
return err
}
func (t *Stream) Close() error {
func (t *transStream) Close() error {
var err error
if t.context.file != nil {

View File

@@ -40,9 +40,13 @@ const (
//无BOM
type M3U8Writer interface {
AddSegment(duration float32, url string, sequence int)
AddSegment(duration float32, url string, sequence int, path string)
ToString() string
Size() int
Head() Segment
}
func NewM3U8Writer(len int) M3U8Writer {
@@ -56,6 +60,7 @@ type Segment struct {
duration float32
url string
sequence int
path string
}
type m3u8Writer struct {
@@ -63,12 +68,12 @@ type m3u8Writer struct {
playlist *stream.Queue
}
func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int) {
func (m *m3u8Writer) AddSegment(duration float32 /*title string,*/, url string, sequence int, path string) {
if m.playlist.IsFull() {
m.playlist.Pop()
}
m.playlist.Push(Segment{duration: duration, url: url, sequence: sequence})
m.playlist.Push(Segment{duration: duration, url: url, sequence: sequence, path: path})
}
func (m *m3u8Writer) targetDuration() int {
@@ -134,3 +139,27 @@ func (m *m3u8Writer) ToString() string {
return m.stringBuffer.String()
}
func (m *m3u8Writer) Size() int {
var size int
head, tail := m.playlist.Data()
if head != nil {
size += len(head)
}
if tail != nil {
size += len(tail)
}
return size
}
func (m *m3u8Writer) Head() Segment {
head, _ := m.playlist.Data()
if head != nil {
return head[0].(Segment)
}
return Segment{}
}