mirror of
https://github.com/lkmio/lkm.git
synced 2025-10-04 14:52:44 +08:00
完善代码结构
This commit is contained in:
@@ -16,22 +16,12 @@ type Publisher struct {
|
|||||||
stack *librtmp.Stack
|
stack *librtmp.Stack
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewPublisher(sourceId string, stack *librtmp.Stack, conn net.Conn) *Publisher {
|
|
||||||
deMuxer := libflv.NewDeMuxer()
|
|
||||||
publisher_ := &Publisher{PublishSource: stream.PublishSource{ID: sourceId, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
|
|
||||||
//设置回调,从flv解析出来的Stream和AVPacket都将统一回调到stream.PublishSource
|
|
||||||
deMuxer.SetHandler(publisher_)
|
|
||||||
//为推流方分配足够多的缓冲区
|
|
||||||
//conn.(*transport.Conn).ReallocateRecvBuffer(1024 * 1024)
|
|
||||||
return publisher_
|
|
||||||
}
|
|
||||||
|
|
||||||
func (p *Publisher) Input(data []byte) error {
|
func (p *Publisher) Input(data []byte) error {
|
||||||
return p.stack.Input(nil, data)
|
return p.stack.Input(nil, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
|
func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
|
||||||
//AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存
|
// AVStream的ExtraData已经拷贝, 释放掉内存池中最新分配的内存
|
||||||
p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail()
|
p.FindOrCreatePacketBuffer(stream.Index(), stream.Type()).FreeTail()
|
||||||
if !p.IsCompleted() {
|
if !p.IsCompleted() {
|
||||||
p.PublishSource.OnDeMuxStream(stream)
|
p.PublishSource.OnDeMuxStream(stream)
|
||||||
@@ -42,10 +32,9 @@ func (p *Publisher) OnDeMuxStream(stream utils.AVStream) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// OnVideo 解析出来的完整视频包
|
// OnVideo 解析出来的完整视频包
|
||||||
// @ts rtmp chunk的相对时间戳
|
|
||||||
func (p *Publisher) OnVideo(index int, data []byte, ts uint32) {
|
func (p *Publisher) OnVideo(index int, data []byte, ts uint32) {
|
||||||
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeVideo).Fetch()
|
data = p.FindOrCreatePacketBuffer(index, utils.AVMediaTypeVideo).Fetch()
|
||||||
//交给flv解复用器, 解析回调出AVPacket
|
// 交给flv解复用器, 解析出AVPacket
|
||||||
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
|
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputVideo(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,6 +43,7 @@ func (p *Publisher) OnAudio(index int, data []byte, ts uint32) {
|
|||||||
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
|
p.PublishSource.TransDeMuxer.(libflv.DeMuxer).InputAudio(data, ts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// OnPartPacket AVPacket的部分数据包
|
||||||
func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []byte, first bool) {
|
func (p *Publisher) OnPartPacket(index int, mediaType utils.AVMediaType, data []byte, first bool) {
|
||||||
buffer := p.FindOrCreatePacketBuffer(index, mediaType)
|
buffer := p.FindOrCreatePacketBuffer(index, mediaType)
|
||||||
if first {
|
if first {
|
||||||
@@ -67,3 +57,11 @@ func (p *Publisher) Close() {
|
|||||||
p.PublishSource.Close()
|
p.PublishSource.Close()
|
||||||
p.stack = nil
|
p.stack = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewPublisher(source string, stack *librtmp.Stack, conn net.Conn) *Publisher {
|
||||||
|
deMuxer := libflv.NewDeMuxer()
|
||||||
|
publisher := &Publisher{PublishSource: stream.PublishSource{ID: source, Type: stream.SourceTypeRtmp, TransDeMuxer: deMuxer, Conn: conn}, stack: stack}
|
||||||
|
// 设置回调, 接受从DeMuxer解析出来的音视频包
|
||||||
|
deMuxer.SetHandler(publisher)
|
||||||
|
return publisher
|
||||||
|
}
|
||||||
|
@@ -8,23 +8,23 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Session 负责除连接和断开以外的所有RTMP生命周期处理
|
// Session RTMP会话, 解析处理Message
|
||||||
type Session struct {
|
type Session struct {
|
||||||
stack *librtmp.Stack //rtmp协议栈
|
stack *librtmp.Stack // rtmp协议栈, 解析message
|
||||||
handle interface{} //Publisher/sink, 在publish或play成功后赋值
|
handle interface{} // 持有具体会话句柄(推流端/拉流端), 在@see OnPublish @see OnPlay回调中赋值
|
||||||
isPublisher bool
|
isPublisher bool // 是否时推流会话
|
||||||
|
|
||||||
conn net.Conn
|
conn net.Conn
|
||||||
receiveBuffer *stream.ReceiveBuffer
|
receiveBuffer *stream.ReceiveBuffer // 推流源收流队列
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Session) generateSourceId(app, stream_ string) string {
|
func (s *Session) generateSourceID(app, stream string) string {
|
||||||
if len(app) == 0 {
|
if len(app) == 0 {
|
||||||
return stream_
|
return stream
|
||||||
} else if len(stream_) == 0 {
|
} else if len(stream) == 0 {
|
||||||
return app
|
return app
|
||||||
} else {
|
} else {
|
||||||
return app + "/" + stream_
|
return app + "/" + stream
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -33,16 +33,16 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState {
|
|||||||
|
|
||||||
streamName, values := stream.ParseUrl(stream_)
|
streamName, values := stream.ParseUrl(stream_)
|
||||||
|
|
||||||
sourceId := s.generateSourceId(app, streamName)
|
sourceId := s.generateSourceID(app, streamName)
|
||||||
source := NewPublisher(sourceId, s.stack, s.conn)
|
source := NewPublisher(sourceId, s.stack, s.conn)
|
||||||
//设置推流的音视频回调
|
// 设置推流的音视频回调
|
||||||
s.stack.SetOnPublishHandler(source)
|
s.stack.SetOnPublishHandler(source)
|
||||||
|
|
||||||
//初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针.
|
// 初始化放在add source前面, 以防add后再init, 空窗期拉流队列空指针.
|
||||||
source.Init(stream.ReceiveBufferTCPBlockCount)
|
source.Init(stream.ReceiveBufferTCPBlockCount)
|
||||||
source.SetUrlValues(values)
|
source.SetUrlValues(values)
|
||||||
|
|
||||||
//统一处理source推流事件, source是否已经存在, hook回调....
|
// 统一处理source推流事件, source是否已经存在, hook回调....
|
||||||
_, state := stream.PreparePublishSource(source, true)
|
_, state := stream.PreparePublishSource(source, true)
|
||||||
if utils.HookStateOK != state {
|
if utils.HookStateOK != state {
|
||||||
log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId)
|
log.Sugar.Errorf("rtmp推流失败 source:%s", sourceId)
|
||||||
@@ -60,15 +60,15 @@ func (s *Session) OnPublish(app, stream_ string) utils.HookState {
|
|||||||
func (s *Session) OnPlay(app, stream_ string) utils.HookState {
|
func (s *Session) OnPlay(app, stream_ string) utils.HookState {
|
||||||
streamName, values := stream.ParseUrl(stream_)
|
streamName, values := stream.ParseUrl(stream_)
|
||||||
|
|
||||||
sourceId := s.generateSourceId(app, streamName)
|
sourceId := s.generateSourceID(app, streamName)
|
||||||
sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
|
sink := NewSink(stream.NetAddr2SinkId(s.conn.RemoteAddr()), sourceId, s.conn, s.stack)
|
||||||
sink.SetUrlValues(values)
|
sink.SetUrlValues(values)
|
||||||
|
|
||||||
log.Sugar.Infof("rtmp onplay app:%s stream:%s sink:%v conn:%s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String())
|
log.Sugar.Infof("rtmp onplay app: %s stream: %s sink: %v conn: %s", app, stream_, sink.GetID(), s.conn.RemoteAddr().String())
|
||||||
|
|
||||||
_, state := stream.PreparePlaySink(sink)
|
_, state := stream.PreparePlaySink(sink)
|
||||||
if utils.HookStateOK != state {
|
if utils.HookStateOK != state {
|
||||||
log.Sugar.Errorf("rtmp拉流失败 source:%s sink:%s", sourceId, sink.GetID())
|
log.Sugar.Errorf("rtmp拉流失败 source: %s sink: %s", sourceId, sink.GetID())
|
||||||
} else {
|
} else {
|
||||||
s.handle = sink
|
s.handle = sink
|
||||||
}
|
}
|
||||||
@@ -77,7 +77,7 @@ func (s *Session) OnPlay(app, stream_ string) utils.HookState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Session) Input(conn net.Conn, data []byte) error {
|
func (s *Session) Input(conn net.Conn, data []byte) error {
|
||||||
//如果是推流,并且握手成功,后续收到的包,都将发送给LoopEvent处理
|
// 推流会话, 收到的包都将交由主协程处理
|
||||||
if s.isPublisher {
|
if s.isPublisher {
|
||||||
s.handle.(*Publisher).PublishSource.Input(data)
|
s.handle.(*Publisher).PublishSource.Input(data)
|
||||||
return nil
|
return nil
|
||||||
@@ -87,7 +87,7 @@ func (s *Session) Input(conn net.Conn, data []byte) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Session) Close() {
|
func (s *Session) Close() {
|
||||||
//session/conn/stack相互引用, go释放不了...手动赋值为nil
|
// session/conn/stack相互引用, go释放不了...手动赋值为nil
|
||||||
s.conn = nil
|
s.conn = nil
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
@@ -97,7 +97,7 @@ func (s *Session) Close() {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
//还没到publish/play
|
// 还未确定会话类型, 无需处理
|
||||||
if s.handle == nil {
|
if s.handle == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -107,11 +107,12 @@ func (s *Session) Close() {
|
|||||||
log.Sugar.Infof("rtmp推流结束 %s", publisher.String())
|
log.Sugar.Infof("rtmp推流结束 %s", publisher.String())
|
||||||
|
|
||||||
if s.isPublisher {
|
if s.isPublisher {
|
||||||
s.handle.(*Publisher).Close()
|
publisher.Close()
|
||||||
s.receiveBuffer = nil
|
s.receiveBuffer = nil
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
sink := s.handle.(*Sink)
|
sink := s.handle.(*Sink)
|
||||||
|
|
||||||
log.Sugar.Infof("rtmp拉流结束 %s", sink.String())
|
log.Sugar.Infof("rtmp拉流结束 %s", sink.String())
|
||||||
sink.Close()
|
sink.Close()
|
||||||
}
|
}
|
||||||
|
@@ -14,12 +14,6 @@ type Server interface {
|
|||||||
Close()
|
Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServer(password string) Server {
|
|
||||||
return &server{
|
|
||||||
handler: NewHandler(password),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
type server struct {
|
type server struct {
|
||||||
tcp *transport.TCPServer
|
tcp *transport.TCPServer
|
||||||
handler *handler
|
handler *handler
|
||||||
@@ -28,7 +22,7 @@ type server struct {
|
|||||||
func (s *server) Start(addr net.Addr) error {
|
func (s *server) Start(addr net.Addr) error {
|
||||||
utils.Assert(s.tcp == nil)
|
utils.Assert(s.tcp == nil)
|
||||||
|
|
||||||
//监听TCP端口
|
// 监听TCP端口
|
||||||
tcp := &transport.TCPServer{
|
tcp := &transport.TCPServer{
|
||||||
ReuseServer: transport.ReuseServer{
|
ReuseServer: transport.ReuseServer{
|
||||||
EnableReuse: true,
|
EnableReuse: true,
|
||||||
@@ -91,3 +85,9 @@ func (s *server) OnDisConnected(conn net.Conn, err error) {
|
|||||||
|
|
||||||
s.closeSession(conn)
|
s.closeSession(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewServer(password string) Server {
|
||||||
|
return &server{
|
||||||
|
handler: NewHandler(password),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user