diff --git a/amf.go b/amf.go index b72b93c..b2b9d33 100644 --- a/amf.go +++ b/amf.go @@ -1,8 +1,8 @@ package rtmp import ( - "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" + "m7s.live/engine/v4/util" ) // Action Message Format -- AMF 0 diff --git a/chunk.go b/chunk.go index 14ed437..0a1dc50 100644 --- a/chunk.go +++ b/chunk.go @@ -3,7 +3,7 @@ package rtmp import ( "encoding/binary" - "github.com/Monibuca/engine/v4/util" + "m7s.live/engine/v4/util" ) // RTMP协议中基本的数据单元称为消息(Message). diff --git a/client.go b/client.go index e5dcf93..d22648d 100644 --- a/client.go +++ b/client.go @@ -6,21 +6,22 @@ import ( "net/url" "strings" - "github.com/Monibuca/engine/v4" - "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" + "m7s.live/engine/v4" + "m7s.live/engine/v4/log" + "m7s.live/engine/v4/util" ) -func NewRTMPClient(addr string) (client *NetConnection) { +func NewRTMPClient(addr string) (client *NetConnection, err error) { u, err := url.Parse(addr) if err != nil { plugin.Error("connect url parse", zap.Error(err)) - return + return nil, err } conn, err := net.Dial("tcp", u.Host) if err != nil { plugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err)) - return + return nil, err } client = &NetConnection{ TCPConn: conn.(*net.TCPConn), @@ -36,7 +37,7 @@ func NewRTMPClient(addr string) (client *NetConnection) { err = client.ClientHandshake() if err != nil { plugin.Error("handshake", zap.Error(err)) - return nil + return nil, err } connectArg := make(AMFObject) connectArg["swfUrl"] = addr @@ -48,7 +49,7 @@ func NewRTMPClient(addr string) (client *NetConnection) { for { msg, err := client.RecvMessage() if err != nil { - return nil + return nil, err } switch msg.MessageTypeID { case RTMP_MSG_AMF0_COMMAND: @@ -57,9 +58,9 @@ func NewRTMPClient(addr string) (client *NetConnection) { case "_result": response := msg.MsgData.(*ResponseMessage) if response.Infomation["code"] == NetConnection_Connect_Success { - return + return client, nil } else { - return nil + return nil, err } } } @@ -71,25 +72,15 @@ type RTMPPusher struct { engine.Pusher } -func (pusher *RTMPPusher) OnEvent(event any) { - pusher.RTMPSender.OnEvent(event) - switch event.(type) { - case *engine.Stream: - pusher.NetConnection = NewRTMPClient(pusher.RemoteURL) - if pusher.NetConnection != nil { - pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) - go pusher.push() - } - case engine.PushEvent: - pusher.ReConnectCount++ - if pusher.Stream == nil { - plugin.Subscribe(pusher.StreamPath, pusher) - } - } +func (pusher *RTMPPusher) Connect() (err error) { + pusher.ReConnectCount++ + pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL) + log.Info("connect", zap.String("remoteURL", pusher.RemoteURL)) + return } -func (pusher *RTMPPusher) push() { - defer pusher.Stop() +func (pusher *RTMPPusher) Push() { + pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) for { msg, err := pusher.RecvMessage() if err != nil { @@ -124,9 +115,6 @@ func (pusher *RTMPPusher) push() { } } } - if !pusher.Stream.IsClosed() && pusher.Reconnect() { - pusher.OnEvent(engine.PullEvent(pusher.ReConnectCount)) - } } type RTMPPuller struct { @@ -134,27 +122,16 @@ type RTMPPuller struct { engine.Puller } -func (puller *RTMPPuller) OnEvent(event any) { - puller.RTMPReceiver.OnEvent(event) - switch event.(type) { - case *engine.Stream: - puller.NetConnection = NewRTMPClient(puller.RemoteURL) - if puller.NetConnection != nil { - puller.absTs = make(map[uint32]uint32) - puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) - go puller.pull() - break - } - case engine.PullEvent: - puller.ReConnectCount++ - if puller.Stream == nil { - plugin.Publish(puller.StreamPath, puller) - } - } +func (puller *RTMPPuller) Connect() (err error) { + puller.ReConnectCount++ + puller.NetConnection, err = NewRTMPClient(puller.RemoteURL) + log.Info("connect", zap.String("remoteURL", puller.RemoteURL)) + return } -func (puller *RTMPPuller) pull() { - defer puller.Stop() +func (puller *RTMPPuller) Pull() { + puller.absTs = make(map[uint32]uint32) + puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) for { msg, err := puller.RecvMessage() if err != nil { diff --git a/go.mod b/go.mod index e73ed6d..af6c4e1 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/Monibuca/plugin-rtmp/v4 +module m7s.live/plugin/rtmp/v4 go 1.18 diff --git a/main.go b/main.go index f7d59b5..9fe5a50 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,9 @@ package rtmp import ( "context" - . "github.com/Monibuca/engine/v4" - "github.com/Monibuca/engine/v4/config" "go.uber.org/zap" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/config" ) type RTMPConfig struct { @@ -24,6 +24,13 @@ func (c *RTMPConfig) OnEvent(event any) { plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr)) go c.Listen(plugin, c) } + if c.PullOnStart { + for streamPath, url := range c.PullList { + if err := plugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil { + plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) + } + } + } case config.Config: plugin.CancelFunc() if c.ListenAddr != "" { @@ -31,20 +38,29 @@ func (c *RTMPConfig) OnEvent(event any) { plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr)) go c.Listen(plugin, c) } - case Puller: - client := RTMPPuller{ - Puller: v, + case SEpublish: + for streamPath, url := range c.PushList { + if streamPath == v.Stream.Path { + if err := plugin.Push(streamPath, url, new(RTMPPusher), false); err != nil { + plugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) + } + } } - client.OnEvent(PullEvent(0)) - case Pusher: - client := RTMPPusher{ - Pusher: v, + case *Stream: //按需拉流 + if c.PullOnSubscribe { + for streamPath, url := range c.PullList { + if streamPath == v.Path { + if err := plugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil { + plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err)) + } + break + } + } } - client.OnEvent(PushEvent(0)) } } var plugin = InstallPlugin(&RTMPConfig{ ChunkSize: 4096, TCP: config.TCP{ListenAddr: ":1935"}, -}) \ No newline at end of file +}) diff --git a/media.go b/media.go index 7ab5bfc..abf1d01 100644 --- a/media.go +++ b/media.go @@ -3,8 +3,8 @@ package rtmp import ( "net" - . "github.com/Monibuca/engine/v4" - "github.com/Monibuca/engine/v4/util" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/util" ) type RTMPSender struct { @@ -18,11 +18,9 @@ func (rtmp *RTMPSender) OnEvent(event any) { rtmp.sendAVMessage(0, v.AVCC, true, true) case VideoDeConf: rtmp.sendAVMessage(0, v.AVCC, false, true) - // case TrackRemoved: - //TODO - case AudioFrame: + case *AudioFrame: rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) - case VideoFrame: + case *VideoFrame: rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false) default: rtmp.Subscriber.OnEvent(event) diff --git a/msg.go b/msg.go index 101e39f..f938e26 100644 --- a/msg.go +++ b/msg.go @@ -5,8 +5,8 @@ import ( "errors" "strings" - "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" + "m7s.live/engine/v4/util" ) const ( diff --git a/netConnection.go b/netConnection.go index 73ba6e1..22d3fc0 100644 --- a/netConnection.go +++ b/netConnection.go @@ -7,8 +7,8 @@ import ( "io" "net" - . "github.com/Monibuca/engine/v4" - "github.com/Monibuca/engine/v4/util" + . "m7s.live/engine/v4" + "m7s.live/engine/v4/util" ) const ( @@ -27,7 +27,7 @@ const ( SEND_CONNECT_MESSAGE = "Send Connect Message" SEND_CONNECT_RESPONSE_MESSAGE = "Send Connect Response Message" - SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message" + SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message" SEND_PLAY_MESSAGE = "Send Play Message" SEND_PLAY_RESPONSE_MESSAGE = "Send Play Response Message" diff --git a/server.go b/server.go index 24c86fc..b745c50 100644 --- a/server.go +++ b/server.go @@ -7,9 +7,9 @@ import ( "net" "sync/atomic" - "github.com/Monibuca/engine/v4" - "github.com/Monibuca/engine/v4/util" "go.uber.org/zap" + "m7s.live/engine/v4" + "m7s.live/engine/v4/util" ) type NetStream struct { @@ -50,7 +50,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { ctx, cancel := context.WithCancel(engine.Engine) defer func() { nc.Close() - cancel() + cancel() //终止所有发布者和订阅者 }() /* Handshake */ if err := nc.Handshake(); err != nil { @@ -100,8 +100,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { StreamID: pm.StreamId, }, } - receiver.Closer = &nc - receiver.OnEvent(ctx) + receiver.SetParentCtx(ctx) if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) == nil { receivers[receiver.StreamID] = receiver receiver.absTs = make(map[uint32]uint32) @@ -118,7 +117,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { &nc, msg.MessageStreamID, } - sender.OnEvent(ctx) + sender.SetParentCtx(ctx) sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID) if plugin.Subscribe(streamPath, sender) == nil { senders[sender.StreamID] = sender