Files
lkm/hls/hls_stream.go
2025-07-24 14:32:51 +08:00

320 lines
9.2 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package hls
import (
"fmt"
"github.com/lkmio/avformat"
"github.com/lkmio/avformat/collections"
"github.com/lkmio/avformat/utils"
"github.com/lkmio/lkm/log"
"github.com/lkmio/lkm/stream"
"github.com/lkmio/mpeg"
"os"
"path/filepath"
"strconv"
"unsafe"
)
type TransStream struct {
stream.BaseTransStream
muxer *mpeg.TSMuxer
ctx struct {
segmentSeq int // 切片序号
writeBuffer []byte // ts流的缓冲区, 由TSMuxer使用. 减少用户态和内核态交互以及磁盘IO频率
writeBufferSize int // 已缓存TS流大小
url string // @See TransStream.tsUrl
path string // ts切片位于磁盘中的绝对路径
file *os.File // ts切片文件句柄
}
M3U8Writer stream.M3U8Writer
m3u8Name string // m3u8文件名
m3u8File *os.File // m3u8文件句柄
dir string // m3u8文件父目录
tsUrl string // m3u8中每个url的前缀, 默认为空, 为了支持绝对路径访问:http://xxx/xxx/xxx.ts
tsFormat string // ts文件名格式
duration int // 切片时长, 单位秒
playlistLength int // 最大切片文件个数
PlaylistFormat *string // 位于内存中的m3u8播放列表每个sink都引用指针地址.
PlaylistFormatPtr []*collections.ReferenceCounter[[]byte] // string指针转byte[], 方便发送给sink
}
func (t *TransStream) Input(packet *avformat.AVPacket, index int) ([]*collections.ReferenceCounter[[]byte], int64, bool, error) {
// 创建一下个切片
// 已缓存时长>=指定时长, 如果存在视频, 还需要等遇到关键帧才切片
var newSegment bool
if (!t.HasVideo() || utils.AVMediaTypeVideo == packet.MediaType && packet.Key) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
// 保存当前切片文件
if t.ctx.file != nil {
err := t.flushSegment(false)
if err != nil {
return nil, -1, false, err
}
}
// 创建新的切片
if err := t.createSegment(); err != nil {
return nil, -1, false, err
}
newSegment = true
}
duration := packet.GetDuration(90000)
dts := t.Tracks[index].Dts
pts := t.Tracks[index].Pts
t.Tracks[index].Dts += duration
t.Tracks[index].Pts = t.Tracks[index].Dts + packet.GetPtsDtsDelta(90000)
data := packet.Data
if utils.AVMediaTypeVideo == packet.MediaType {
data = avformat.AVCCPacket2AnnexB(t.FindTrackWithStreamIndex(packet.Index).Stream, packet)
}
// 写入ts切片
length := len(data)
capacity := cap(t.ctx.writeBuffer)
for i := 0; i < length; {
if capacity-t.ctx.writeBufferSize < mpeg.TsPacketSize {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
bytes := t.ctx.writeBuffer[t.ctx.writeBufferSize : t.ctx.writeBufferSize+mpeg.TsPacketSize]
i += t.muxer.Input(bytes, index, data[i:], length, dts, pts, packet.Key, i == 0)
t.ctx.writeBufferSize += mpeg.TsPacketSize
}
// 缓存完第二个切片, 才响应发送m3u8文件. 如果一个切片就发, 播放器缓存少会卡顿.
if newSegment && t.M3U8Writer.Size() > 1 {
return t.PlaylistFormatPtr, -1, true, nil
}
return nil, -1, true, nil
}
func (t *TransStream) AddTrack(track *stream.Track) (int, error) {
var err error
var trackIndex int
if utils.AVMediaTypeVideo == track.Stream.MediaType {
data := track.Stream.CodecParameters.AnnexBExtraData()
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, data)
} else {
trackIndex, err = t.muxer.AddTrack(track.Stream.MediaType, track.Stream.CodecID, track.Stream.Data)
}
return trackIndex, err
}
func (t *TransStream) WriteHeader() error {
return t.createSegment()
}
// 写入新的TS切片更新M3U8
func (t *TransStream) flushSegment(end bool) error {
// 写入剩余TS包
if t.ctx.writeBufferSize > 0 {
_, _ = t.ctx.file.Write(t.ctx.writeBuffer[:t.ctx.writeBufferSize])
t.ctx.writeBufferSize = 0
}
if err := t.ctx.file.Close(); err != nil {
return err
}
// 删除多余的ts切片文件
if t.M3U8Writer.Size() >= t.playlistLength {
_ = os.Remove(t.M3U8Writer.Get(0).Path)
}
// 更新m3u8列表
duration := float32(t.muxer.Duration()) / 90000
t.M3U8Writer.AddSegment(duration, t.ctx.url, t.ctx.segmentSeq, t.ctx.path)
m3u8Txt := t.M3U8Writer.String()
//if end {
// m3u8Txt += "#EXT-X-ENDLIST"
//}
*t.PlaylistFormat = m3u8Txt
// 写入最新的m3u8到文件
if t.m3u8File != nil {
if _, err := t.m3u8File.Seek(0, 0); err != nil {
return err
} else if err = t.m3u8File.Truncate(0); err != nil {
return err
} else if _, err = t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
return err
}
}
return nil
}
// 创建一个新的ts切片
func (t *TransStream) createSegment() error {
t.muxer.Reset()
var tsFile *os.File
startSeq := t.ctx.segmentSeq + 1
for {
tsName := fmt.Sprintf(t.tsFormat, startSeq)
// ts文件
t.ctx.path = fmt.Sprintf("%s/%s", t.dir, tsName)
// m3u8列表中切片的url
t.ctx.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
file, err := os.OpenFile(t.ctx.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err == nil {
tsFile = file
break
}
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.ctx.path)
if os.IsPermission(err) || os.IsTimeout(err) || os.IsNotExist(err) {
return err
}
// 继续创建TS文件, 认为是文件名冲突, 并且文件已经被打开.
startSeq++
}
t.ctx.segmentSeq = startSeq
t.ctx.file = tsFile
n, err := t.muxer.WriteHeader(t.ctx.writeBuffer)
if err != nil {
return err
}
t.ctx.writeBufferSize = n
return nil
}
func (t *TransStream) Close() ([]*collections.ReferenceCounter[[]byte], int64, error) {
var err error
if t.ctx.file != nil {
err = t.flushSegment(true)
err = t.ctx.file.Close()
t.ctx.file = nil
}
if t.muxer != nil {
t.muxer.Close()
t.muxer = nil
}
if t.m3u8File != nil {
err = t.m3u8File.Close()
t.m3u8File = nil
}
return nil, 0, err
}
func stringPtrToBytes(ptr *string) []byte {
ptrAddr := uintptr(unsafe.Pointer(ptr))
return (*[unsafe.Sizeof(ptr)]byte)(unsafe.Pointer(&ptrAddr))[:]
}
func bytesToStringPtr(b []byte) *string {
ptrAddr := *(*uintptr)(unsafe.Pointer(&b[0]))
return (*string)(unsafe.Pointer(ptrAddr))
}
func DeleteOldSegments(id string) {
var index int
for ; ; index++ {
path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index))
fileInfo, err := os.Stat(path)
if err != nil && os.IsNotExist(err) {
break
} else if fileInfo.IsDir() {
continue
}
_ = os.Remove(path)
}
}
// NewTransStream 创建HLS传输流
// @Params dir m3u8的文件夹目录
// @Params m3u8Name m3u8文件名
// @Params tsFormat ts文件格式, 例如: %d.ts
// @Params tsUrl m3u8中ts切片的url前缀
// @Params parentDir 保存切片的绝对路径. mu38和ts切片放在同一目录下, 目录地址使用parentDir+urlPrefix
// @Params segmentDuration 单个切片时长
// @Params playlistLength 缓存多少个切片
func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, playlistLength int, seq int, playlistFormat *string, writer stream.M3U8Writer) (stream.TransStream, error) {
// 创建文件夹
m3u8Path := fmt.Sprintf("%s/%s", dir, m3u8Name)
if err := os.MkdirAll(filepath.Dir(m3u8Path), 0666); err != nil {
log.Sugar.Errorf("创建HLS目录失败 err: %s path: %s", err.Error(), m3u8Path)
return nil, err
}
// 创建m3u8文件
file, err := os.OpenFile(m3u8Path, os.O_CREATE|os.O_RDWR, 0666)
if err != nil {
log.Sugar.Errorf("创建m3u8文件失败 err: %s path: %s", err.Error(), m3u8Path)
//return nil, err
}
transStream := &TransStream{
m3u8Name: m3u8Name,
tsFormat: tsFormat,
tsUrl: tsUrl,
dir: dir,
duration: segmentDuration,
playlistLength: playlistLength,
}
if writer != nil {
transStream.M3U8Writer = writer
} else {
transStream.M3U8Writer = stream.NewM3U8Writer(playlistLength)
}
if playlistFormat != nil {
transStream.PlaylistFormat = playlistFormat
} else {
transStream.PlaylistFormat = new(string)
}
playlistFormatPtrCounter := collections.NewReferenceCounter[[]byte](stringPtrToBytes(transStream.PlaylistFormat))
transStream.PlaylistFormatPtr = append(transStream.PlaylistFormatPtr, playlistFormatPtrCounter)
// 创建TS封装器
muxer := mpeg.NewTSMuxer()
// 初始化ts封装上下文对象
transStream.ctx.segmentSeq = seq
transStream.ctx.writeBuffer = make([]byte, 1024*1024)
transStream.muxer = muxer
transStream.m3u8File = file
return transStream, nil
}
func TransStreamFactory(source stream.Source, _ stream.TransStreamProtocol, _ []*stream.Track, _ stream.Sink) (stream.TransStream, error) {
id := source.GetID()
var writer stream.M3U8Writer
var playlistFormat *string
startSeq := -1
endInfo := source.GetTransStreamPublisher().GetStreamEndInfo()
if endInfo != nil && endInfo.M3U8Writer != nil {
writer = endInfo.M3U8Writer
playlistFormat = endInfo.PlaylistFormat
startSeq = writer.Get(writer.Size() - 1).Sequence
}
// 删除旧的m3u8文件
//_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
// 删除旧的切片文件
//go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength, startSeq, playlistFormat, writer)
}