From 1b30fedc7dc8efd00a08705a588412efea84176b Mon Sep 17 00:00:00 2001 From: yangjiechina <1534796060@qq.com> Date: Wed, 6 Nov 2024 20:33:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84=E4=BB=A3=E7=A0=81=E7=BB=93?= =?UTF-8?q?=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rtmp/rtmp_publisher.go | 24 +++++++++++------------- rtmp/rtmp_session.go | 41 +++++++++++++++++++++-------------------- rtsp/rtsp_server.go | 14 +++++++------- 3 files changed, 39 insertions(+), 40 deletions(-) diff --git a/rtmp/rtmp_publisher.go b/rtmp/rtmp_publisher.go index a98d51b..3e95082 100644 --- a/rtmp/rtmp_publisher.go +++ b/rtmp/rtmp_publisher.go @@ -16,22 +16,12 @@ type Publisher struct { stack *librtmp.Stack } -func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) *Publisher { - deMuxer := libflv.NewDeMuxer() - publisher_ := &Publisher{PublishSource: stream.PublishSource{ID: sourceId, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack} - //设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.PublishSource - deMuxer.SetHandler(publisher_) - //为推流方分配足够多的缓冲区 - //conn.(*transport.Conn).ReallocateRecvBuffer(1024 * 1024) - return publisher_ -} - func (p *Publisher) Input(data []byte) error { return p.stack.Input(nil, data) } func (p *Publisher) OnDeMuxStream(stream utils.AVStream) { - //AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存 + // AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存 p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail() if !p.IsCompleted() { p.PublishSource.OnDeMuxStream(stream) @@ -42,10 +32,9 @@ func (p *Publisher) OnDeMuxStream(stream utils.AVStream) { } // OnVideo 解析出来的完整视频包 -// @ts rtmp chunk的相对时间戳 func (p *Publisher) OnVideo(index int, data []byte, ts uint32) { data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeVideo).Fetch() - //交给flv解复用器, 解析回调出AVPacket + // 交给flv解复用器, 解析出AVPacket p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts) } @@ -54,6 +43,7 @@ func (p *Publisher) OnAudio(index int, data []byte, ts uint32) { p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts) } +// OnPartPacket AVPacket的部分数据包 func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []byte, first bool) { buffer := p.FindOrCreatePacketBuffer(index, mediaType) if first { @@ -67,3 +57,11 @@ func (p *Publisher) Close() { p.PublishSource.Close() p.stack = nil } + +func NewPublisher(source string, stack *librtmp.Stack, conn net.Conn) *Publisher { + deMuxer := libflv.NewDeMuxer() + publisher := &Publisher{PublishSource: stream.PublishSource{ID: source, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack} + // 设置回调, 接受从DeMuxer解析出来的音视频包 + deMuxer.SetHandler(publisher) + return publisher +} diff --git a/rtmp/rtmp_session.go b/rtmp/rtmp_session.go index 0ac915b..a0ed8b1 100644 --- a/rtmp/rtmp_session.go +++ b/rtmp/rtmp_session.go @@ -8,23 +8,23 @@ import ( "net" ) -// Session 负责除连接和断开以外的所有RTMP生命周期处理 +// Session RTMP会话, 解析处理Message type Session struct { - stack *librtmp.Stack //rtmp协议栈 - handle interface{} //Publisher/sink, 在publish或play成功后赋值 - isPublisher bool + stack *librtmp.Stack // rtmp协议栈, 解析message + handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值 + isPublisher bool // 是否时推流会话 conn net.Conn - receiveBuffer *stream.ReceiveBuffer + receiveBuffer *stream.ReceiveBuffer // 推流源收流队列 } -func (s *Session) generateSourceId(app, stream_ string) string { +func (s *Session) generateSourceID(app, stream string) string { if len(app) == 0 { - return stream_ - } else if len(stream_) == 0 { + return stream + } else if len(stream) == 0 { return app } else { - return app + "/" + stream_ + return app + "/" + stream } } @@ -33,16 +33,16 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState { streamName, values := stream.ParseUrl(stream_) - sourceId := s.generateSourceId(app, streamName) + sourceId := s.generateSourceID(app, streamName) source := NewPublisher(sourceId, s.stack, s.conn) - //设置推流的音视频回调 + // 设置推流的音视频回调 s.stack.SetOnPublishHandler(source) - //初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针. + // 初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针. source.Init(stream.ReceiveBufferTCPBlockCount) source.SetUrlValues(values) - //统一处理source推流事件, source是否已经存在, hook回调.... + // 统一处理source推流事件, source是否已经存在, hook回调.... _, state := stream.PreparePublishSource(source, true) if utils.HookStateOK != state { log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId) @@ -60,15 +60,15 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState { func (s *Session) OnPlay(app, stream_ string) utils.HookState { streamName, values := stream.ParseUrl(stream_) - sourceId := s.generateSourceId(app, streamName) + sourceId := s.generateSourceID(app, streamName) sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack) sink.SetUrlValues(values) - log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String()) + log.Sugar.Infof("rtmp onplay app: %s stream: %s sink: %v conn: %s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String()) _, state := stream.PreparePlaySink(sink) if utils.HookStateOK != state { - log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.GetID()) + log.Sugar.Errorf("rtmp拉流失败 source: %s sink: %s", sourceId, sink.GetID()) } else { s.handle = sink } @@ -77,7 +77,7 @@ func (s *Session) OnPlay(app, stream_ string) utils.HookState { } func (s *Session) Input(conn net.Conn, data []byte) error { - //如果是推流,并且握手成功,后续收到的包,都将发送给LoopEvent处理 + // 推流会话, 收到的包都将交由主协程处理 if s.isPublisher { s.handle.(*Publisher).PublishSource.Input(data) return nil @@ -87,7 +87,7 @@ func (s *Session) Input(conn net.Conn, data []byte) error { } func (s *Session) Close() { - //session/conn/stack相互引用, go释放不了...手动赋值为nil + // session/conn/stack相互引用, go释放不了...手动赋值为nil s.conn = nil defer func() { @@ -97,7 +97,7 @@ func (s *Session) Close() { } }() - //还没到publish/play + // 还未确定会话类型, 无需处理 if s.handle == nil { return } @@ -107,11 +107,12 @@ func (s *Session) Close() { log.Sugar.Infof("rtmp推流结束 %s", publisher.String()) if s.isPublisher { - s.handle.(*Publisher).Close() + publisher.Close() s.receiveBuffer = nil } } else { sink := s.handle.(*Sink) + log.Sugar.Infof("rtmp拉流结束 %s", sink.String()) sink.Close() } diff --git a/rtsp/rtsp_server.go b/rtsp/rtsp_server.go index 28a5998..ab126ce 100644 --- a/rtsp/rtsp_server.go +++ b/rtsp/rtsp_server.go @@ -14,12 +14,6 @@ type Server interface { Close() } -func NewServer(password string) Server { - return &server{ - handler: NewHandler(password), - } -} - type server struct { tcp *transport.TCPServer handler *handler @@ -28,7 +22,7 @@ type server struct { func (s *server) Start(addr net.Addr) error { utils.Assert(s.tcp == nil) - //监听TCP端口 + // 监听TCP端口 tcp := &transport.TCPServer{ ReuseServer: transport.ReuseServer{ EnableReuse: true, @@ -91,3 +85,9 @@ func (s *server) OnDisConnected(conn net.Conn, err error) { s.closeSession(conn) } + +func NewServer(password string) Server { + return &server{ + handler: NewHandler(password), + } +}