diff --git a/hls/hls_stream.go b/hls/hls_stream.go index 2058955..a9a420f 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -48,8 +48,7 @@ func (t *transStream) Input(packet utils.AVPacket) error { if (!t.ExistVideo || utils.AVMediaTypeVideo == packet.MediaType() && packet.KeyFrame()) && float32(t.muxer.Duration())/90000 >= float32(t.duration) { //保存当前切片文件 if t.context.file != nil { - err := t.flushSegment() - t.context.segmentSeq++ + err := t.flushSegment(false) if err != nil { return err } @@ -116,7 +115,7 @@ func (t *transStream) onTSAlloc(size int) []byte { return t.context.writeBuffer[t.context.writeBufferSize : t.context.writeBufferSize+size] } -func (t *transStream) flushSegment() error { +func (t *transStream) flushSegment(end bool) error { //将剩余数据写入缓冲区 if t.context.writeBufferSize > 0 { _, _ = t.context.file.Write(t.context.writeBuffer[:t.context.writeBufferSize]) @@ -145,6 +144,10 @@ func (t *transStream) flushSegment() error { } m3u8Txt := t.m3u8.ToString() + if end { + m3u8Txt += "#EXT-X-ENDLIST" + } + if _, err := t.m3u8File.Write([]byte(m3u8Txt)); err != nil { return err } @@ -161,20 +164,25 @@ func (t *transStream) flushSegment() error { // 创建一个新的ts切片 func (t *transStream) createSegment() error { + t.muxer.Reset() + defer func() { + t.context.segmentSeq++ + }() + tsName := fmt.Sprintf(t.tsFormat, t.context.segmentSeq) //ts文件 t.context.path = fmt.Sprintf("%s/%s", t.dir, tsName) //m3u8列表中切片的url t.context.url = fmt.Sprintf("%s%s", t.tsUrl, tsName) - file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE, 0666) + file, err := os.OpenFile(t.context.path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) if err != nil { + log.Sugar.Errorf("创建ts切片文件失败 err:%s path:%s", err.Error(), t.context.path) return err } - t.context.file = file - t.muxer.Reset() - err = t.muxer.WriteHeader() + t.context.file = file + _ = t.muxer.WriteHeader() return err } @@ -182,7 +190,7 @@ func (t *transStream) Close() error { var err error if t.context.file != nil { - err = t.flushSegment() + err = t.flushSegment(true) err = t.context.file.Close() t.context.file = nil } @@ -202,8 +210,6 @@ func (t *transStream) Close() error { func DeleteOldSegments(id string) { var index int - //先删除旧的m3u8文件 - _ = os.Remove(stream.AppConfig.Hls.M3U8Path(id)) for ; ; index++ { path := stream.AppConfig.Hls.TSPath(id, strconv.Itoa(index)) fileInfo, err := os.Stat(path) @@ -272,6 +278,8 @@ func NewTransStream(dir, m3u8Name, tsFormat, tsUrl string, segmentDuration, play func TransStreamFactory(source stream.Source, protocol stream.Protocol, streams []utils.AVStream) (stream.TransStream, error) { id := source.Id() + //先删除旧的m3u8文件 + _ = os.Remove(stream.AppConfig.Hls.M3U8Path(id)) //删除旧的切片文件 go DeleteOldSegments(id) return NewTransStream(stream.AppConfig.Hls.M3U8Dir(id), stream.AppConfig.Hls.M3U8Format(id), stream.AppConfig.Hls.TSFormat(id), "", stream.AppConfig.Hls.Duration, stream.AppConfig.Hls.PlaylistLength) diff --git a/stream/source.go b/stream/source.go index 5a131e0..49b2cd0 100644 --- a/stream/source.go +++ b/stream/source.go @@ -197,7 +197,9 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re s.playingEventQueue = make(chan Sink, 128) s.playingDoneEventQueue = make(chan Sink, 128) s.probeTimoutEvent = make(chan bool) +} +func (s *PublishSource) CreateDefaultOutStreams() { if s.transStreams == nil { s.transStreams = make(map[TransStreamId]TransStream, 10) } @@ -209,13 +211,16 @@ func (s *PublishSource) Init(inputCB func(data []byte) error, closeCB func(), re //创建HLS输出流 if AppConfig.Hls.Enable { - hlsStream, err := CreateTransStream(s, ProtocolHls, nil) + streams := s.OriginStreams() + utils.Assert(len(streams) > 0) + + hlsStream, err := s.CreateTransStream(ProtocolHls, streams) if err != nil { panic(err) } s.hlsStream = hlsStream - s.transStreams[0x100] = s.hlsStream + s.transStreams[GenerateTransStreamId(ProtocolHls, streams...)] = s.hlsStream } } @@ -301,6 +306,25 @@ func IsSupportMux(protocol Protocol, audioCodecId, videoCodecId utils.AVCodecID) return true } +func (s *PublishSource) CreateTransStream(protocol Protocol, streams []utils.AVStream) (TransStream, error) { + log.Sugar.Debugf("创建%s-stream source:%s", protocol.ToString(), s.Id_) + + transStream, err := CreateTransStream(s, protocol, streams) + if err != nil { + log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_) + return nil, err + } + + for _, avStream := range streams { + transStream.AddTrack(avStream) + } + + transStream.Init() + _ = transStream.WriteHeader() + + return transStream, err +} + func (s *PublishSource) AddSink(sink Sink) bool { // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() @@ -354,24 +378,15 @@ func (s *PublishSource) AddSink(sink Sink) bool { if s.transStreams == nil { s.transStreams = make(map[TransStreamId]TransStream, 10) } - //创建一个新的传输流 - log.Sugar.Debugf("创建%s-stream source:%s", sink.Protocol().ToString(), s.Id_) var err error - transStream, err = CreateTransStream(s, sink.Protocol(), streams[:size]) + transStream, err = s.CreateTransStream(sink.Protocol(), streams[:size]) if err != nil { log.Sugar.Errorf("创建传输流失败 err:%s source:%s", err.Error(), s.Id_) return false } s.transStreams[transStreamId] = transStream - - for i := 0; i < size; i++ { - transStream.AddTrack(streams[i]) - } - - transStream.Init() - _ = transStream.WriteHeader() } sink.SetTransStreamId(transStreamId) @@ -564,20 +579,15 @@ func (s *PublishSource) writeHeader() { s.probeTimer.Stop() } + //创建录制流和HLS + s.CreateDefaultOutStreams() + sinks := PopWaitingSinks(s.Id_) for _, sink := range sinks { if !s.AddSink(sink) { sink.Close() } } - - if s.hlsStream != nil { - for _, stream_ := range s.originStreams.All() { - s.hlsStream.AddTrack(stream_) - } - - s.hlsStream.WriteHeader() - } } func (s *PublishSource) IsCompleted() bool { diff --git a/stream/trans_utils.go b/stream/trans_utils.go index 801ea9e..0bad1f8 100644 --- a/stream/trans_utils.go +++ b/stream/trans_utils.go @@ -2,7 +2,9 @@ package stream import "github.com/yangjiechina/avformat/utils" -// TransStreamId 每个传输流的唯一Id,由协议+流Id组成 +// TransStreamId 每个传输流的唯一Id,根据输出流协议ID+流包含的音视频编码器ID生成 +// 输出流协议ID占用高8位 +// 每个音视频编译器ID占用8位. 意味着每个输出流至多7路流. type TransStreamId uint64 var ( @@ -46,6 +48,7 @@ func init() { return TransStreamId(streamId) }*/ +// GenerateTransStreamId 根据输出流协议和输出流包含的音视频编码器ID生成流ID func GenerateTransStreamId(protocol Protocol, ids ...utils.AVStream) TransStreamId { len_ := len(ids) utils.Assert(len_ > 0 && len_ < 8)