mirror of
https://github.com/Monibuca/plugin-rtmp.git
synced 2025-10-05 15:37:11 +08:00
👌 IMPROVE: 导出RTMP插件
This commit is contained in:
14
amf.go
14
amf.go
@@ -97,14 +97,14 @@ func (amf *AMF) decodeObject() (obj AMFValue) {
|
|||||||
case AMF0_OBJECT:
|
case AMF0_OBJECT:
|
||||||
return amf.readObject()
|
return amf.readObject()
|
||||||
case AMF0_MOVIECLIP:
|
case AMF0_MOVIECLIP:
|
||||||
plugin.Error("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)")
|
RTMPPlugin.Error("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)")
|
||||||
case AMF0_NULL:
|
case AMF0_NULL:
|
||||||
return amf.readNull()
|
return amf.readNull()
|
||||||
case AMF0_UNDEFINED:
|
case AMF0_UNDEFINED:
|
||||||
amf.ReadByte()
|
amf.ReadByte()
|
||||||
return Undefined
|
return Undefined
|
||||||
case AMF0_REFERENCE:
|
case AMF0_REFERENCE:
|
||||||
plugin.Error("reference-type.(AMF0_REFERENCE)")
|
RTMPPlugin.Error("reference-type.(AMF0_REFERENCE)")
|
||||||
case AMF0_ECMA_ARRAY:
|
case AMF0_ECMA_ARRAY:
|
||||||
return amf.readECMAArray()
|
return amf.readECMAArray()
|
||||||
case AMF0_END_OBJECT:
|
case AMF0_END_OBJECT:
|
||||||
@@ -118,15 +118,15 @@ func (amf *AMF) decodeObject() (obj AMFValue) {
|
|||||||
AMF0_XML_DOCUMENT:
|
AMF0_XML_DOCUMENT:
|
||||||
return amf.readLongString()
|
return amf.readLongString()
|
||||||
case AMF0_UNSUPPORTED:
|
case AMF0_UNSUPPORTED:
|
||||||
plugin.Error("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)")
|
RTMPPlugin.Error("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)")
|
||||||
case AMF0_RECORDSET:
|
case AMF0_RECORDSET:
|
||||||
plugin.Error("This type is not supported and is reserved for future use.(AMF0_RECORDSET)")
|
RTMPPlugin.Error("This type is not supported and is reserved for future use.(AMF0_RECORDSET)")
|
||||||
case AMF0_TYPED_OBJECT:
|
case AMF0_TYPED_OBJECT:
|
||||||
plugin.Error("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)")
|
RTMPPlugin.Error("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)")
|
||||||
case AMF0_AVMPLUS_OBJECT:
|
case AMF0_AVMPLUS_OBJECT:
|
||||||
plugin.Error("AMF0_AVMPLUS_OBJECT")
|
RTMPPlugin.Error("AMF0_AVMPLUS_OBJECT")
|
||||||
default:
|
default:
|
||||||
plugin.Error("Unsupported type", zap.Uint8("type", t))
|
RTMPPlugin.Error("Unsupported type", zap.Uint8("type", t))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
10
client.go
10
client.go
@@ -15,7 +15,7 @@ import (
|
|||||||
func NewRTMPClient(addr string) (client *NetConnection, err error) {
|
func NewRTMPClient(addr string) (client *NetConnection, err error) {
|
||||||
u, err := url.Parse(addr)
|
u, err := url.Parse(addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plugin.Error("connect url parse", zap.Error(err))
|
RTMPPlugin.Error("connect url parse", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
if strings.Count(u.Host, ":") == 0 {
|
if strings.Count(u.Host, ":") == 0 {
|
||||||
@@ -23,7 +23,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
|
|||||||
}
|
}
|
||||||
conn, err := net.Dial("tcp", u.Host)
|
conn, err := net.Dial("tcp", u.Host)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err))
|
RTMPPlugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
client = &NetConnection{
|
client = &NetConnection{
|
||||||
@@ -38,7 +38,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
|
|||||||
}
|
}
|
||||||
err = client.ClientHandshake()
|
err = client.ClientHandshake()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
plugin.Error("handshake", zap.Error(err))
|
RTMPPlugin.Error("handshake", zap.Error(err))
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
ps := strings.Split(u.Path, "/")
|
ps := strings.Split(u.Path, "/")
|
||||||
@@ -82,7 +82,7 @@ type RTMPPusher struct {
|
|||||||
|
|
||||||
func (pusher *RTMPPusher) Connect() (err error) {
|
func (pusher *RTMPPusher) Connect() (err error) {
|
||||||
pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL)
|
pusher.NetConnection, err = NewRTMPClient(pusher.RemoteURL)
|
||||||
plugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL))
|
RTMPPlugin.Info("connect", zap.String("remoteURL", pusher.RemoteURL))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -134,7 +134,7 @@ type RTMPPuller struct {
|
|||||||
|
|
||||||
func (puller *RTMPPuller) Connect() (err error) {
|
func (puller *RTMPPuller) Connect() (err error) {
|
||||||
puller.NetConnection, err = NewRTMPClient(puller.RemoteURL)
|
puller.NetConnection, err = NewRTMPClient(puller.RemoteURL)
|
||||||
plugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
|
RTMPPlugin.Info("connect", zap.String("remoteURL", puller.RemoteURL))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
45
main.go
45
main.go
@@ -2,6 +2,7 @@ package rtmp
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
. "m7s.live/engine/v4"
|
. "m7s.live/engine/v4"
|
||||||
@@ -22,28 +23,28 @@ func (c *RTMPConfig) OnEvent(event any) {
|
|||||||
switch v := event.(type) {
|
switch v := event.(type) {
|
||||||
case FirstConfig:
|
case FirstConfig:
|
||||||
if c.ListenAddr != "" {
|
if c.ListenAddr != "" {
|
||||||
plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
|
RTMPPlugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
|
||||||
go c.Listen(plugin, c)
|
go c.Listen(RTMPPlugin, c)
|
||||||
}
|
}
|
||||||
if c.PullOnStart {
|
if c.PullOnStart {
|
||||||
for streamPath, url := range c.PullList {
|
for streamPath, url := range c.PullList {
|
||||||
if err := plugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
|
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
|
||||||
plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
case config.Config:
|
case config.Config:
|
||||||
plugin.CancelFunc()
|
RTMPPlugin.CancelFunc()
|
||||||
if c.ListenAddr != "" {
|
if c.ListenAddr != "" {
|
||||||
plugin.Context, plugin.CancelFunc = context.WithCancel(Engine)
|
RTMPPlugin.Context, RTMPPlugin.CancelFunc = context.WithCancel(Engine)
|
||||||
plugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
|
RTMPPlugin.Info("server rtmp start at", zap.String("listen addr", c.ListenAddr))
|
||||||
go c.Listen(plugin, c)
|
go c.Listen(RTMPPlugin, c)
|
||||||
}
|
}
|
||||||
case SEpublish:
|
case SEpublish:
|
||||||
for streamPath, url := range c.PushList {
|
for streamPath, url := range c.PushList {
|
||||||
if streamPath == v.Stream.Path {
|
if streamPath == v.Stream.Path {
|
||||||
if err := plugin.Push(streamPath, url, new(RTMPPusher), false); err != nil {
|
if err := RTMPPlugin.Push(streamPath, url, new(RTMPPusher), false); err != nil {
|
||||||
plugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
RTMPPlugin.Error("push", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -51,8 +52,8 @@ func (c *RTMPConfig) OnEvent(event any) {
|
|||||||
if c.PullOnSubscribe {
|
if c.PullOnSubscribe {
|
||||||
for streamPath, url := range c.PullList {
|
for streamPath, url := range c.PullList {
|
||||||
if streamPath == v.Path {
|
if streamPath == v.Path {
|
||||||
if err := plugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
|
if err := RTMPPlugin.Pull(streamPath, url, new(RTMPPuller), false); err != nil {
|
||||||
plugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
RTMPPlugin.Error("pull", zap.String("streamPath", streamPath), zap.String("url", url), zap.Error(err))
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@@ -65,4 +66,22 @@ var conf = &RTMPConfig{
|
|||||||
ChunkSize: 4096,
|
ChunkSize: 4096,
|
||||||
TCP: config.TCP{ListenAddr: ":1935"},
|
TCP: config.TCP{ListenAddr: ":1935"},
|
||||||
}
|
}
|
||||||
var plugin = InstallPlugin(conf)
|
var RTMPPlugin = InstallPlugin(conf)
|
||||||
|
|
||||||
|
func (*RTMPConfig) API_Pull(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
err := RTMPPlugin.Pull(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPuller), r.URL.Query().Has("save"))
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
} else {
|
||||||
|
rw.Write([]byte("ok"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (*RTMPConfig) API_Push(rw http.ResponseWriter, r *http.Request) {
|
||||||
|
err := RTMPPlugin.Push(r.URL.Query().Get("streamPath"), r.URL.Query().Get("target"), new(RTMPPusher), r.URL.Query().Has("save"))
|
||||||
|
if err != nil {
|
||||||
|
http.Error(rw, err.Error(), http.StatusBadRequest)
|
||||||
|
} else {
|
||||||
|
rw.Write([]byte("ok"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
8
msg.go
8
msg.go
@@ -310,11 +310,11 @@ func decodeCommandAMF0(chunk *Chunk) {
|
|||||||
codef := zap.String("code", response.Infomation["code"].(string))
|
codef := zap.String("code", response.Infomation["code"].(string))
|
||||||
switch response.Infomation["level"] {
|
switch response.Infomation["level"] {
|
||||||
case Level_Status:
|
case Level_Status:
|
||||||
plugin.Info("_result :", codef)
|
RTMPPlugin.Info("_result :", codef)
|
||||||
case Level_Warning:
|
case Level_Warning:
|
||||||
plugin.Warn("_result :", codef)
|
RTMPPlugin.Warn("_result :", codef)
|
||||||
case Level_Error:
|
case Level_Error:
|
||||||
plugin.Error("_result :", codef)
|
RTMPPlugin.Error("_result :", codef)
|
||||||
}
|
}
|
||||||
if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Publish") {
|
if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Publish") {
|
||||||
chunk.MsgData = &ResponsePublishMessage{
|
chunk.MsgData = &ResponsePublishMessage{
|
||||||
@@ -335,7 +335,7 @@ func decodeCommandAMF0(chunk *Chunk) {
|
|||||||
}
|
}
|
||||||
case "FCPublish", "FCUnpublish":
|
case "FCPublish", "FCUnpublish":
|
||||||
default:
|
default:
|
||||||
plugin.Info("decode command amf0 ", zap.String("cmd", cmd))
|
RTMPPlugin.Info("decode command amf0 ", zap.String("cmd", cmd))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
12
server.go
12
server.go
@@ -52,7 +52,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
/* Handshake */
|
/* Handshake */
|
||||||
if err := nc.Handshake(); err != nil {
|
if err := nc.Handshake(); err != nil {
|
||||||
plugin.Error("handshake", zap.Error(err))
|
RTMPPlugin.Error("handshake", zap.Error(err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
for {
|
for {
|
||||||
@@ -66,7 +66,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
cmd := msg.MsgData.(Commander).GetCommand()
|
cmd := msg.MsgData.(Commander).GetCommand()
|
||||||
plugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
|
RTMPPlugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID))
|
||||||
switch cmd := msg.MsgData.(type) {
|
switch cmd := msg.MsgData.(type) {
|
||||||
case *CallMessage: //connect
|
case *CallMessage: //connect
|
||||||
app := cmd.Object["app"] // 客户端要连接到的服务应用名
|
app := cmd.Object["app"] // 客户端要连接到的服务应用名
|
||||||
@@ -75,7 +75,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
nc.objectEncoding = objectEncoding.(float64)
|
nc.objectEncoding = objectEncoding.(float64)
|
||||||
}
|
}
|
||||||
nc.appName = app.(string)
|
nc.appName = app.(string)
|
||||||
plugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
|
RTMPPlugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding))
|
||||||
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
|
err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10))
|
||||||
nc.writeChunkSize = config.ChunkSize
|
nc.writeChunkSize = config.ChunkSize
|
||||||
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
|
err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize))
|
||||||
@@ -101,7 +101,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
|
err = nc.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
|
||||||
case *CommandMessage: // "createStream"
|
case *CommandMessage: // "createStream"
|
||||||
streamId := atomic.AddUint32(&gstreamid, 1)
|
streamId := atomic.AddUint32(&gstreamid, 1)
|
||||||
plugin.Info("createStream:", zap.Uint32("streamId", streamId))
|
RTMPPlugin.Info("createStream:", zap.Uint32("streamId", streamId))
|
||||||
nc.ResponseCreateStream(cmd.TransactionId, streamId)
|
nc.ResponseCreateStream(cmd.TransactionId, streamId)
|
||||||
case *CURDStreamMessage:
|
case *CURDStreamMessage:
|
||||||
if stream, ok := receivers[cmd.StreamId]; ok {
|
if stream, ok := receivers[cmd.StreamId]; ok {
|
||||||
@@ -133,7 +133,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
if !config.KeepAlive {
|
if !config.KeepAlive {
|
||||||
receiver.SetIO(conn)
|
receiver.SetIO(conn)
|
||||||
}
|
}
|
||||||
if plugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
|
if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
|
||||||
receivers[cmd.StreamId] = receiver
|
receivers[cmd.StreamId] = receiver
|
||||||
receiver.absTs = make(map[uint32]uint32)
|
receiver.absTs = make(map[uint32]uint32)
|
||||||
receiver.Begin()
|
receiver.Begin()
|
||||||
@@ -153,7 +153,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
|
|||||||
sender.SetIO(conn)
|
sender.SetIO(conn)
|
||||||
}
|
}
|
||||||
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
|
sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID)
|
||||||
if plugin.Subscribe(streamPath, sender) != nil {
|
if RTMPPlugin.Subscribe(streamPath, sender) != nil {
|
||||||
sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
|
sender.Response(cmd.TransactionId, NetStream_Play_Failed, Level_Error)
|
||||||
} else {
|
} else {
|
||||||
senders[sender.StreamID] = sender
|
senders[sender.StreamID] = sender
|
||||||
|
Reference in New Issue
Block a user