From d3f37e63a680da49ab04d144ad6d42dfa285109c Mon Sep 17 00:00:00 2001 From: yangjie <1534796060@qq.com> Date: Wed, 29 Nov 2023 22:24:18 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84source=E5=B0=81=E8=A3=85?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rtmp/rtmp_publisher.go | 12 +++---- rtmp/rtmp_transdemuxer.go | 7 ---- stream/source.go | 75 ++++++++++++++++++++++++++++++++------- stream/trans_demuxer.go | 35 ------------------ stream/trans_muxer.go | 4 --- 5 files changed, 68 insertions(+), 65 deletions(-) delete mode 100644 rtmp/rtmp_transdemuxer.go delete mode 100644 stream/trans_demuxer.go delete mode 100644 stream/trans_muxer.go diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index ba50204..d4940e0 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -9,7 +9,6 @@ import ( type Publisher struct { stream.SourceImpl - deMuxer libflv.DeMuxer audioMemoryPool stream.MemoryPool videoMemoryPool stream.MemoryPool @@ -18,10 +17,11 @@ type Publisher struct { } func NewPublisher(sourceId string) *Publisher { - publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId}, audioUnmark: false, videoUnmark: false} - publisher.deMuxer = libflv.NewDeMuxer() + deMuxer := libflv.NewDeMuxer() + publisher := &Publisher{SourceImpl: stream.SourceImpl{Id_: sourceId, TransDeMuxer: nil}, audioUnmark: false, videoUnmark: false} //设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.SourceImpl - publisher.deMuxer.SetHandler(publisher) + deMuxer.SetHandler(publisher) + publisher.SourceImpl.SetState(stream.SessionStateTransferring) //创建内存池 publisher.audioMemoryPool = stream.NewMemoryPool(48000 * (stream.AppConfig.GOPCache + 1)) @@ -92,7 +92,7 @@ func (p *Publisher) OnVideo(data []byte, ts uint32) { p.videoUnmark = false } - _ = p.deMuxer.InputVideo(data, ts) + //_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts) } func (p *Publisher) OnAudio(data []byte, ts uint32) { @@ -101,7 +101,7 @@ func (p *Publisher) OnAudio(data []byte, ts uint32) { p.audioUnmark = false } - _ = p.deMuxer.InputAudio(data, ts) + //_ = p.SourceImpl.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts) } // OnPartPacket 从rtmp解析过来的部分音视频包 diff --git a/rtmp/rtmp_transdemuxer.go b/rtmp/rtmp_transdemuxer.go deleted file mode 100644 index f738959..0000000 --- a/rtmp/rtmp_transdemuxer.go +++ /dev/null @@ -1,7 +0,0 @@ -package rtmp - -import "github.com/yangjiechina/avformat" - -type TransDeMuxer struct { - avformat.DeMuxerImpl -} diff --git a/stream/source.go b/stream/source.go index bee03a0..fb4267b 100644 --- a/stream/source.go +++ b/stream/source.go @@ -2,6 +2,7 @@ package stream import ( "fmt" + "github.com/yangjiechina/avformat" "github.com/yangjiechina/avformat/utils" "github.com/yangjiechina/live-server/transcode" "time" @@ -13,6 +14,8 @@ type SourceType byte // Protocol 输出协议 type Protocol uint32 +type SourceEvent byte + const ( SourceTypeRtmp = SourceType(1) SourceType28181 = SourceType(2) @@ -25,6 +28,11 @@ const ( ProtocolRtc = Protocol(5) ProtocolRtmpStr = "rtmp" + + SourceEventPlay = SourceEvent(1) + SourceEventPlayDone = SourceEvent(1) + SourceEventInput = SourceEvent(1) + SourceEventClose = SourceEvent(1) ) // SessionState 推拉流Session状态 @@ -46,9 +54,6 @@ type ISource interface { // Input 输入推流数据 Input(data []byte) - // CreateTransDeMuxer 创建推流的解服用器 - CreateTransDeMuxer() ITransDeMuxer - // CreateTranscoder 创建转码器 CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder @@ -63,7 +68,11 @@ type ISource interface { AddSink(sink ISink) bool // RemoveSink 删除Sink/** - RemoveSink(tid TransStreamId, sinkId string) bool + RemoveSink(sink ISink) bool + + AddEvent(event SourceEvent, data interface{}) + + SetState(state SessionState) // Close 关闭Source // 停止一切封装和转发流以及转码工作 @@ -71,16 +80,14 @@ type ISource interface { Close() } -type onSourceHandler interface { - onDeMuxStream(stream utils.AVStream) -} +type CreateSource func(id string, type_ SourceType, handler avformat.OnDeMuxerHandler) type SourceImpl struct { Id_ string type_ SourceType state SessionState - deMuxer ITransDeMuxer //负责从推流协议中解析出AVStream和AVPacket + TransDeMuxer avformat.DeMuxer //负责从推流协议中解析出AVStream和AVPacket recordSink ISink //每个Source唯一的一个录制流 audioTranscoders []transcode.ITranscoder //音频解码器 videoTranscoders []transcode.ITranscoder //视频解码器 @@ -93,6 +100,13 @@ type SourceImpl struct { //所有的输出协议, 持有Sink transStreams map[TransStreamId]ITransStream + + //sink的拉流和断开拉流事件,都通过管道交给Source处理. 意味着Source内部解析流、封装流、传输流都可以做到无锁操作 + //golang的管道是有锁的(https://github.com/golang/go/blob/d38f1d13fa413436d38d86fe86d6a146be44bb84/src/runtime/chan.go#L202), 后面使用cas队列传输事件, 并且可以做到一次读取多个事件 + inputEvent chan []byte + closeEvent chan byte + playingEventQueue chan ISink + playingDoneEventQueue chan ISink } func (s *SourceImpl) Id() string { @@ -100,12 +114,31 @@ func (s *SourceImpl) Id() string { } func (s *SourceImpl) Input(data []byte) { - s.deMuxer.Input(data) + if SessionStateTransferring == s.state { + s.inputEvent <- data + } else { + s.TransDeMuxer.Input(data, nil) + } } -func (s *SourceImpl) CreateTransDeMuxer() ITransDeMuxer { - //TODO implement me - panic("implement me") +func (s *SourceImpl) LoopEvent() { + for { + select { + case data := <-s.inputEvent: + s.TransDeMuxer.Input(data, nil) + break + case sink := <-s.playingEventQueue: + s.AddSink(sink) + break + case sink := <-s.playingDoneEventQueue: + s.AddSink(sink) + break + case _ = <-s.closeEvent: + s.Close() + return + } + } + } func (s *SourceImpl) CreateTranscoder(src utils.AVStream, dst utils.AVStream) transcode.ITranscoder { @@ -228,10 +261,26 @@ func (s *SourceImpl) AddSink(sink ISink) bool { return false } -func (s *SourceImpl) RemoveSink(tid TransStreamId, sinkId string) bool { +func (s *SourceImpl) RemoveSink(sink ISink) bool { return true } +func (s *SourceImpl) AddEvent(event SourceEvent, data interface{}) { + if SourceEventInput == event { + + } else if SourceEventPlay == event { + + } else if SourceEventPlayDone == event { + + } else if SourceEventClose == event { + + } +} + +func (s *SourceImpl) SetState(state SessionState) { + s.state = state +} + func (s *SourceImpl) Close() { } diff --git a/stream/trans_demuxer.go b/stream/trans_demuxer.go deleted file mode 100644 index 9ee56fe..0000000 --- a/stream/trans_demuxer.go +++ /dev/null @@ -1,35 +0,0 @@ -package stream - -import "github.com/yangjiechina/avformat/utils" - -// OnTransDeMuxerHandler 解复用器回调 /** -type OnTransDeMuxerHandler interface { - OnDeMuxStream(stream utils.AVStream) - OnDeMuxStreamDone() - OnDeMuxPacket(index int, packet utils.AVPacket) - OnDeMuxDone() -} - -type ITransDeMuxer interface { - Input(data []byte) - - SetHandler(handler OnTransDeMuxerHandler) - - Close() -} - -type TransDeMuxerImpl struct { - handler OnTransDeMuxerHandler -} - -func (impl *TransDeMuxerImpl) Input(data []byte) { - panic("implement me") -} - -func (impl *TransDeMuxerImpl) SetHandler(handler OnTransDeMuxerHandler) { - impl.handler = handler -} - -func (impl *TransDeMuxerImpl) Close() { - panic("implement me") -} diff --git a/stream/trans_muxer.go b/stream/trans_muxer.go deleted file mode 100644 index 3b80131..0000000 --- a/stream/trans_muxer.go +++ /dev/null @@ -1,4 +0,0 @@ -package stream - -type ITransMuxer interface { -}