diff --git a/api.go b/api.go index 1a1a30a..0c79404 100644 --- a/api.go +++ b/api.go @@ -404,7 +404,7 @@ func (api *ApiServer) OnSourceList(w http.ResponseWriter, r *http.Request) { ID: source.GetID(), Protocol: source.GetType().String(), Time: source.CreateTime(), - SinkCount: source.SinkCount(), + SinkCount: source.GetTransStreamPublisher().SinkCount(), Bitrate: strconv.Itoa(source.GetBitrateStatistics().PreviousSecond()/1024) + "KBS", // 后续开发 Tracks: codecs, Urls: stream.GetStreamPlayUrls(source.GetID()), @@ -430,7 +430,7 @@ func (api *ApiServer) OnSinkList(v *IDS, w http.ResponseWriter, r *http.Request) } var details []SinkDetails - sinks := source.Sinks() + sinks := source.GetTransStreamPublisher().Sinks() for _, sink := range sinks { details = append(details, SinkDetails{ @@ -468,7 +468,7 @@ func (api *ApiServer) OnSinkClose(v *IDS, w http.ResponseWriter, r *http.Request } if source := stream.SourceManager.Find(v.Source); source != nil { - if sink := source.FindSink(sinkId); sink != nil { + if sink := source.GetTransStreamPublisher().FindSink(sinkId); sink != nil { sink.Close() } } else { diff --git a/bridge.go b/bridge.go index 4bc3491..3d15cfd 100644 --- a/bridge.go +++ b/bridge.go @@ -10,16 +10,13 @@ import ( // 处理不同包不能相互引用的需求 -func NewStreamEndInfo(source stream.Source) *stream.StreamEndInfo { - tracks := source.OriginTracks() - streams := source.GetTransStreams() - +func NewStreamEndInfo(source string, tracks []*stream.Track, streams map[stream.TransStreamID]stream.TransStream) *stream.StreamEndInfo { if len(tracks) < 1 || len(streams) < 1 { return nil } info := stream.StreamEndInfo{ - ID: source.GetID(), + ID: source, Timestamps: make(map[utils.AVCodecID][2]int64, len(tracks)), } diff --git a/flv/http_flv.go b/flv/http_flv.go index 95f60ad..49a728b 100644 --- a/flv/http_flv.go +++ b/flv/http_flv.go @@ -185,7 +185,7 @@ func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtoco var prevTagSize uint32 var metaData *amf0.Object - endInfo := source.GetStreamEndInfo() + endInfo := source.GetTransStreamPublisher().GetStreamEndInfo() if endInfo != nil { prevTagSize = endInfo.FLVPrevTagSize } diff --git a/gb28181/source.go b/gb28181/source.go index d80c9bd..551e00c 100644 --- a/gb28181/source.go +++ b/gb28181/source.go @@ -1,6 +1,7 @@ package gb28181 import ( + "encoding/binary" "fmt" "github.com/lkmio/avformat" "github.com/lkmio/avformat/utils" @@ -97,6 +98,8 @@ type BaseGBSource struct { audioPacketCreatedTime int64 videoPacketCreatedTime int64 isSystemClock bool // 推流时间戳不正确, 是否使用系统时间. + lastRtpTimestamp int64 + sameTimePackets [][]byte } func (source *BaseGBSource) Init(receiveQueueSize int) { @@ -108,19 +111,40 @@ func (source *BaseGBSource) Init(receiveQueueSize int) { source.SetType(stream.SourceType28181) source.probeBuffer = mpeg.NewProbeBuffer(PsProbeBufferSize) source.PublishSource.Init(receiveQueueSize) + source.lastRtpTimestamp = -1 } // Input 输入rtp包, 处理PS流, 负责解析->封装->推流 func (source *BaseGBSource) Input(data []byte) error { - // 国标级联转发 - if source.ForwardTransStream != nil { - packet := avformat.AVPacket{Data: data} - source.DispatchPacket(source.ForwardTransStream, &packet) - } - packet := rtp.Packet{} _ = packet.Unmarshal(data) + // 国标级联转发 + if source.GetTransStreamPublisher().GetTransStreams() != nil { + if source.lastRtpTimestamp == -1 { + source.lastRtpTimestamp = int64(packet.Timestamp) + } + + // 相同时间戳的RTP包, 积攒一起发送, 降低管道压力 + length := len(data) + if int64(packet.Timestamp) != source.lastRtpTimestamp { + source.lastRtpTimestamp = int64(packet.Timestamp) + if len(source.sameTimePackets) > 0 { + source.GetTransStreamPublisher().Post(&stream.StreamEvent{Type: stream.StreamEventTypeRawPacket, Data: source.sameTimePackets}) + source.sameTimePackets = nil + } + } + + if stream.UDPReceiveBufferSize-2 < length { + log.Sugar.Errorf("rtp包过大, 不转发. source: %s ssrc: %x size: %d", source.ID, source.ssrc, len(data)) + } else { + bytes := stream.UDPReceiveBufferPool.Get().([]byte) + copy(bytes[2:], data) + binary.BigEndian.PutUint16(bytes[:2], uint16(length)) + source.sameTimePackets = append(source.sameTimePackets, bytes[:2+length]) + } + } + var bytes []byte var n int var err error diff --git a/gb28181/tcp_session.go b/gb28181/tcp_session.go index 7497851..3a1b80c 100644 --- a/gb28181/tcp_session.go +++ b/gb28181/tcp_session.go @@ -36,6 +36,9 @@ func DecodeGBRTPOverTCPPacket(data []byte, source GBSource, decoder *transport.L } i += n + if bytes == nil { + break + } // 单端口模式,ssrc匹配source if source == nil || stream.SessionStateHandshakeSuccess == source.State() { diff --git a/hls/hls_stream.go b/hls/hls_stream.go index b7fd43b..9bf5303 100644 --- a/hls/hls_stream.go +++ b/hls/hls_stream.go @@ -299,7 +299,7 @@ func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtoco var playlistFormat *string startSeq := -1 - endInfo := source.GetStreamEndInfo() + endInfo := source.GetTransStreamPublisher().GetStreamEndInfo() if endInfo != nil && endInfo.M3U8Writer != nil { writer = endInfo.M3U8Writer playlistFormat = endInfo.PlaylistFormat diff --git a/rtsp/rtsp_handler.go b/rtsp/rtsp_handler.go index 23452f1..6400722 100644 --- a/rtsp/rtsp_handler.go +++ b/rtsp/rtsp_handler.go @@ -1,6 +1,5 @@ package rtsp - import ( "fmt" "github.com/lkmio/avformat/utils" @@ -234,7 +233,7 @@ func (h handler) OnPlay(request Request) (*http.Response, []byte, error) { return nil, nil, fmt.Errorf("Source with ID %s does not exist.", request.sourceId) } - source.AddSink(sink) + source.GetTransStreamPublisher().AddSink(sink) return response, nil, nil } diff --git a/rtsp/rtsp_stream.go b/rtsp/rtsp_stream.go index 8b57822..1bbb7d0 100644 --- a/rtsp/rtsp_stream.go +++ b/rtsp/rtsp_stream.go @@ -292,7 +292,7 @@ func NewTransStream(addr net.IPAddr, urlFormat string, oldTracks map[byte]uint16 func TransStreamFactory(source stream.Source, protocol stream.TransStreamProtocol, tracks []*stream.Track) (stream.TransStream, error) { trackFormat := "?track=%d" var oldTracks map[byte]uint16 - if endInfo := source.GetStreamEndInfo(); endInfo != nil { + if endInfo := source.GetTransStreamPublisher().GetStreamEndInfo(); endInfo != nil { oldTracks = endInfo.RtspTracks } diff --git a/stream/forward_sink.go b/stream/forward_sink.go index e9876fb..5c6d95c 100644 --- a/stream/forward_sink.go +++ b/stream/forward_sink.go @@ -43,7 +43,7 @@ func (f *ForwardSink) OnConnected(conn net.Conn) []byte { // 如果f.Conn赋值后, 发送数据先于EnableAsyncWriteMode执行, 可能会panic // 所以保险一点, 放在主协程执行 - ExecuteSyncEventOnSource(f.SourceID, func() { + ExecuteSyncEventOnTransStreamPublisher(f.SourceID, func() { f.Conn = conn f.BaseSink.EnableAsyncWriteMode(512) }) diff --git a/stream/gop_buffer.go b/stream/gop_buffer.go index ec0d363..aff3f2d 100644 --- a/stream/gop_buffer.go +++ b/stream/gop_buffer.go @@ -10,30 +10,30 @@ import ( type GOPBuffer interface { // AddPacket Return bool 缓存帧是否成功, 如果首帧非关键帧, 缓存失败 - AddPacket(packet *avformat.AVPacket) bool + AddPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) bool - PeekAll(handler func(packet *avformat.AVPacket)) + PeekAll(handler func(*collections.ReferenceCounter[*avformat.AVPacket])) - Peek(index int) *avformat.AVPacket + Peek(index int) *collections.ReferenceCounter[*avformat.AVPacket] - PopAll(handler func(packet *avformat.AVPacket)) + PopAll(handler func(*collections.ReferenceCounter[*avformat.AVPacket])) - RequiresClear(nextPacket *avformat.AVPacket) bool + RequiresClear(nextPacket *collections.ReferenceCounter[*avformat.AVPacket]) bool Size() int } type streamBuffer struct { - buffer collections.RingBuffer[*avformat.AVPacket] + buffer collections.RingBuffer[*collections.ReferenceCounter[*avformat.AVPacket]] hasVideoKeyFrame bool } -func (s *streamBuffer) AddPacket(packet *avformat.AVPacket) bool { - if utils.AVMediaTypeVideo == packet.MediaType { - if packet.Key { +func (s *streamBuffer) AddPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) bool { + if utils.AVMediaTypeVideo == packet.Get().MediaType { + if packet.Get().Key { s.hasVideoKeyFrame = true } else if !s.hasVideoKeyFrame { - // 丢弃首帧视频非关键帧 + // 丢弃首帧非关键视频帧 return false } } @@ -42,7 +42,7 @@ func (s *streamBuffer) AddPacket(packet *avformat.AVPacket) bool { return true } -func (s *streamBuffer) Peek(index int) *avformat.AVPacket { +func (s *streamBuffer) Peek(index int) *collections.ReferenceCounter[*avformat.AVPacket] { utils.Assert(index < s.buffer.Size()) head, tail := s.buffer.Data() @@ -53,7 +53,7 @@ func (s *streamBuffer) Peek(index int) *avformat.AVPacket { } } -func (s *streamBuffer) PeekAll(handler func(packet *avformat.AVPacket)) { +func (s *streamBuffer) PeekAll(handler func(packet *collections.ReferenceCounter[*avformat.AVPacket])) { head, tail := s.buffer.Data() if head != nil { @@ -73,7 +73,7 @@ func (s *streamBuffer) Size() int { return s.buffer.Size() } -func (s *streamBuffer) PopAll(handler func(packet *avformat.AVPacket)) { +func (s *streamBuffer) PopAll(handler func(packet *collections.ReferenceCounter[*avformat.AVPacket])) { for s.buffer.Size() > 0 { pkt := s.buffer.Pop() handler(pkt) @@ -82,10 +82,10 @@ func (s *streamBuffer) PopAll(handler func(packet *avformat.AVPacket)) { s.hasVideoKeyFrame = false } -func (s *streamBuffer) RequiresClear(nextPacket *avformat.AVPacket) bool { - return s.Size()+1 == s.buffer.Capacity() || (s.hasVideoKeyFrame && utils.AVMediaTypeVideo == nextPacket.MediaType && nextPacket.Key) +func (s *streamBuffer) RequiresClear(nextPacket *collections.ReferenceCounter[*avformat.AVPacket]) bool { + return s.Size()+1 == s.buffer.Capacity() || (s.hasVideoKeyFrame && utils.AVMediaTypeVideo == nextPacket.Get().MediaType && nextPacket.Get().Key) } func NewStreamBuffer() GOPBuffer { - return &streamBuffer{buffer: collections.NewRingBuffer[*avformat.AVPacket](1000), hasVideoKeyFrame: false} + return &streamBuffer{buffer: collections.NewRingBuffer[*collections.ReferenceCounter[*avformat.AVPacket]](1000), hasVideoKeyFrame: false} } diff --git a/stream/hook_sink.go b/stream/hook_sink.go index f4e9aa9..84f06b4 100644 --- a/stream/hook_sink.go +++ b/stream/hook_sink.go @@ -42,7 +42,7 @@ func PreparePlaySinkWithReady(sink Sink, ok bool) (*http.Response, utils.HookSta } } } else { - source.AddSink(sink) + source.GetTransStreamPublisher().AddSink(sink) } return response, utils.HookStateOK diff --git a/stream/hook_source.go b/stream/hook_source.go index 82d68bc..b5f9a0a 100644 --- a/stream/hook_source.go +++ b/stream/hook_source.go @@ -11,19 +11,20 @@ import ( func PreparePublishSource(source Source, hook bool) (*http.Response, utils.HookState) { var response *http.Response + if err := SourceManager.Add(source); err != nil { + return nil, utils.HookStateOccupy + } + if hook && AppConfig.Hooks.IsEnablePublishEvent() { rep, state := HookPublishEvent(source) if utils.HookStateOK != state { + _, _ = SourceManager.Remove(source.GetID()) return rep, state } response = rep } - if err := SourceManager.Add(source); err != nil { - return nil, utils.HookStateOccupy - } - source.SetCreateTime(time.Now()) urls := GetStreamPlayUrls(source.GetID()) diff --git a/stream/nonblock_channel.go b/stream/nonblock_channel.go new file mode 100644 index 0000000..d811add --- /dev/null +++ b/stream/nonblock_channel.go @@ -0,0 +1,54 @@ +package stream + +import "github.com/lkmio/avformat/collections" + +type NonBlockingChannel[T any] struct { + Channel chan T + PendingQueue *collections.LinkedList[T] + zero T +} + +func (p *NonBlockingChannel[T]) Post(event T) { + for oldSize := 0; p.PendingQueue.Size() > 0 && p.PendingQueue.Size() != oldSize; { + oldSize = p.PendingQueue.Size() + select { + case p.Channel <- p.PendingQueue.Get(0): + p.PendingQueue.Remove(0) + default: + break + } + } + + if p.PendingQueue.Size() != 0 { + p.PendingQueue.Add(event) + } else { + select { + case p.Channel <- event: + break + default: + p.PendingQueue.Add(event) + } + } +} + +func (p *NonBlockingChannel[T]) Pop() T { + if len(p.Channel) > 0 { + select { + case event := <-p.Channel: + return event + } + } + + if p.PendingQueue.Size() > 0 { + return p.PendingQueue.Remove(0) + } + + return p.zero +} + +func NewNonBlockingChannel[T any](size int) *NonBlockingChannel[T] { + return &NonBlockingChannel[T]{ + Channel: make(chan T, size), + PendingQueue: &collections.LinkedList[T]{}, + } +} diff --git a/stream/sink.go b/stream/sink.go index e535af5..ae2d678 100644 --- a/stream/sink.go +++ b/stream/sink.go @@ -97,8 +97,6 @@ type Sink interface { // EnableAsyncWriteMode 开启异步发送 EnableAsyncWriteMode(queueSize int) - - PendingSendQueueSize() int } type BaseSink struct { @@ -124,9 +122,7 @@ type BaseSink struct { totalDataSize atomic.Uint64 writtenDataSize atomic.Uint64 lastKeyVideoDataSegment *collections.ReferenceCounter[[]byte] - - pendingSendQueue chan *collections.ReferenceCounter[[]byte] // 等待发送的数据队列 - blockedBufferList *collections.LinkedList[*collections.ReferenceCounter[[]byte]] // 异步队列阻塞后的切片数据 + pendingSendQueue *NonBlockingChannel[*collections.ReferenceCounter[[]byte]] cancelFunc func() cancelCtx context.Context @@ -148,8 +144,8 @@ func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte firstSegment.Release() s.writtenDataSize.Add(uint64(len(firstSegment.Get()))) - for len(s.pendingSendQueue) > 0 { - buffer := <-s.pendingSendQueue + for len(s.pendingSendQueue.Channel) > 0 { + buffer := <-s.pendingSendQueue.Channel // 不存在视频, 清空队列 // 还没有追到最近的关键帧, 继续追帧 if s.lastKeyVideoDataSegment == nil || buffer != s.lastKeyVideoDataSegment { @@ -173,16 +169,9 @@ func (s *BaseSink) fastForward(firstSegment *collections.ReferenceCounter[[]byte func (s *BaseSink) doAsyncWrite() { defer func() { // 释放未发送的数据 - for len(s.pendingSendQueue) > 0 { - buffer := <-s.pendingSendQueue + for buffer := s.pendingSendQueue.Pop(); buffer != nil; buffer = s.pendingSendQueue.Pop() { buffer.Release() } - - for s.blockedBufferList.Size() > 0 { - buffer := s.blockedBufferList.Remove(0) - buffer.Release() - } - ReleasePendingBuffers(s.SourceID, s.TransStreamID) }() @@ -191,7 +180,7 @@ func (s *BaseSink) doAsyncWrite() { select { case <-s.cancelCtx.Done(): return - case data := <-s.pendingSendQueue: + case data := <-s.pendingSendQueue.Channel: // 追帧到最近的关键帧 if fastForward { var ok bool @@ -245,8 +234,7 @@ func (s *BaseSink) doAsyncWrite() { func (s *BaseSink) EnableAsyncWriteMode(queueSize int) { utils.Assert(s.Conn != nil) - s.pendingSendQueue = make(chan *collections.ReferenceCounter[[]byte], queueSize) - s.blockedBufferList = &collections.LinkedList[*collections.ReferenceCounter[[]byte]]{} + s.pendingSendQueue = NewNonBlockingChannel[*collections.ReferenceCounter[[]byte]](queueSize) s.cancelCtx, s.cancelFunc = context.WithCancel(context.Background()) go s.doAsyncWrite() } @@ -265,50 +253,23 @@ func (s *BaseSink) Write(index int, data []*collections.ReferenceCounter[[]byte] s.totalDataSize.Add(uint64(len(datum.Get()))) } - // 发送被阻塞的数据 - for s.blockedBufferList.Size() > 0 { - bytes := s.blockedBufferList.Get(0) - select { - case s.pendingSendQueue <- bytes: - s.blockedBufferList.Remove(0) - break - default: - // 发送被阻塞的数据失败, 将本次发送的数据加入阻塞队列 - for _, datum := range data { - s.blockedBufferList.Add(datum) - datum.Refer() - } - return nil - } - } - - for _, bytes := range data { - if s.cancelCtx != nil { - bytes.Refer() - select { - case s.pendingSendQueue <- bytes: - break - default: - // 将本次发送的数据加入阻塞队列 - s.blockedBufferList.Add(bytes) - //return transport.ZeroWindowSizeError{} - return nil - } - } else { - _, err := s.Conn.Write(bytes.Get()) + if s.cancelCtx == nil { + for _, datum := range data { + _, err := s.Conn.Write(datum.Get()) if err != nil { return err } } + } else { + for _, datum := range data { + datum.Refer() + s.pendingSendQueue.Post(datum) + } } return nil } -func (s *BaseSink) PendingSendQueueSize() int { - return len(s.pendingSendQueue) -} - func (s *BaseSink) GetSourceID() string { return s.SourceID } @@ -392,7 +353,7 @@ func (s *BaseSink) Close() { } else if s.State == SessionStateTransferring { // 从source中删除sink, 如果source为nil, 已经结束推流. if source := SourceManager.Find(s.SourceID); source != nil { - source.RemoveSink(s) + source.GetTransStreamPublisher().RemoveSink(s) } } else if s.State == SessionStateWaiting { // 从等待队列中删除sink diff --git a/stream/sink_utils.go b/stream/sink_utils.go index 42ff28b..159d49e 100644 --- a/stream/sink_utils.go +++ b/stream/sink_utils.go @@ -54,10 +54,10 @@ func CreateSinkDisconnectionMessage(sink Sink) string { return fmt.Sprintf("%s sink断开连接. id: %s", sink.GetProtocol(), sink.GetID()) } -func ExecuteSyncEventOnSource(sourceId string, event func()) bool { +func ExecuteSyncEventOnTransStreamPublisher(sourceId string, event func()) bool { source := SourceManager.Find(sourceId) if source != nil { - source.ExecuteSyncEvent(event) + source.GetTransStreamPublisher().ExecuteSyncEvent(event) return true } diff --git a/stream/source.go b/stream/source.go index 02de79e..7176341 100644 --- a/stream/source.go +++ b/stream/source.go @@ -5,7 +5,6 @@ import ( "github.com/lkmio/avformat" "github.com/lkmio/avformat/collections" "github.com/lkmio/lkm/log" - "github.com/lkmio/transport" "net" "net/url" "strings" @@ -14,11 +13,10 @@ import ( "time" "github.com/lkmio/avformat/utils" - "github.com/lkmio/lkm/transcode" ) var ( - StreamEndInfoBride func(s Source) *StreamEndInfo + StreamEndInfoBride func(source string, tracks []*Track, streams map[TransStreamID]TransStream) *StreamEndInfo ) // Source 对推流源的封装 @@ -39,20 +37,6 @@ type Source interface { // OriginTracks 返回所有的推流track OriginTracks() []*Track - // TranscodeTracks 返回所有的转码track - TranscodeTracks() []*Track - - // AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader,先将Sink添加到等待队列. - // 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink - AddSink(sink Sink) - - // RemoveSink 同步删除Sink - RemoveSink(sink Sink) - - RemoveSinkWithID(id SinkID) - - FindSink(id SinkID) Sink - SetState(state SessionState) // Close 关闭Source @@ -78,21 +62,15 @@ type Source interface { SetUrlValues(values url.Values) // PostEvent 切换到主协程执行当前函数 - PostEvent(cb func()) + postEvent(cb func()) - ExecuteSyncEvent(cb func()) + executeSyncEvent(cb func()) // LastPacketTime 返回最近收流时间戳 LastPacketTime() time.Time SetLastPacketTime(time2 time.Time) - // SinkCount 返回拉流计数 - SinkCount() int - - // LastStreamEndTime 返回最近结束拉流时间戳 - LastStreamEndTime() time.Time - IsClosed() bool StreamPipe() chan []byte @@ -103,15 +81,11 @@ type Source interface { SetCreateTime(time time.Time) - Sinks() []Sink - GetBitrateStatistics() *BitrateStatistics - GetTransStreams() map[TransStreamID]TransStream - - GetStreamEndInfo() *StreamEndInfo - ProbeTimeout() + + GetTransStreamPublisher() TransStreamPublisher } type PublishSource struct { @@ -120,38 +94,22 @@ type PublishSource struct { state SessionState Conn net.Conn - TransDemuxer avformat.Demuxer // 负责从推流协议中解析出AVStream和AVPacket - recordSink Sink // 每个Source的录制流 - recordFilePath string // 录制流文件路径 - hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟. - audioTranscoders []transcode.Transcoder // 音频解码器 - videoTranscoders []transcode.Transcoder // 视频解码器 - originTracks TrackManager // 推流的音视频Streams - allStreamTracks TrackManager // 推流Streams+转码器获得的Stream - gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop. 如果不存在视频流, 不缓存音频 + streamPipe *NonBlockingChannel[[]byte] // 推流数据管道 + mainContextEvents chan func() // 切换到主协程执行函数的事件管道 + streamPublisher TransStreamPublisher // 解析出来的AVStream和AVPacket, 交由streamPublisher处理 - closed atomic.Bool // source是否已经关闭 - completed atomic.Bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true + TransDemuxer avformat.Demuxer // 负责从推流协议中解析出AVStream和AVPacket + originTracks TrackManager // 推流的音视频Streams + + closed atomic.Bool // 是否已经关闭 + completed atomic.Bool // 推流track是否解析完毕, @see writeHeader 函数中赋值为true existVideo bool // 是否存在视频 - TransStreams map[TransStreamID]TransStream // 所有输出流 - ForwardTransStream TransStream // 转发流 - sinks map[SinkID]Sink // 保存所有Sink - TransStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink - streamEndInfo *StreamEndInfo // 之前推流源信息 - accumulateTimestamps bool // 是否累加时间戳 - timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳 - - streamPipe chan []byte // 推流数据管道 - mainContextEvents chan func() // 切换到主协程执行函数的事件管道 - - lastPacketTime time.Time // 最近收到推流包的时间 - lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 - sinkCount int // 拉流端计数 - urlValues url.Values // 推流url携带的参数 - createTime time.Time // source创建时间 - statistics *BitrateStatistics // 码流统计 - streamLogger avformat.OnUnpackStream2FileHandler + lastPacketTime time.Time // 最近收到推流包的时间 + urlValues url.Values // 推流url携带的参数 + createTime time.Time // source创建时间 + statistics *BitrateStatistics // 码流统计 + streamLogger avformat.OnUnpackStream2FileHandler } func (s *PublishSource) SetLastPacketTime(time2 time.Time) { @@ -163,31 +121,26 @@ func (s *PublishSource) IsClosed() bool { } func (s *PublishSource) StreamPipe() chan []byte { - return s.streamPipe + return s.streamPipe.Channel } func (s *PublishSource) MainContextEvents() chan func() { return s.mainContextEvents } -func (s *PublishSource) LastStreamEndTime() time.Time { - return s.lastStreamEndTime -} - func (s *PublishSource) LastPacketTime() time.Time { return s.lastPacketTime } -func (s *PublishSource) SinkCount() int { - return s.sinkCount -} - func (s *PublishSource) GetID() string { return s.ID } func (s *PublishSource) SetID(id string) { s.ID = id + if s.streamPublisher != nil { + s.streamPublisher.SetSourceID(id) + } } func (s *PublishSource) Init(receiveQueueSize int) { @@ -195,52 +148,16 @@ func (s *PublishSource) Init(receiveQueueSize int) { // 初始化事件接收管道 // -2是为了保证从管道取到流, 到处理完流整个过程安全的, 不会被覆盖 - s.streamPipe = make(chan []byte, receiveQueueSize-2) + s.streamPipe = NewNonBlockingChannel[[]byte](receiveQueueSize - 1) s.mainContextEvents = make(chan func(), 128) - - s.TransStreams = make(map[TransStreamID]TransStream, 10) - s.sinks = make(map[SinkID]Sink, 128) - s.TransStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) s.statistics = NewBitrateStatistics() + s.streamPublisher = NewTransStreamPublisher(s.ID) // 设置探测时长 s.TransDemuxer.SetProbeDuration(AppConfig.ProbeTimeout) } -func (s *PublishSource) CreateDefaultOutStreams() { - if s.TransStreams == nil { - s.TransStreams = make(map[TransStreamID]TransStream, 10) - } - - // 创建录制流 - if AppConfig.Record.Enable { - sink, path, err := CreateRecordStream(s.ID) - if err != nil { - log.Sugar.Errorf("创建录制sink失败 source:%s err:%s", s.ID, err.Error()) - } else { - s.recordSink = sink - s.recordFilePath = path - } - } - - // 创建HLS输出流 - if AppConfig.Hls.Enable { - streams := s.OriginTracks() - utils.Assert(len(streams) > 0) - - id := GenerateTransStreamID(TransStreamHls, streams...) - hlsStream, err := s.CreateTransStream(id, TransStreamHls, streams) - if err != nil { - panic(err) - } - - s.DispatchGOPBuffer(hlsStream) - s.hlsStream = hlsStream - s.TransStreams[id] = s.hlsStream - } -} - func (s *PublishSource) Input(data []byte) error { - s.streamPipe <- data + s.streamPipe.Post(data) s.statistics.Input(len(data)) return nil } @@ -249,300 +166,6 @@ func (s *PublishSource) OriginTracks() []*Track { return s.originTracks.All() } -func (s *PublishSource) TranscodeTracks() []*Track { - return s.allStreamTracks.All() -} - -func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool { - if TransStreamRtmp == protocol || TransStreamFlv == protocol { - - } - - return true -} - -func (s *PublishSource) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) { - log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), s.ID) - - source := SourceManager.Find(s.ID) - utils.Assert(source != nil) - transStream, err := CreateTransStream(source, protocol, tracks) - if err != nil { - log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), s.ID) - return nil, err - } - - for _, track := range tracks { - // 重新拷贝一个track,传输流内部使用track的时间戳, - newTrack := *track - if err = transStream.AddTrack(&newTrack); err != nil { - return nil, err - } - } - - transStream.SetID(id) - transStream.SetProtocol(protocol) - - // 创建输出流对应的拉流队列 - s.TransStreamSinks[id] = make(map[SinkID]Sink, 128) - _ = transStream.WriteHeader() - - // 设置转发流 - if TransStreamGBCascadedForward == transStream.GetProtocol() { - s.ForwardTransStream = transStream - } - - return transStream, err -} - -func (s *PublishSource) DispatchGOPBuffer(transStream TransStream) { - if s.gopBuffer != nil { - s.gopBuffer.PeekAll(func(packet *avformat.AVPacket) { - s.DispatchPacket(transStream, packet) - }) - } -} - -// DispatchPacket 分发AVPacket -func (s *PublishSource) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) { - data, timestamp, videoKey, err := transStream.Input(packet) - if err != nil || len(data) < 1 { - return - } - - s.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey) -} - -// DispatchBuffer 分发传输流 -func (s *PublishSource) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { - sinks := s.TransStreamSinks[transStream.GetID()] - exist := transStream.IsExistVideo() - - for _, sink := range sinks { - - if sink.GetSentPacketCount() < 1 { - // 如果存在视频, 确保向sink发送的第一帧是关键帧 - if exist && !keyVideo { - continue - } - - if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - if ok := s.write(sink, index, extraData, timestamp, false); !ok { - continue - } - } - } - - if ok := s.write(sink, index, data, timestamp, keyVideo); !ok { - continue - } - } -} - -func (s *PublishSource) pendingSink(sink Sink) { - log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), s.ID) - go sink.Close() -} - -// 向sink推流 -func (s *PublishSource) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) bool { - err := sink.Write(index, data, timestamp, keyVideo) - if err == nil { - sink.IncreaseSentPacketCount() - return true - } - - // 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. - // 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流. - if _, ok := err.(transport.ZeroWindowSizeError); ok { - s.pendingSink(sink) - } - - return false -} - -// 创建sink需要的输出流 -func (s *PublishSource) doAddSink(sink Sink, resume bool) bool { - // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 - audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() - audioTrack := s.originTracks.FindWithType(utils.AVMediaTypeAudio) - videoTrack := s.originTracks.FindWithType(utils.AVMediaTypeVideo) - - disableAudio := audioTrack == nil - disableVideo := videoTrack == nil || !sink.EnableVideo() - if disableAudio && disableVideo { - return false - } - - // 不支持对期望编码的流封装. 降级 - if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) { - audioCodecId = utils.AVCodecIdNONE - videoCodecId = utils.AVCodecIdNONE - } - - if !disableAudio && utils.AVCodecIdNONE == audioCodecId { - audioCodecId = audioTrack.Stream.CodecID - } - if !disableVideo && utils.AVCodecIdNONE == videoCodecId { - videoCodecId = videoTrack.Stream.CodecID - } - - // 创建音频转码器 - if !disableAudio && audioCodecId != audioTrack.Stream.CodecID { - utils.Assert(false) - } - - // 创建视频转码器 - if !disableVideo && videoCodecId != videoTrack.Stream.CodecID { - utils.Assert(false) - } - - // 查找传输流需要的所有track - var tracks []*Track - for _, track := range s.originTracks.All() { - if disableVideo && track.Stream.MediaType == utils.AVMediaTypeVideo { - continue - } - - tracks = append(tracks, track) - } - - transStreamId := GenerateTransStreamID(sink.GetProtocol(), tracks...) - transStream, exist := s.TransStreams[transStreamId] - if !exist { - var err error - transStream, err = s.CreateTransStream(transStreamId, sink.GetProtocol(), tracks) - if err != nil { - log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), s.ID) - return false - } - - s.TransStreams[transStreamId] = transStream - } - - sink.SetTransStreamID(transStreamId) - - { - sink.Lock() - defer sink.UnLock() - - if SessionStateClosed == sink.GetState() { - log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String()) - return false - } else { - sink.SetState(SessionStateTransferring) - } - } - - err := sink.StartStreaming(transStream) - if err != nil { - log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), s.ID) - return false - } - - // 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流 - if !sink.IsReady() { - return true - } - - // 累加拉流计数 - if !resume && s.recordSink != sink { - s.sinkCount++ - log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) - } - - s.sinks[sink.GetID()] = sink - s.TransStreamSinks[transStreamId][sink.GetID()] = sink - - // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. - _, ok := sink.GetConn().(*transport.Conn) - if ok && sink.IsTCPStreaming() { - sink.EnableAsyncWriteMode(24) - } - - // 发送已有的缓存数据 - // 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。 - data, timestamp, _ := transStream.ReadKeyFrameBuffer() - if len(data) > 0 { - if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { - s.write(sink, 0, extraData, timestamp, false) - } - - s.write(sink, 0, data, timestamp, true) - } - - // 新建传输流,发送已经缓存的音视频帧 - if !exist && AppConfig.GOPCache && s.existVideo && TransStreamGBCascadedForward != transStream.GetProtocol() { - s.DispatchGOPBuffer(transStream) - } - - return true -} - -func (s *PublishSource) AddSink(sink Sink) { - s.PostEvent(func() { - if !s.completed.Load() { - AddSinkToWaitingQueue(sink.GetSourceID(), sink) - } else { - if !s.doAddSink(sink, false) { - go sink.Close() - } - } - }) -} - -func (s *PublishSource) RemoveSink(sink Sink) { - s.ExecuteSyncEvent(func() { - s.doRemoveSink(sink) - }) -} - -func (s *PublishSource) RemoveSinkWithID(id SinkID) { - s.PostEvent(func() { - sink, ok := s.sinks[id] - if ok { - s.doRemoveSink(sink) - } - }) -} - -func (s *PublishSource) FindSink(id SinkID) Sink { - var result Sink - s.ExecuteSyncEvent(func() { - sink, ok := s.sinks[id] - if ok { - result = sink - } - }) - - return result -} - -func (s *PublishSource) cleanupSinkStreaming(sink Sink) { - transStreamSinks := s.TransStreamSinks[sink.GetTransStreamID()] - delete(transStreamSinks, sink.GetID()) - s.lastStreamEndTime = time.Now() - - if sink.GetProtocol() == TransStreamHls { - // 从HLS拉流队列删除Sink - _, _ = SinkManager.Remove(sink.GetID()) - } - - sink.StopStreaming(s.TransStreams[sink.GetTransStreamID()]) -} - -func (s *PublishSource) doRemoveSink(sink Sink) bool { - s.cleanupSinkStreaming(sink) - delete(s.sinks, sink.GetID()) - - s.sinkCount-- - log.Sugar.Infof("sink count: %d source: %s", s.sinkCount, s.ID) - utils.Assert(s.sinkCount > -1) - - HookPlayDoneEvent(sink) - return true -} - func (s *PublishSource) SetState(state SessionState) { s.state = state } @@ -556,24 +179,14 @@ func (s *PublishSource) DoClose() { s.closed.Store(true) - // 释放GOP缓存 - if s.gopBuffer != nil { - s.gopBuffer.PopAll(func(packet *avformat.AVPacket) { - s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) - }) - s.gopBuffer = nil - } - - // 关闭推流源的解复用器 + // 关闭推流源的解复用器, 不再接收数据 if s.TransDemuxer != nil { s.TransDemuxer.Close() s.TransDemuxer = nil } - // 关闭录制流 - if s.recordSink != nil { - s.recordSink.Close() - } + // 等传输流发布器关闭结束 + s.streamPublisher.close() // 释放解复用器 // 释放转码器 @@ -581,62 +194,10 @@ func (s *PublishSource) DoClose() { _, err := SourceManager.Remove(s.ID) if err != nil { // source不存在, 在创建source时, 未添加到manager中, 目前只有1078流会出现这种情况(tcp连接到端口, 没有推流或推流数据无效, 无法定位到手机号, 以至于无法执行PreparePublishSource函数), 将不再处理后续事情. - log.Sugar.Errorf("删除源失败 source:%s err:%s", s.ID, err.Error()) + log.Sugar.Errorf("删除源失败 source: %s err: %s", s.ID, err.Error()) return } - // 保留推流信息 - if s.sinkCount > 0 && len(s.originTracks.All()) > 0 { - sourceHistory := StreamEndInfoBride(s) - streamEndInfoManager.Add(sourceHistory) - } - - // 关闭所有输出流 - for _, transStream := range s.TransStreams { - // 发送剩余包 - data, ts, _ := transStream.Close() - if len(data) > 0 { - s.DispatchBuffer(transStream, -1, data, ts, true) - } - - // 如果是tcp传输流, 归还合并写缓冲区 - if !transStream.IsTCPStreaming() || transStream.GetMWBuffer() == nil { - continue - } else if buffers := transStream.GetMWBuffer().Close(); buffers != nil { - AddMWBuffersToPending(s.ID, transStream.GetID(), buffers) - } - } - - // 将所有sink添加到等待队列 - for _, sink := range s.sinks { - transStreamID := sink.GetTransStreamID() - sink.SetTransStreamID(0) - if s.recordSink == sink { - continue - } - - { - sink.Lock() - - if SessionStateClosed == sink.GetState() { - log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String()) - } else { - sink.SetState(SessionStateWaiting) - AddSinkToWaitingQueue(s.ID, sink) - } - - sink.UnLock() - } - - if SessionStateClosed != sink.GetState() { - sink.StopStreaming(s.TransStreams[transStreamID]) - } - } - - s.TransStreams = nil - s.sinks = nil - s.TransStreamSinks = nil - // 异步hook go func() { if s.Conn != nil { @@ -645,10 +206,6 @@ func (s *PublishSource) DoClose() { } HookPublishDoneEvent(s) - - if s.recordSink != nil { - HookRecordEvent(s, s.recordFilePath) - } }() } @@ -658,7 +215,7 @@ func (s *PublishSource) Close() { } // 同步执行, 确保close后, 主协程已经退出, 不会再处理任何推拉流、查询等任何事情. - s.ExecuteSyncEvent(func() { + s.executeSyncEvent(func() { s.DoClose() }) } @@ -672,46 +229,13 @@ func (s *PublishSource) writeHeader() { s.completed.Store(true) + s.streamPublisher.Post(&StreamEvent{StreamEventTypeTrackCompleted, nil}) + if len(s.originTracks.All()) == 0 { log.Sugar.Errorf("没有一路track, 删除source: %s", s.ID) s.DoClose() return } - - // 尝试恢复上次推流的会话 - if streamInfo := streamEndInfoManager.Remove(s.ID); streamInfo != nil && EqualsTracks(streamInfo, s.originTracks.All()) { - s.streamEndInfo = streamInfo - - // 恢复每路track的时间戳 - tracks := s.originTracks.All() - for _, track := range tracks { - timestamps := streamInfo.Timestamps[track.Stream.CodecID] - track.Dts = timestamps[0] - track.Pts = timestamps[1] - } - } - - // 纠正GOP中的时间戳 - if s.gopBuffer != nil && s.gopBuffer.Size() != 0 { - s.gopBuffer.PeekAll(func(packet *avformat.AVPacket) { - s.CorrectTimestamp(packet) - }) - } - - // 创建录制流和HLS - s.CreateDefaultOutStreams() - - // 将等待队列的sink添加到输出流队列 - sinks := PopWaitingSinks(s.ID) - if s.recordSink != nil { - sinks = append(sinks, s.recordSink) - } - - for _, sink := range sinks { - if !s.doAddSink(sink, false) { - go sink.Close() - } - } } func (s *PublishSource) IsCompleted() bool { @@ -729,31 +253,6 @@ func (s *PublishSource) NotTrackAdded(index int) bool { return true } -func (s *PublishSource) CorrectTimestamp(packet *avformat.AVPacket) { - // 对比第一包的时间戳和上次推流的最后时间戳。如果小于上次的推流时间戳,则在原来的基础上累加。 - if s.streamEndInfo != nil && !s.timestampModeDecided { - s.timestampModeDecided = true - - timestamps := s.streamEndInfo.Timestamps[packet.CodecID] - s.accumulateTimestamps = true - log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1]) - } - - track := s.originTracks.Find(packet.CodecID) - duration := packet.GetDuration(packet.Timebase) - - // 根据duration来累加时间戳 - if s.accumulateTimestamps { - offset := packet.Pts - packet.Dts - packet.Dts = track.Dts + duration - packet.Pts = packet.Dts + offset - } - - track.Dts = packet.Dts - track.Pts = packet.Pts - track.FrameDuration = int(duration) -} - func (s *PublishSource) OnNewTrack(track avformat.Track) { if AppConfig.Debug { s.streamLogger.Path = "dump/" + strings.ReplaceAll(s.ID, "/", "_") @@ -770,17 +269,14 @@ func (s *PublishSource) OnNewTrack(track avformat.Track) { return } - s.originTracks.Add(NewTrack(stream, 0, 0)) - s.allStreamTracks.Add(NewTrack(stream, 0, 0)) + newTrack := NewTrack(stream, 0, 0) + s.originTracks.Add(newTrack) if utils.AVMediaTypeVideo == stream.MediaType { s.existVideo = true } - // 创建GOPBuffer - if AppConfig.GOPCache && s.existVideo && s.gopBuffer == nil { - s.gopBuffer = NewStreamBuffer() - } + s.streamPublisher.Post(&StreamEvent{StreamEventTypeTrack, newTrack}) } func (s *PublishSource) OnTrackComplete() { @@ -810,32 +306,16 @@ func (s *PublishSource) OnPacket(packet *avformat.AVPacket) { return } - // 保存到GOP缓存 - if AppConfig.GOPCache && s.existVideo { - // GOP队列溢出 - if s.gopBuffer.RequiresClear(packet) { - s.gopBuffer.PopAll(func(packet *avformat.AVPacket) { - s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) - }) - } + packetPtr := collections.NewReferenceCounter(packet) + packetPtr.Refer() // 引用计数加1 - s.gopBuffer.AddPacket(packet) - } + packets := s.originTracks.FindWithType(packet.MediaType).Packets + packets.Add(packetPtr) + s.streamPublisher.Post(&StreamEvent{StreamEventTypePacket, packetPtr}) - // track解析完毕后,才能生成传输流 - if s.completed.Load() { - s.CorrectTimestamp(packet) - - // 分发给各个传输流 - for _, transStream := range s.TransStreams { - if TransStreamGBCascadedForward != transStream.GetProtocol() { - s.DispatchPacket(transStream, packet) - } - } - } - - // 未开启GOP缓存或只存在音频流, 立即释放 - if !AppConfig.GOPCache || !s.existVideo { + // 释放未引用的AVPacket + for old := packets.Get(0); old.UseCount() < 2; old = packets.Get(0) { + packets.Remove(0).Release() s.TransDemuxer.DiscardHeadPacket(packet.BufferIndex) } } @@ -872,15 +352,15 @@ func (s *PublishSource) SetUrlValues(values url.Values) { s.urlValues = values } -func (s *PublishSource) PostEvent(cb func()) { +func (s *PublishSource) postEvent(cb func()) { s.mainContextEvents <- cb } -func (s *PublishSource) ExecuteSyncEvent(cb func()) { +func (s *PublishSource) executeSyncEvent(cb func()) { group := sync.WaitGroup{} group.Add(1) - s.PostEvent(func() { + s.postEvent(func() { cb() group.Done() }) @@ -896,32 +376,16 @@ func (s *PublishSource) SetCreateTime(time time.Time) { s.createTime = time } -func (s *PublishSource) Sinks() []Sink { - var sinks []Sink - - s.ExecuteSyncEvent(func() { - for _, sink := range s.sinks { - sinks = append(sinks, sink) - } - }) - - return sinks -} - func (s *PublishSource) GetBitrateStatistics() *BitrateStatistics { return s.statistics } -func (s *PublishSource) GetTransStreams() map[TransStreamID]TransStream { - return s.TransStreams -} - -func (s *PublishSource) GetStreamEndInfo() *StreamEndInfo { - return s.streamEndInfo -} - func (s *PublishSource) ProbeTimeout() { if s.TransDemuxer != nil { s.TransDemuxer.ProbeComplete() } } + +func (s *PublishSource) GetTransStreamPublisher() TransStreamPublisher { + return s.streamPublisher +} diff --git a/stream/source_utils.go b/stream/source_utils.go index 7768bff..7a41b4d 100644 --- a/stream/source_utils.go +++ b/stream/source_utils.go @@ -133,97 +133,6 @@ func ParseUrl(name string) (string, url.Values) { return name, nil } -// -//func ExtractVideoPacket(codec utils.AVCodecID, key, extractStream bool, data []byte, pts, dts int64, index, timebase int) (*avformat.AVStream, *avformat.AVPacket, error) { -// var stream *avformat.AVStream -// -// if utils.AVCodecIdH264 == codec { -// //从关键帧中解析出sps和pps -// if key && extractStream { -// sps, pps, err := avc.ParseExtraDataFromKeyNALU(data) -// if err != nil { -// log.Sugar.Errorf("从关键帧中解析sps pps失败 data:%s", hex.EncodeToString(data)) -// return nil, nil, err -// } -// -// codecData, err := utils.NewAVCCodecData(sps, pps) -// if err != nil { -// log.Sugar.Errorf("解析sps pps失败 data:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(sps), hex.EncodeToString(pps)) -// return nil, nil, err -// } -// -// stream = avformat.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) -// } -// -// } else if utils.AVCodecIdH265 == codec { -// if key && extractStream { -// vps, sps, pps, err := hevc.ParseExtraDataFromKeyNALU(data) -// if err != nil { -// log.Sugar.Errorf("从关键帧中解析vps sps pps失败 data:%s", hex.EncodeToString(data)) -// return nil, nil, err -// } -// -// codecData, err := utils.NewHEVCCodecData(vps, sps, pps) -// if err != nil { -// log.Sugar.Errorf("解析sps pps失败 data:%s vps:%s sps:%s, pps:%s", hex.EncodeToString(data), hex.EncodeToString(vps), hex.EncodeToString(sps), hex.EncodeToString(pps)) -// return nil, nil, err -// } -// -// stream = avformat.NewAVStream(utils.AVMediaTypeVideo, 0, codec, codecData.AnnexBExtraData(), codecData) -// } -// -// } -// -// packet := avformat.NewVideoPacket(data, dts, pts, key, utils.PacketTypeAnnexB, codec, index, timebase) -// return stream, packet, nil -//} -// -//func ExtractAudioPacket(codec utils.AVCodecID, extractStream bool, data []byte, pts, dts int64, index, timebase int) (*avformat.AVStream, *avformat.AVPacket, error) { -// var stream *avformat.AVStream -// var packet *avformat.AVPacket -// if utils.AVCodecIdAAC == codec { -// //必须包含ADTSHeader -// if len(data) < 7 { -// return nil, nil, fmt.Errorf("need more data") -// } -// -// var skip int -// header, err := utils.ReadADtsFixedHeader(data) -// if err != nil { -// log.Sugar.Errorf("读取ADTSHeader失败 data:%s", hex.EncodeToString(data[:7])) -// return nil, nil, err -// } else { -// skip = 7 -// //跳过ADtsHeader长度 -// if header.ProtectionAbsent() == 0 { -// skip += 2 -// } -// } -// -// if extractStream { -// configData, err := utils.ADtsHeader2MpegAudioConfigData(header) -// config, err := utils.ParseMpeg4AudioConfig(configData) -// println(config) -// if err != nil { -// log.Sugar.Errorf("adt头转m4ac失败 data:%s", hex.EncodeToString(data[:7])) -// return nil, nil, err -// } -// -// stream = avformat.NewAVStream(utils.AVMediaTypeAudio, index, codec, configData, nil) -// } -// -// packet = utils.NewAudioPacket(data[skip:], dts, pts, codec, index, timebase) -// } else if utils.AVCodecIdPCMALAW == codec || utils.AVCodecIdPCMMULAW == codec { -// if extractStream { -// stream = avformat.NewAVStream(utils.AVMediaTypeAudio, index, codec, nil, nil) -// } -// -// packet = utils.NewAudioPacket(data, dts, pts, codec, index, timebase) -// } -// -// return stream, packet, nil -//} - // StartReceiveDataTimer 启动收流超时计时器 // 收流超时, 客观上认为是流中断, 应该关闭Source. 如果开启了Hook, 并且Hook返回200应答, 则不关闭Source. func StartReceiveDataTimer(source Source) *time.Timer { @@ -264,9 +173,9 @@ func StartIdleTimer(source Source) *time.Timer { var idleTimer *time.Timer idleTimer = time.AfterFunc(time.Duration(AppConfig.IdleTimeout), func() { - dis := time.Now().Sub(source.LastStreamEndTime()) + dis := time.Now().Sub(source.GetTransStreamPublisher().LastStreamEndTime()) - if source.SinkCount() < 1 && dis >= time.Duration(AppConfig.IdleTimeout) { + if source.GetTransStreamPublisher().SinkCount() < 1 && dis >= time.Duration(AppConfig.IdleTimeout) { log.Sugar.Errorf("拉流空闲超时 source: %s", source.GetID()) // 此处不参考返回值err, 客观希望不关闭Source @@ -334,7 +243,7 @@ func LoopEvent(source Source) { } var ok bool - source.ExecuteSyncEvent(func() { + source.executeSyncEvent(func() { source.ProbeTimeout() ok = len(source.OriginTracks()) > 0 }) @@ -345,6 +254,9 @@ func LoopEvent(source Source) { } }) + // 启动协程, 生成发布传输流 + go source.GetTransStreamPublisher().run() + for { select { // 读取推流数据 diff --git a/stream/stream_publisher.go b/stream/stream_publisher.go new file mode 100644 index 0000000..3a11ceb --- /dev/null +++ b/stream/stream_publisher.go @@ -0,0 +1,731 @@ +package stream + +import ( + "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" + "github.com/lkmio/avformat/utils" + "github.com/lkmio/lkm/log" + "github.com/lkmio/lkm/transcode" + "github.com/lkmio/transport" + "sync" + "sync/atomic" + "time" +) + +type StreamEventType int + +const ( + StreamEventTypeTrack StreamEventType = iota + 1 + StreamEventTypeTrackCompleted + StreamEventTypePacket + StreamEventTypeRawPacket +) + +type StreamEvent struct { + Type StreamEventType + Data interface{} +} + +type TransStreamPublisher interface { + Post(event *StreamEvent) + + run() + + close() + + Sinks() []Sink + + GetTransStreams() map[TransStreamID]TransStream + + GetForwardTransStream() TransStream + + GetStreamEndInfo() *StreamEndInfo + + // SinkCount 返回拉流计数 + SinkCount() int + + // LastStreamEndTime 返回最近结束拉流时间戳 + LastStreamEndTime() time.Time + + // TranscodeTracks 返回所有的转码track + TranscodeTracks() []*Track + + // AddSink 添加Sink, 在此之前请确保Sink已经握手、授权通过. 如果Source还未WriteHeader,先将Sink添加到等待队列. + // 匹配拉流期望的编码器, 创建TransStream或向已经存在TransStream添加Sink + AddSink(sink Sink) + + // RemoveSink 同步删除Sink + RemoveSink(sink Sink) + + RemoveSinkWithID(id SinkID) + + FindSink(id SinkID) Sink + + ExecuteSyncEvent(cb func()) + + SetSourceID(id string) +} + +type transStreamPublisher struct { + source string + streamEvents *NonBlockingChannel[*StreamEvent] + mainContextEvents chan func() + + sinkCount int + gopBuffer GOPBuffer // GOP缓存, 音频和视频混合使用, 以视频关键帧为界, 缓存第二个视频关键帧时, 释放前一组gop + + recordSink Sink // 每个Source的录制流 + recordFilePath string // 录制流文件路径 + hlsStream TransStream // HLS传输流, 如果开启, 在@see writeHeader 函数中直接创建, 如果等拉流时再创建, 会进一步加大HLS延迟. + _ []transcode.Transcoder // 音频解码器 + _ []transcode.Transcoder // 视频解码器 + originTracks TrackManager // 推流的音视频Streams + allStreamTracks TrackManager // 推流Streams+转码器获得的Stream + + transStreams map[TransStreamID]TransStream // 所有输出流 + forwardTransStream TransStream // 转发流 + sinks map[SinkID]Sink // 保存所有Sink + transStreamSinks map[TransStreamID]map[SinkID]Sink // 输出流对应的Sink + + existVideo bool // 是否存在视频 + completed atomic.Bool // 所有推流track是否解析完毕, @see writeHeader 函数中赋值为true + closed atomic.Bool + streamEndInfo *StreamEndInfo // 之前推流源信息 + accumulateTimestamps bool // 是否累加时间戳 + timestampModeDecided bool // 是否已经决定使用推流的时间戳,或者累加时间戳 + lastStreamEndTime time.Time // 最近拉流端结束拉流的时间 +} + +func (t *transStreamPublisher) Post(event *StreamEvent) { + t.streamEvents.Post(event) +} + +func (t *transStreamPublisher) run() { + t.streamEvents = NewNonBlockingChannel[*StreamEvent](256) + t.mainContextEvents = make(chan func(), 256) + + t.transStreams = make(map[TransStreamID]TransStream, 10) + t.sinks = make(map[SinkID]Sink, 128) + t.transStreamSinks = make(map[TransStreamID]map[SinkID]Sink, len(transStreamFactories)+1) + + defer func() { + // 清空管道 + for event := t.streamEvents.Pop(); event != nil; event = t.streamEvents.Pop() { + if StreamEventTypePacket == event.Type { + event.Data.(*collections.ReferenceCounter[*avformat.AVPacket]).Release() + } + } + }() + + for { + select { + case event := <-t.streamEvents.Channel: + switch event.Type { + case StreamEventTypeTrack: + // 添加track + t.OnNewTrack(event.Data.(*Track)) + case StreamEventTypeTrackCompleted: + t.WriteHeader() + // track完成 + case StreamEventTypePacket: + // 发送数据包 + t.OnPacket(event.Data.(*collections.ReferenceCounter[*avformat.AVPacket])) + case StreamEventTypeRawPacket: + // 发送原始数据包, 目前仅用于国标级联转发 + if t.forwardTransStream != nil && t.forwardTransStream.GetProtocol() == TransStreamGBCascadedForward { + packets := event.Data.([][]byte) + for _, data := range packets { + t.DispatchPacket(t.forwardTransStream, &avformat.AVPacket{Data: data[2:]}) + UDPReceiveBufferPool.Put(data[:cap(data)]) + } + } + } + case event := <-t.mainContextEvents: + event() + if t.closed.Load() { + return + } + } + } +} + +func (t *transStreamPublisher) PostEvent(cb func()) { + t.mainContextEvents <- cb +} + +func (t *transStreamPublisher) ExecuteSyncEvent(cb func()) { + group := sync.WaitGroup{} + group.Add(1) + + t.PostEvent(func() { + cb() + group.Done() + }) + + group.Wait() +} + +func (t *transStreamPublisher) CreateDefaultOutStreams() { + if t.transStreams == nil { + t.transStreams = make(map[TransStreamID]TransStream, 10) + } + + // 创建录制流 + if AppConfig.Record.Enable { + sink, path, err := CreateRecordStream(t.source) + if err != nil { + log.Sugar.Errorf("创建录制sink失败 source: %s err: %s", t.source, err.Error()) + } else { + t.recordSink = sink + t.recordFilePath = path + } + } + + // 创建HLS输出流 + if AppConfig.Hls.Enable { + streams := t.originTracks.All() + utils.Assert(len(streams) > 0) + + id := GenerateTransStreamID(TransStreamHls, streams...) + hlsStream, err := t.CreateTransStream(id, TransStreamHls, streams) + if err != nil { + panic(err) + } + + t.DispatchGOPBuffer(hlsStream) + t.hlsStream = hlsStream + t.transStreams[id] = t.hlsStream + } +} + +func IsSupportMux(protocol TransStreamProtocol, _, _ utils.AVCodecID) bool { + if TransStreamRtmp == protocol || TransStreamFlv == protocol { + + } + + return true +} + +func (t *transStreamPublisher) CreateTransStream(id TransStreamID, protocol TransStreamProtocol, tracks []*Track) (TransStream, error) { + log.Sugar.Infof("创建%s-stream source: %s", protocol.String(), t.source) + + source := SourceManager.Find(t.source) + utils.Assert(source != nil) + transStream, err := CreateTransStream(source, protocol, tracks) + if err != nil { + log.Sugar.Errorf("创建传输流失败 err: %s source: %s", err.Error(), t.source) + return nil, err + } + + for _, track := range tracks { + // 重新拷贝一个track,传输流内部使用track的时间戳, + newTrack := *track + if err = transStream.AddTrack(&newTrack); err != nil { + return nil, err + } + } + + transStream.SetID(id) + transStream.SetProtocol(protocol) + + // 创建输出流对应的拉流队列 + t.transStreamSinks[id] = make(map[SinkID]Sink, 128) + _ = transStream.WriteHeader() + + // 设置转发流 + if TransStreamGBCascadedForward == transStream.GetProtocol() { + t.forwardTransStream = transStream + } + + return transStream, err +} + +func (t *transStreamPublisher) DispatchGOPBuffer(transStream TransStream) { + if t.gopBuffer != nil { + t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + t.DispatchPacket(transStream, packet.Get()) + }) + } +} + +// DispatchPacket 分发AVPacket +func (t *transStreamPublisher) DispatchPacket(transStream TransStream, packet *avformat.AVPacket) { + data, timestamp, videoKey, err := transStream.Input(packet) + if err != nil || len(data) < 1 { + return + } + + t.DispatchBuffer(transStream, packet.Index, data, timestamp, videoKey) +} + +// DispatchBuffer 分发传输流 +func (t *transStreamPublisher) DispatchBuffer(transStream TransStream, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) { + sinks := t.transStreamSinks[transStream.GetID()] + exist := transStream.IsExistVideo() + + for _, sink := range sinks { + + if sink.GetSentPacketCount() < 1 { + // 如果存在视频, 确保向sink发送的第一帧是关键帧 + if exist && !keyVideo { + continue + } + + if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { + if ok := t.write(sink, index, extraData, timestamp, false); !ok { + continue + } + } + } + + if ok := t.write(sink, index, data, timestamp, keyVideo); !ok { + continue + } + } +} + +func (t *transStreamPublisher) pendingSink(sink Sink) { + log.Sugar.Errorf("向sink推流超时,关闭连接. %s-sink: %s source: %s", sink.GetProtocol().String(), sink.GetID(), t.source) + go sink.Close() +} + +// 向sink推流 +func (t *transStreamPublisher) write(sink Sink, index int, data []*collections.ReferenceCounter[[]byte], timestamp int64, keyVideo bool) bool { + err := sink.Write(index, data, timestamp, keyVideo) + if err == nil { + sink.IncreaseSentPacketCount() + return true + } + + // 推流超时, 可能是服务器或拉流端带宽不够、拉流端不读取数据等情况造成内核发送缓冲区满, 进而阻塞. + // 直接关闭连接. 当然也可以将sink先挂起, 后续再继续推流. + if _, ok := err.(transport.ZeroWindowSizeError); ok { + t.pendingSink(sink) + } + + return false +} + +// 创建sink需要的输出流 +func (t *transStreamPublisher) doAddSink(sink Sink, resume bool) bool { + // 暂时不考虑多路视频流,意味着只能1路视频流和多路音频流,同理originStreams和allStreams里面的Stream互斥. 同时多路音频流的Codec必须一致 + audioCodecId, videoCodecId := sink.DesiredAudioCodecId(), sink.DesiredVideoCodecId() + audioTrack := t.originTracks.FindWithType(utils.AVMediaTypeAudio) + videoTrack := t.originTracks.FindWithType(utils.AVMediaTypeVideo) + + disableAudio := audioTrack == nil + disableVideo := videoTrack == nil || !sink.EnableVideo() + if disableAudio && disableVideo { + return false + } + + // 不支持对期望编码的流封装. 降级 + if (utils.AVCodecIdNONE != audioCodecId || utils.AVCodecIdNONE != videoCodecId) && !IsSupportMux(sink.GetProtocol(), audioCodecId, videoCodecId) { + audioCodecId = utils.AVCodecIdNONE + videoCodecId = utils.AVCodecIdNONE + } + + if !disableAudio && utils.AVCodecIdNONE == audioCodecId { + audioCodecId = audioTrack.Stream.CodecID + } + if !disableVideo && utils.AVCodecIdNONE == videoCodecId { + videoCodecId = videoTrack.Stream.CodecID + } + + // 创建音频转码器 + if !disableAudio && audioCodecId != audioTrack.Stream.CodecID { + utils.Assert(false) + } + + // 创建视频转码器 + if !disableVideo && videoCodecId != videoTrack.Stream.CodecID { + utils.Assert(false) + } + + // 查找传输流需要的所有track + var tracks []*Track + for _, track := range t.originTracks.All() { + if disableVideo && track.Stream.MediaType == utils.AVMediaTypeVideo { + continue + } + + tracks = append(tracks, track) + } + + transStreamId := GenerateTransStreamID(sink.GetProtocol(), tracks...) + transStream, exist := t.transStreams[transStreamId] + if !exist { + var err error + transStream, err = t.CreateTransStream(transStreamId, sink.GetProtocol(), tracks) + if err != nil { + log.Sugar.Errorf("添加sink失败,创建传输流发生err: %s source: %s", err.Error(), t.source) + return false + } + + t.transStreams[transStreamId] = transStream + } + + sink.SetTransStreamID(transStreamId) + + { + sink.Lock() + defer sink.UnLock() + + if SessionStateClosed == sink.GetState() { + log.Sugar.Warnf("添加sink失败, sink已经断开连接 %s", sink.String()) + return false + } else { + sink.SetState(SessionStateTransferring) + } + } + + err := sink.StartStreaming(transStream) + if err != nil { + log.Sugar.Errorf("添加sink失败,开始推流发生err: %s sink: %s source: %s ", err.Error(), SinkId2String(sink.GetID()), t.source) + return false + } + + // 还没做好准备(rtsp拉流还在协商sdp中), 暂不推流 + if !sink.IsReady() { + return true + } + + // 累加拉流计数 + if !resume && t.recordSink != sink { + t.sinkCount++ + log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) + } + + t.sinks[sink.GetID()] = sink + t.transStreamSinks[transStreamId][sink.GetID()] = sink + + // TCP拉流开启异步发包, 一旦出现网络不好的链路, 其余正常链路不受影响. + _, ok := sink.GetConn().(*transport.Conn) + if ok && sink.IsTCPStreaming() { + sink.EnableAsyncWriteMode(24) + } + + // 发送已有的缓存数据 + // 此处发送缓存数据,必须要存在关键帧的输出流才发,否则等DispatchPacket时再发送extra。 + data, timestamp, _ := transStream.ReadKeyFrameBuffer() + if len(data) > 0 { + if extraData, _, _ := transStream.ReadExtraData(timestamp); len(extraData) > 0 { + t.write(sink, 0, extraData, timestamp, false) + } + + t.write(sink, 0, data, timestamp, true) + } + + // 新建传输流,发送已经缓存的音视频帧 + if !exist && AppConfig.GOPCache && t.existVideo && TransStreamGBCascadedForward != transStream.GetProtocol() { + t.DispatchGOPBuffer(transStream) + } + + return true +} + +func (t *transStreamPublisher) AddSink(sink Sink) { + t.PostEvent(func() { + if !t.completed.Load() { + AddSinkToWaitingQueue(sink.GetSourceID(), sink) + } else { + if !t.doAddSink(sink, false) { + go sink.Close() + } + } + }) +} + +func (t *transStreamPublisher) RemoveSink(sink Sink) { + t.ExecuteSyncEvent(func() { + t.doRemoveSink(sink) + }) +} + +func (t *transStreamPublisher) RemoveSinkWithID(id SinkID) { + t.PostEvent(func() { + sink, ok := t.sinks[id] + if ok { + t.doRemoveSink(sink) + } + }) +} + +func (t *transStreamPublisher) FindSink(id SinkID) Sink { + var result Sink + t.ExecuteSyncEvent(func() { + sink, ok := t.sinks[id] + if ok { + result = sink + } + }) + + return result +} + +func (t *transStreamPublisher) cleanupSinkStreaming(sink Sink) { + transStreamSinks := t.transStreamSinks[sink.GetTransStreamID()] + delete(transStreamSinks, sink.GetID()) + t.lastStreamEndTime = time.Now() + + if sink.GetProtocol() == TransStreamHls { + // 从HLS拉流队列删除Sink + _, _ = SinkManager.Remove(sink.GetID()) + } + + sink.StopStreaming(t.transStreams[sink.GetTransStreamID()]) +} + +func (t *transStreamPublisher) doRemoveSink(sink Sink) bool { + t.cleanupSinkStreaming(sink) + delete(t.sinks, sink.GetID()) + + t.sinkCount-- + log.Sugar.Infof("sink count: %d source: %s", t.sinkCount, t.source) + utils.Assert(t.sinkCount > -1) + + HookPlayDoneEvent(sink) + return true +} + +func (t *transStreamPublisher) close() { + t.ExecuteSyncEvent(func() { + t.doClose() + }) +} + +func (t *transStreamPublisher) doClose() { + t.closed.Store(true) + + // 释放GOP缓存 + if t.gopBuffer != nil { + t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + packet.Release() + }) + t.gopBuffer = nil + } + + // 关闭录制流 + if t.recordSink != nil { + t.recordSink.Close() + } + + // 保留推流信息 + if t.sinkCount > 0 && len(t.originTracks.All()) > 0 { + sourceHistory := StreamEndInfoBride(t.source, t.originTracks.All(), t.transStreams) + streamEndInfoManager.Add(sourceHistory) + } + + // 关闭所有输出流 + for _, transStream := range t.transStreams { + // 发送剩余包 + data, ts, _ := transStream.Close() + if len(data) > 0 { + t.DispatchBuffer(transStream, -1, data, ts, true) + } + + // 如果是tcp传输流, 归还合并写缓冲区 + if !transStream.IsTCPStreaming() || transStream.GetMWBuffer() == nil { + continue + } else if buffers := transStream.GetMWBuffer().Close(); buffers != nil { + AddMWBuffersToPending(t.source, transStream.GetID(), buffers) + } + } + + // 将所有sink添加到等待队列 + for _, sink := range t.sinks { + transStreamID := sink.GetTransStreamID() + sink.SetTransStreamID(0) + if t.recordSink == sink { + continue + } + + { + sink.Lock() + + if SessionStateClosed == sink.GetState() { + log.Sugar.Warnf("添加到sink到等待队列失败, sink已经断开连接 %s", sink.String()) + } else { + sink.SetState(SessionStateWaiting) + AddSinkToWaitingQueue(t.source, sink) + } + + sink.UnLock() + } + + if SessionStateClosed != sink.GetState() { + sink.StopStreaming(t.transStreams[transStreamID]) + } + } + + t.transStreams = nil + t.sinks = nil + t.transStreamSinks = nil +} + +func (t *transStreamPublisher) WriteHeader() { + t.completed.Store(true) + + // 尝试使用上次结束推流的时间戳 + if streamInfo := streamEndInfoManager.Remove(t.source); streamInfo != nil && EqualsTracks(streamInfo, t.originTracks.All()) { + t.streamEndInfo = streamInfo + + // 恢复每路track的时间戳 + tracks := t.originTracks.All() + for _, track := range tracks { + timestamps := streamInfo.Timestamps[track.Stream.CodecID] + track.Dts = timestamps[0] + track.Pts = timestamps[1] + } + } + + // 纠正GOP中的时间戳 + if t.gopBuffer != nil && t.gopBuffer.Size() != 0 { + t.gopBuffer.PeekAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + t.CorrectTimestamp(packet.Get()) + }) + } + + // 创建录制流和HLS + t.CreateDefaultOutStreams() + + // 将等待队列的sink添加到输出流队列 + sinks := PopWaitingSinks(t.source) + if t.recordSink != nil { + sinks = append(sinks, t.recordSink) + } + + for _, sink := range sinks { + if !t.doAddSink(sink, false) { + go sink.Close() + } + } + + // 如果不存在视频帧, 清空GOP缓存 + if !t.existVideo { + t.gopBuffer.PopAll(func(c *collections.ReferenceCounter[*avformat.AVPacket]) { + c.Refer() + }) + t.gopBuffer = nil + } +} + +func (t *transStreamPublisher) Sinks() []Sink { + var sinks []Sink + + t.ExecuteSyncEvent(func() { + for _, sink := range t.sinks { + sinks = append(sinks, sink) + } + }) + + return sinks +} + +func (t *transStreamPublisher) OnPacket(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + // 保存到GOP缓存 + if (AppConfig.GOPCache && t.existVideo) || !t.completed.Load() { + // GOP队列溢出 + if t.gopBuffer.RequiresClear(packet) { + t.gopBuffer.PopAll(func(packet *collections.ReferenceCounter[*avformat.AVPacket]) { + packet.Release() + }) + } + + t.gopBuffer.AddPacket(packet) + } + + // track解析完毕后,才能生成传输流 + if t.completed.Load() { + t.CorrectTimestamp(packet.Get()) + + // 分发给各个传输流 + for _, transStream := range t.transStreams { + if TransStreamGBCascadedForward != transStream.GetProtocol() { + t.DispatchPacket(transStream, packet.Get()) + } + } + } + + // 未开启GOP缓存或只存在音频流, 立即释放 + if !AppConfig.GOPCache || !t.existVideo { + packet.Release() + } +} + +func (t *transStreamPublisher) OnNewTrack(track *Track) { + stream := track.Stream + t.originTracks.Add(track) + + if utils.AVMediaTypeVideo == stream.MediaType { + t.existVideo = true + } + + // 创建GOPBuffer + if t.gopBuffer == nil { + t.gopBuffer = NewStreamBuffer() + } +} + +// CorrectTimestamp 纠正时间戳 +func (t *transStreamPublisher) CorrectTimestamp(packet *avformat.AVPacket) { + // 对比第一包的时间戳和上次推流的最后时间戳。如果小于上次的推流时间戳,则在原来的基础上累加。 + if t.streamEndInfo != nil && !t.timestampModeDecided { + t.timestampModeDecided = true + + timestamps := t.streamEndInfo.Timestamps[packet.CodecID] + t.accumulateTimestamps = true + log.Sugar.Infof("累加时间戳 上次推流dts: %d, pts: %d", timestamps[0], timestamps[1]) + } + + track := t.originTracks.Find(packet.CodecID) + duration := packet.GetDuration(packet.Timebase) + + // 根据duration来累加时间戳 + if t.accumulateTimestamps { + offset := packet.Pts - packet.Dts + packet.Dts = track.Dts + duration + packet.Pts = packet.Dts + offset + } + + track.Dts = packet.Dts + track.Pts = packet.Pts + track.FrameDuration = int(duration) +} + +func (t *transStreamPublisher) GetTransStreams() map[TransStreamID]TransStream { + return t.transStreams +} + +func (t *transStreamPublisher) GetStreamEndInfo() *StreamEndInfo { + return t.streamEndInfo +} + +func (t *transStreamPublisher) TranscodeTracks() []*Track { + return t.allStreamTracks.All() +} + +func (t *transStreamPublisher) LastStreamEndTime() time.Time { + return t.lastStreamEndTime +} + +func (t *transStreamPublisher) SinkCount() int { + return t.sinkCount +} + +func (t *transStreamPublisher) GetForwardTransStream() TransStream { + return t.forwardTransStream +} + +func (t *transStreamPublisher) SetSourceID(id string) { + t.source = id +} + +func NewTransStreamPublisher(source string) TransStreamPublisher { + return &transStreamPublisher{ + transStreams: make(map[TransStreamID]TransStream), + transStreamSinks: make(map[TransStreamID]map[SinkID]Sink), + sinks: make(map[SinkID]Sink), + source: source, + } +} diff --git a/stream/track.go b/stream/track.go index ca8095a..f60e9d0 100644 --- a/stream/track.go +++ b/stream/track.go @@ -2,6 +2,7 @@ package stream import ( "github.com/lkmio/avformat" + "github.com/lkmio/avformat/collections" ) type Track struct { @@ -9,8 +10,9 @@ type Track struct { Pts int64 // 最新的PTS Dts int64 // 最新的DTS FrameDuration int // 单帧时长, timebase和推流一致 + Packets collections.LinkedList[*collections.ReferenceCounter[*avformat.AVPacket]] } func NewTrack(stream *avformat.AVStream, dts, pts int64) *Track { - return &Track{stream, dts, pts, 0} + return &Track{stream, dts, pts, 0, collections.LinkedList[*collections.ReferenceCounter[*avformat.AVPacket]]{}} } diff --git a/web/rtc.html b/web/rtc.html index a205bf0..e05325e 100644 --- a/web/rtc.html +++ b/web/rtc.html @@ -38,7 +38,7 @@
- +
@@ -50,7 +50,7 @@ let remote_view = document.getElementById("videoview"); let source = document.getElementById("source").value; let pc = new RTCPeerConnection(null); - // pc.addTransceiver("audio", {direction: "recvonly"}); + // pc.addTransceiver("audio", {direction: "recvonly"}); pc.addTransceiver("video", {direction: "recvonly"}); let offer = await pc.createOffer(); @@ -80,7 +80,9 @@ } console.log("offer:" + offer.sdp); - let url = source + ".rtc"; + + //source = generateRandomAlphanumeric10(); + let url = window.location.origin + "/" + source + ".rtc"; fetch(url, { method: 'POST', body: JSON.stringify(data),