diff --git a/flv/http_flv.go b/flv/http_flv.go index 7a1977f..66accfc 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -33,6 +33,7 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error var videoKey bool var dts int64 var pts int64 + var keyBuffer bool dts = packet.ConvertDts(1000) pts = packet.ConvertPts(1000) @@ -48,8 +49,9 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error // 关键帧都放在切片头部,所以遇到关键帧创建新切片, 发送当前切片剩余流 if videoKey && !t.MWBuffer.IsNewSegment() { - segment := t.forceFlushSegment() + segment, key := t.forceFlushSegment() t.AppendOutStreamBuffer(segment) + keyBuffer = key } var n int @@ -73,12 +75,13 @@ func (t *TransStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error copy(bytes[n:], data) // 合并写满再发 - if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 { + if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 { // 已经分配末尾换行符内存 + keyBuffer = key t.AppendOutStreamBuffer(t.FormatSegment(segment)) } - return t.OutBuffer[:t.OutBufferSize], 0, true, nil + return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil } func (t *TransStream) AddTrack(track *stream.Track) error { @@ -91,8 +94,8 @@ func (t *TransStream) AddTrack(track *stream.Track) error { } else if utils.AVMediaTypeVideo == track.Stream.Type() { t.muxer.AddVideoTrack(track.Stream.CodecId()) - t.muxer.AddProperty("width", track.Stream.CodecParameters().Width()) - t.muxer.AddProperty("height", track.Stream.CodecParameters().Height()) + t.muxer.MetaData().AddNumberProperty("width", float64(track.Stream.CodecParameters().Width())) + t.muxer.MetaData().AddNumberProperty("height", float64(track.Stream.CodecParameters().Height())) } return nil } @@ -157,7 +160,7 @@ func (t *TransStream) Close() ([][]byte, int64, error) { // 发送剩余的流 if !t.MWBuffer.IsNewSegment() { - if segment := t.forceFlushSegment(); len(segment) > 0 { + if segment, _ := t.forceFlushSegment(); len(segment) > 0 { t.AppendOutStreamBuffer(segment) } } @@ -166,16 +169,16 @@ func (t *TransStream) Close() ([][]byte, int64, error) { } // 保存为完整的http-flv切片 -func (t *TransStream) forceFlushSegment() []byte { +func (t *TransStream) forceFlushSegment() ([]byte, bool) { // 预览末尾换行符 t.MWBuffer.Reserve(2) - segment := t.MWBuffer.FlushSegment() - return t.FormatSegment(segment) + segment, key := t.MWBuffer.FlushSegment() + return t.FormatSegment(segment), key } // GetHttpFLVBlock 跳过头部的无效数据,返回http-flv块 func (t *TransStream) GetHttpFLVBlock(data []byte) []byte { - return data[t.computeSkipCount(data):] + return data[t.computeSkipBytesSize(data):] } // FormatSegment 为切片添加包长和换行符 @@ -184,7 +187,7 @@ func (t *TransStream) FormatSegment(segment []byte) []byte { return t.GetHttpFLVBlock(segment) } -func (t *TransStream) computeSkipCount(data []byte) int { +func (t *TransStream) computeSkipBytesSize(data []byte) int { return int(6 + binary.BigEndian.Uint16(data[4:])) } @@ -220,7 +223,7 @@ func (t *TransStream) writeSeparator(dst []byte) { func NewHttpTransStream() stream.TransStream { return &TransStream{ - muxer: libflv.NewMuxer(), + muxer: libflv.NewMuxer(nil), header: make([]byte, 1024), headerSize: HttpFlvBlockHeaderSize, } diff --git a/rtc/rtc_sink.go b/rtc/rtc_sink.go index 744b7e8..ff4678a 100644 --- a/rtc/rtc_sink.go +++ b/rtc/rtc_sink.go @@ -30,7 +30,7 @@ func (s *Sink) StartStreaming(transStream stream.TransStream) error { // 创建PeerConnection var remoteTrack *webrtc.TrackLocalStaticSample - s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackCount()) + s.tracks = make([]*webrtc.TrackLocalStaticSample, transStream.TrackSize()) connection, err := webrtcApi.NewPeerConnection(webrtc.Configuration{}) connection.OnICECandidate(func(candidate *webrtc.ICECandidate) { diff --git a/rtmp/rtmp_stream.go b/rtmp/rtmp_stream.go index 54ea052..5a76da5 100644 --- a/rtmp/rtmp_stream.go +++ b/rtmp/rtmp_stream.go @@ -34,6 +34,7 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error var chunkPayloadOffset int var dts int64 var pts int64 + var keyBuffer bool dts = packet.ConvertDts(1000) pts = packet.ConvertPts(1000) @@ -55,15 +56,15 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error // 遇到视频关键帧, 发送剩余的流, 创建新切片 if videoKey { - if segment := t.MWBuffer.FlushSegment(); len(segment) > 0 { + if segment, key := t.MWBuffer.FlushSegment(); len(segment) > 0 { + keyBuffer = key t.AppendOutStreamBuffer(segment) } } - // 分配内存 - // 固定type0 + // type为0的header大小 chunkHeaderSize := 12 - // type3chunk数量 + // type为3的chunk数量 numChunks := (payloadSize - 1) / t.chunkSize rtmpMsgSize := chunkHeaderSize + payloadSize + numChunks // 如果时间戳超过3字节, 每个chunk都需要多4字节的扩展时间戳 @@ -71,29 +72,32 @@ func (t *transStream) Input(packet utils.AVPacket) ([][]byte, int64, bool, error rtmpMsgSize += (1 + numChunks) * 4 } + // 分配指定大小的内存 allocate := t.MWBuffer.Allocate(rtmpMsgSize, dts, videoKey) - // 写chunk header + // 写第一个type为0的chunk header chunk.Length = payloadSize chunk.Timestamp = uint32(dts) n := chunk.MarshalHeader(allocate) - // 写flv + // 封装成flv if videoPkt { n += t.muxer.WriteVideoData(allocate[n:], uint32(ct), packet.KeyFrame(), false) } else { n += t.muxer.WriteAudioData(allocate[n:], false) } + // 将flv data写入chunk body n += chunk.WriteBody(allocate[n:], data, t.chunkSize, chunkPayloadOffset) utils.Assert(len(allocate) == n) // 合并写满了再发 - if segment := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 { + if segment, key := t.MWBuffer.PeekCompletedSegment(); len(segment) > 0 { + keyBuffer = key t.AppendOutStreamBuffer(segment) } - return t.OutBuffer[:t.OutBufferSize], 0, true, nil + return t.OutBuffer[:t.OutBufferSize], 0, keyBuffer, nil } func (t *transStream) ReadExtraData(_ int64) ([][]byte, int64, error) { diff --git a/rtsp/rtsp_sink.go b/rtsp/rtsp_sink.go index d7abf0d..1638fb8 100644 --- a/rtsp/rtsp_sink.go +++ b/rtsp/rtsp_sink.go @@ -26,12 +26,12 @@ type Sink struct { } func (s *Sink) StartStreaming(transStream stream.TransStream) error { - utils.Assert(transStream.TrackCount() > 0) + utils.Assert(transStream.TrackSize() > 0) if s.senders != nil { return nil } - s.senders = make([]*librtp.RtpSender, transStream.TrackCount()) + s.senders = make([]*librtp.RtpSender, transStream.TrackSize()) // sdp回调给sink, sink应答给describe请求 if s.cb != nil { s.cb(transStream.(*TransStream).sdp) diff --git a/stream/mw_buffer.go b/stream/mw_buffer.go index cf18a93..90e9fc7 100644 --- a/stream/mw_buffer.go +++ b/stream/mw_buffer.go @@ -11,21 +11,22 @@ import ( type MergeWritingBuffer interface { Allocate(size int, ts int64, videoKey bool) []byte - // PeekCompletedSegment 返回当前完整切片, 如果不满, 返回nil. - PeekCompletedSegment() []byte + // PeekCompletedSegment 返回当前完整切片, 以及是否是关键帧切片, 未满返回nil. + PeekCompletedSegment() ([]byte, bool) - // FlushSegment 保存当前切片, 创建新的切片 - FlushSegment() []byte + // FlushSegment 生成并返回当前切片, 以及是否是关键帧切片. + FlushSegment() ([]byte, bool) + // IsFull 当前切片已满 IsFull(ts int64) bool - // IsNewSegment 新切片, 还未写数据 + // IsNewSegment 当前切片是否还未写数据 IsNewSegment() bool // Reserve 从当前切片中预留指定长度数据 - Reserve(number int) + Reserve(length int) - // ReadSegmentsFromKeyFrameIndex 从最近的关键帧读取切片 + // ReadSegmentsFromKeyFrameIndex 返回最近的关键帧切片 ReadSegmentsFromKeyFrameIndex(cb func([]byte)) Capacity() int @@ -36,6 +37,7 @@ type mwBlock struct { keyVideo bool buffer collections.MemoryPool completed bool + Time int64 } type mergeWritingBuffer struct { @@ -45,7 +47,7 @@ type mergeWritingBuffer struct { startTS int64 // 当前切片的开始时间 duration int // 当前切片时长 - lastKeyFrameIndex int // 最新关键帧所在切片的索引 + lastKeyFrameIndex int // 最近的关键帧所在切片的索引 keyFrameCount int // 关键帧计数 existVideo bool // 是否存在视频 @@ -55,9 +57,9 @@ type mergeWritingBuffer struct { func (m *mergeWritingBuffer) createMWBlock(videoKey bool) mwBlock { if videoKey { - return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength), false} + return mwBlock{true, videoKey, collections.NewDirectMemoryPool(m.keyFrameBufferMaxLength), false, 0} } else { - return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength), false} + return mwBlock{true, false, collections.NewDirectMemoryPool(m.nonKeyFrameBufferMaxLength), false, 0} } } @@ -77,17 +79,18 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte utils.Assert(ts != -1) - //新的切片 + // 新的切片 if m.startTS == -1 { m.startTS = ts if m.mwBlocks[m.index].buffer == nil { - //创建内存块 + // 创建内存块 m.mwBlocks[m.index] = m.createMWBlock(videoKey) } else { - //循环使用 + // 循环使用 m.mwBlocks[m.index].buffer.Clear() + // 关键帧被覆盖, 减少计数 if m.mwBlocks[m.index].keyVideo { m.keyFrameCount-- } @@ -96,14 +99,15 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte m.mwBlocks[m.index].free = false m.mwBlocks[m.index].completed = false m.mwBlocks[m.index].keyVideo = videoKey + m.mwBlocks[m.index].Time = ts } if videoKey { - //请务必确保关键帧帧从新的切片开始 - //外部遇到关键帧请先调用FlushSegment + // 请务必确保关键帧帧从新的切片开始 + // 外部遇到关键帧请先调用FlushSegment utils.Assert(m.mwBlocks[m.index].buffer.IsEmpty()) - m.lastKeyFrameIndex = m.index - m.keyFrameCount++ + //m.lastKeyFrameIndex = m.index + //m.keyFrameCount++ } if ts < m.startTS { @@ -114,49 +118,61 @@ func (m *mergeWritingBuffer) Allocate(size int, ts int64, videoKey bool) []byte return m.mwBlocks[m.index].buffer.Allocate(size) } -func (m *mergeWritingBuffer) FlushSegment() []byte { +func (m *mergeWritingBuffer) FlushSegment() ([]byte, bool) { if !AppConfig.GOPCache || !m.existVideo { - return nil + return nil, false } else if m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free { - return nil + return nil, false } data, _ := m.mwBlocks[m.index].buffer.Data() if len(data) == 0 { - return nil + return nil, false } - //更新缓冲长度 + key := m.mwBlocks[m.index].keyVideo + if key { + m.lastKeyFrameIndex = m.index + m.keyFrameCount++ + } + + // 计算最大切片数据长度,后续创建新切片按照最大长度分配内存空间 if m.lastKeyFrameIndex == m.index && m.keyFrameBufferMaxLength < len(data) { m.keyFrameBufferMaxLength = len(data) * 3 / 2 } else if m.lastKeyFrameIndex != m.index && m.nonKeyFrameBufferMaxLength < len(data) { m.nonKeyFrameBufferMaxLength = len(data) * 3 / 2 } + // 设置当前切片的完整性 + m.mwBlocks[m.index].completed = true + + // 分配下一个切片 capacity := cap(m.mwBlocks) if m.index+1 == capacity && m.keyFrameCount == 1 { m.grow() + capacity = cap(m.mwBlocks) } + // 计算下一个切片索引 m.index = (m.index + 1) % capacity - m.mwBlocks[m.index].completed = true + // 清空下一个切片的标记 m.startTS = -1 m.duration = 0 m.mwBlocks[m.index].free = true m.mwBlocks[m.index].completed = false - return data + return data, key } -func (m *mergeWritingBuffer) PeekCompletedSegment() []byte { +func (m *mergeWritingBuffer) PeekCompletedSegment() ([]byte, bool) { if !AppConfig.GOPCache || !m.existVideo { data, _ := m.mwBlocks[0].buffer.Data() m.mwBlocks[0].buffer.Clear() - return data + return data, false } if m.duration < AppConfig.MergeWriteLatency { - return nil + return nil, false } return m.FlushSegment() @@ -174,10 +190,10 @@ func (m *mergeWritingBuffer) IsNewSegment() bool { return m.mwBlocks[m.index].buffer == nil || m.mwBlocks[m.index].free } -func (m *mergeWritingBuffer) Reserve(number int) { +func (m *mergeWritingBuffer) Reserve(length int) { utils.Assert(m.mwBlocks[m.index].buffer != nil) - _ = m.mwBlocks[m.index].buffer.Allocate(number) + _ = m.mwBlocks[m.index].buffer.Allocate(length) } func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { @@ -185,18 +201,25 @@ func (m *mergeWritingBuffer) ReadSegmentsFromKeyFrameIndex(cb func([]byte)) { return } - for i := m.lastKeyFrameIndex; i < cap(m.mwBlocks); i++ { - if m.mwBlocks[i].buffer == nil || !m.mwBlocks[i].completed { - continue - } + ranges := [2][2]int{{-1, -1}, {-1, -1}} + if m.lastKeyFrameIndex <= m.index { + ranges[0][0] = m.lastKeyFrameIndex + ranges[0][1] = m.index + 1 + } else { + // 回环, 先遍历后面和前面的数据 + ranges[0][0] = m.lastKeyFrameIndex + ranges[0][1] = cap(m.mwBlocks) - data, _ := m.mwBlocks[i].buffer.Data() - cb(data) + ranges[1][0] = 0 + ranges[1][1] = m.index + 1 } - //回调循环使用的头部数据 - if m.index < m.lastKeyFrameIndex { - for i := 0; i < m.index; i++ { + for _, index := range ranges { + for i := index[0]; i > -1 && i < index[1]; i++ { + if m.mwBlocks[i].buffer == nil || !m.mwBlocks[i].completed { + break + } + data, _ := m.mwBlocks[i].buffer.Data() cb(data) } @@ -217,7 +240,7 @@ func NewMergeWritingBuffer(existVideo bool) MergeWritingBuffer { } if !existVideo || !AppConfig.GOPCache { - blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100), false} + blocks[0] = mwBlock{true, false, collections.NewDirectMemoryPool(1024 * 100), false, 0} } return &mergeWritingBuffer{ diff --git a/stream/source.go b/stream/source.go index 4081f28..4f834ee 100644 --- a/stream/source.go +++ b/stream/source.go @@ -300,7 +300,9 @@ func IsSupportMux(protocol TransStreamProtocol, audioCodecId, videoCodecId utils func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) { log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), s.ID) - transStream, err := CreateTransStream(s, protocol, tracks) + source := SourceManager.Find(s.ID) + utils.Assert(source != nil) + transStream, err := CreateTransStream(source, protocol, tracks) if err != nil { log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID) return nil, err @@ -313,7 +315,7 @@ func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStream } transStream.SetID(id) - transStream.SetTransStreamProtocol(protocol) + transStream.SetProtocol(protocol) // 创建输出流对应的拉流队列 s.TransStreamSinks[id] = make(map[SinkID]Sink, 128) @@ -461,6 +463,15 @@ func (s *PublishSource) doAddSink(sink Sink) bool { return true } + // 累加拉流计数 + if s.recordSink != sink { + s.sinkCount++ + log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) + } + + s.sinks[sink.GetID()] = sink + s.TransStreamSinks[transStreamId][sink.GetID()] = sink + // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. conn, ok := sink.GetConn().(*transport.Conn) if ok && sink.IsTCPStreaming() && transStream.OutStreamBufferCapacity() > 2 { @@ -478,15 +489,6 @@ func (s *PublishSource) doAddSink(sink Sink) bool { s.write(sink, 0, data, timestamp) } - // 累加拉流计数 - if s.recordSink != sink { - s.sinkCount++ - log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) - } - - s.sinks[sink.GetID()] = sink - s.TransStreamSinks[transStreamId][sink.GetID()] = sink - // 新建传输流,发送已经缓存的音视频帧 if !exist && AppConfig.GOPCache && s.existVideo { s.DispatchGOPBuffer(transStream) diff --git a/stream/trans_stream.go b/stream/trans_stream.go index b982279..c3e9e33 100644 --- a/stream/trans_stream.go +++ b/stream/trans_stream.go @@ -10,37 +10,42 @@ type TransStream interface { SetID(id TransStreamID) + // Input 封装传输流, 返回合并写块、时间戳、合并写块是否包含视频关键帧 Input(packet utils.AVPacket) ([][]byte, int64, bool, error) AddTrack(track *Track) error - TrackCount() int + TrackSize() int GetTracks() []*Track + // WriteHeader track添加完毕, 通过调用此函数告知 WriteHeader() error // GetProtocol 返回输出流协议 GetProtocol() TransStreamProtocol - // ReadExtraData 获取封装后的编码器扩展数据 + SetProtocol(protocol TransStreamProtocol) + + // ReadExtraData 读取传输流的编码器扩展数据 ReadExtraData(timestamp int64) ([][]byte, int64, error) - // ReadKeyFrameBuffer 读取已经缓存的包含关键视频帧的输出流 + // ReadKeyFrameBuffer 读取最近的包含视频关键帧的合并写队列 ReadKeyFrameBuffer() ([][]byte, int64, error) + // Close 关闭传输流, 返回还未flush的合并写块 Close() ([][]byte, int64, error) + // ClearOutStreamBuffer 清空传输流的合并写块队列 ClearOutStreamBuffer() + // AppendOutStreamBuffer 添加合并写块到队列 AppendOutStreamBuffer(buffer []byte) - // OutStreamBufferCapacity 返回输出流缓冲区的容量大小, 输出流缓冲区同时作为向sink推流的发送缓冲区, 容量大小决定向sink异步推流的队列大小; + // OutStreamBufferCapacity 返回合并写块队列容量大小, 作为sink异步推流的队列大小; OutStreamBufferCapacity() int IsExistVideo() bool - - SetTransStreamProtocol(protocol TransStreamProtocol) } type BaseTransStream struct { @@ -50,8 +55,8 @@ type BaseTransStream struct { ExistVideo bool Protocol TransStreamProtocol - OutBuffer [][]byte // 输出流的返回队列 - OutBufferSize int + OutBuffer [][]byte // 传输流的合并写块队列 + OutBufferSize int // 传输流返合并写块队列大小 } func (t *BaseTransStream) GetID() TransStreamID { @@ -82,6 +87,10 @@ func (t *BaseTransStream) GetProtocol() TransStreamProtocol { return t.Protocol } +func (t *BaseTransStream) SetProtocol(protocol TransStreamProtocol) { + t.Protocol = protocol +} + func (t *BaseTransStream) ClearOutStreamBuffer() { t.OutBufferSize = 0 } @@ -106,7 +115,7 @@ func (t *BaseTransStream) OutStreamBufferCapacity() int { return 0 } -func (t *BaseTransStream) TrackCount() int { +func (t *BaseTransStream) TrackSize() int { return len(t.Tracks) } @@ -126,10 +135,6 @@ func (t *BaseTransStream) ReadKeyFrameBuffer() ([][]byte, int64, error) { return nil, 0, nil } -func (t *BaseTransStream) SetTransStreamProtocol(protocol TransStreamProtocol) { - t.Protocol = protocol -} - type TCPTransStream struct { BaseTransStream