修复ffplay拉取hls流警告“Packet corrupt”

This commit is contained in:
yangjiechina
2024-07-01 20:38:32 +08:00
parent 5f619972c1
commit 4b1200d9ad
3 changed files with 52 additions and 31 deletions

View File

@@ -48,8 +48,7 @@ func (t *transStream) Input(packet utils.AVPacket) error {
if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()) && float32(t.muxer.Duration())/90000 >= float32(t.duration) {
//保存当前切片文件 //保存当前切片文件
if t.context.file != nil { if t.context.file != nil {
err := t.flushSegment() err := t.flushSegment(false)
t.context.segmentSeq++
if err != nil { if err != nil {
return err return err
} }
@@ -116,7 +115,7 @@ func (t *transStream) onTSAlloc(size int) []byte {
return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size] return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size]
} }
func (t *transStream) flushSegment() error { func (t *transStream) flushSegment(end bool) error {
//将剩余数据写入缓冲区 //将剩余数据写入缓冲区
if t.context.writeBufferSize > 0 { if t.context.writeBufferSize > 0 {
_, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize]) _, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize])
@@ -145,6 +144,10 @@ func (t *transStream) flushSegment() error {
} }
m3u8Txt := t.m3u8.ToString() m3u8Txt := t.m3u8.ToString()
if end {
m3u8Txt += "#EXT-X-ENDLIST"
}
if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil { if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil {
return err return err
} }
@@ -161,20 +164,25 @@ func (t *transStream) flushSegment() error {
// 创建一个新的ts切片 // 创建一个新的ts切片
func (t *transStream) createSegment() error { func (t *transStream) createSegment() error {
t.muxer.Reset()
defer func() {
t.context.segmentSeq++
}()
tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq) tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq)
//ts文件 //ts文件
t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName) t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName)
//m3u8列表中切片的url //m3u8列表中切片的url
t.context.url = fmt.Sprintf("%s%s", t.tsUrl, tsName) t.context.url = fmt.Sprintf("%s%s", t.tsUrl, tsName)
file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE, 0666) file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)
if err != nil { if err != nil {
log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.context.path)
return err return err
} }
t.context.file = file
t.muxer.Reset() t.context.file = file
err = t.muxer.WriteHeader() _ = t.muxer.WriteHeader()
return err return err
} }
@@ -182,7 +190,7 @@ func (t *transStream) Close() error {
var err error var err error
if t.context.file != nil { if t.context.file != nil {
err = t.flushSegment() err = t.flushSegment(true)
err = t.context.file.Close() err = t.context.file.Close()
t.context.file = nil t.context.file = nil
} }
@@ -202,8 +210,6 @@ func (t *transStream) Close() error {
func DeleteOldSegments(id string) { func DeleteOldSegments(id string) {
var index int var index int
//先删除旧的m3u8文件
_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
for ; ; index++ { for ; ; index++ {
path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index)) path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index))
fileInfo, err := os.Stat(path) fileInfo, err := os.Stat(path)
@@ -272,6 +278,8 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play
func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) { func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) {
id := source.Id() id := source.Id()
//先删除旧的m3u8文件
_ = os.Remove(stream.AppConfig.Hls.M3U8Path(id))
//删除旧的切片文件 //删除旧的切片文件
go DeleteOldSegments(id) go DeleteOldSegments(id)
return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength) return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength)

View File

@@ -197,7 +197,9 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re
s.playingEventQueue = make(chan Sink, 128) s.playingEventQueue = make(chan Sink, 128)
s.playingDoneEventQueue = make(chan Sink, 128) s.playingDoneEventQueue = make(chan Sink, 128)
s.probeTimoutEvent = make(chan bool) s.probeTimoutEvent = make(chan bool)
}
func (s *PublishSource) CreateDefaultOutStreams() {
if s.transStreams == nil { if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10) s.transStreams = make(map[TransStreamId]TransStream, 10)
} }
@@ -209,13 +211,16 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re
//创建HLS输出流 //创建HLS输出流
if AppConfig.Hls.Enable { if AppConfig.Hls.Enable {
hlsStream, err := CreateTransStream(s, ProtocolHls, nil) streams := s.OriginStreams()
utils.Assert(len(streams) > 0)
hlsStream, err := s.CreateTransStream(ProtocolHls, streams)
if err != nil { if err != nil {
panic(err) panic(err)
} }
s.hlsStream = hlsStream s.hlsStream = hlsStream
s.transStreams[0x100] = s.hlsStream s.transStreams[GenerateTransStreamId(ProtocolHls, streams...)] = s.hlsStream
} }
} }
@@ -301,6 +306,25 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID)
return true return true
} }
func (s *PublishSource) CreateTransStream(protocol Protocol, streams []utils.AVStream) (TransStream, error) {
log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.Id_)
transStream, err := CreateTransStream(s, protocol, streams)
if err != nil {
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_)
return nil, err
}
for _, avStream := range streams {
transStream.AddTrack(avStream)
}
transStream.Init()
_ = transStream.WriteHeader()
return transStream, err
}
func (s *PublishSource) AddSink(sink Sink) bool { func (s *PublishSource) AddSink(sink Sink) bool {
// 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 // 暂时不考虑多路视频流意味着只能1路视频流和多路音频流同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致
audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId()
@@ -354,24 +378,15 @@ func (s *PublishSource) AddSink(sink Sink) bool {
if s.transStreams == nil { if s.transStreams == nil {
s.transStreams = make(map[TransStreamId]TransStream, 10) s.transStreams = make(map[TransStreamId]TransStream, 10)
} }
//创建一个新的传输流
log.Sugar.Debugf("创建%s-stream source:%s", sink.Protocol().ToString(), s.Id_)
var err error var err error
transStream, err = CreateTransStream(s, sink.Protocol(), streams[:size]) transStream, err = s.CreateTransStream(sink.Protocol(), streams[:size])
if err != nil { if err != nil {
log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_) log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_)
return false return false
} }
s.transStreams[transStreamId] = transStream s.transStreams[transStreamId] = transStream
for i := 0; i < size; i++ {
transStream.AddTrack(streams[i])
}
transStream.Init()
_ = transStream.WriteHeader()
} }
sink.SetTransStreamId(transStreamId) sink.SetTransStreamId(transStreamId)
@@ -564,20 +579,15 @@ func (s *PublishSource) writeHeader() {
s.probeTimer.Stop() s.probeTimer.Stop()
} }
//创建录制流和HLS
s.CreateDefaultOutStreams()
sinks := PopWaitingSinks(s.Id_) sinks := PopWaitingSinks(s.Id_)
for _, sink := range sinks { for _, sink := range sinks {
if !s.AddSink(sink) { if !s.AddSink(sink) {
sink.Close() sink.Close()
} }
} }
if s.hlsStream != nil {
for _, stream_ := range s.originStreams.All() {
s.hlsStream.AddTrack(stream_)
}
s.hlsStream.WriteHeader()
}
} }
func (s *PublishSource) IsCompleted() bool { func (s *PublishSource) IsCompleted() bool {

View File

@@ -2,7 +2,9 @@ package stream
import "github.com/yangjiechina/avformat/utils" import "github.com/yangjiechina/avformat/utils"
// TransStreamId 每个传输流的唯一Id由协议+流Id组 // TransStreamId 每个传输流的唯一Id根据输出流协议ID+流包含的音视频编码器ID生
// 输出流协议ID占用高8位
// 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流.
type TransStreamId uint64 type TransStreamId uint64
var ( var (
@@ -46,6 +48,7 @@ func init() {
return TransStreamId(streamId) return TransStreamId(streamId)
}*/ }*/
// GenerateTransStreamId 根据输出流协议和输出流包含的音视频编码器ID生成流ID
func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId { func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId {
len_ := len(ids) len_ := len(ids)
utils.Assert(len_ > 0 && len_ < 8) utils.Assert(len_ > 0 && len_ < 8)