diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..7f73d3d --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2019-present, dexter + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. \ No newline at end of file diff --git a/go.mod b/go.mod index e3f0ff7..8b11a8e 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/Monibuca/rtmpplugin go 1.13 -require github.com/Monibuca/engine v1.0.2 +require ( + github.com/Monibuca/engine v1.1.2 + github.com/funny/utest v0.0.0-20161029064919-43870a374500 // indirect +) diff --git a/go.sum b/go.sum index c4e2210..17e3909 100644 --- a/go.sum +++ b/go.sum @@ -2,19 +2,30 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Monibuca/engine v1.0.2 h1:UpPAEQVYrVQrLr9GVGcbu8x5Oiemqd5J2zjGZ/Fhg74= github.com/Monibuca/engine v1.0.2/go.mod h1:NjqVgtXuRSOyk3+NWgCuDf2p7TsBisjYxoEwA9uCZ38= +github.com/Monibuca/engine v1.1.2 h1:vhZkKO8r/S3pBgM+AY+vyNZDV4wX0EScdf3PJw/+Ths= +github.com/Monibuca/engine v1.1.2/go.mod h1:OEmvKy5/pgbVEglb6RLC6EL+LRS7UR3l46Vgz1AURJc= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= +github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= +github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY= github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/main.go b/main.go index d8711d2..5f7d61b 100644 --- a/main.go +++ b/main.go @@ -6,7 +6,10 @@ import ( . "github.com/Monibuca/engine" ) -var config = new(ListenerConfig) +var config = new(struct { + ListenAddr string + FirstScreen bool +}) func init() { InstallPlugin(&PluginConfig{ diff --git a/msg.go b/msg.go index 831d5e8..a800b5b 100644 --- a/msg.go +++ b/msg.go @@ -2,9 +2,10 @@ package rtmpplugin import ( "bytes" - "github.com/Monibuca/engine/util" "log" "sync" + + "github.com/Monibuca/engine/util" ) const ( @@ -78,6 +79,9 @@ var ( func newChunkHeader(messageType byte) *ChunkHeader { head := rtmpHeaderPool.Get().(*ChunkHeader) head.ChunkStreamID = RTMP_CSID_CONTROL + if messageType == RTMP_MSG_AMF0_COMMAND { + head.ChunkStreamID = RTMP_CSID_COMMAND + } head.Timestamp = 0 head.MessageTypeID = messageType head.MessageStreamID = 0 diff --git a/netConnection.go b/netConnection.go index 82d5a7b..a503904 100644 --- a/netConnection.go +++ b/netConnection.go @@ -243,12 +243,11 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { m.StreamID = streamID return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_CONNECT_RESPONSE_MESSAGE: - data := newConnectResponseMessageData(args.(float64)) //if !ok { // errors.New(SEND_CONNECT_RESPONSE_MESSAGE + ", The parameter is AMFObjects(map[string]interface{})") //} - //pro := newAMFObjects() + pro := newAMFObjects() info := newAMFObjects() //for i, v := range data { @@ -257,10 +256,19 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { // pro[i] = v // } //} + + pro["fmsVer"] = "monibuca/" + engine.Version + pro["capabilities"] = 31 + pro["mode"] = 1 + pro["Author"] = "dexter" + + info["level"] = Level_Status + info["code"] = NetConnection_Connect_Success + info["objectEncoding"] = args.(float64) m := new(ResponseConnectMessage) m.CommandName = Response_Result m.TransactionId = 1 - m.Properties = data + m.Properties = pro m.Infomation = info return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_CONNECT_MESSAGE: @@ -351,18 +359,17 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, isFirst bool) error { - // if conn.writeSeqNum > conn.bandwidth { - // conn.totalWrite += conn.writeSeqNum - // conn.writeSeqNum = 0 - // conn.SendMessage(SEND_ACK_MESSAGE, conn.totalWrite) - // conn.SendMessage(SEND_PING_REQUEST_MESSAGE, nil) - // } + if conn.writeSeqNum > conn.bandwidth { + conn.totalWrite += conn.writeSeqNum + conn.writeSeqNum = 0 + conn.SendMessage(SEND_ACK_MESSAGE, conn.totalWrite) + conn.SendMessage(SEND_PING_REQUEST_MESSAGE, nil) + } var err error var mark []byte var need []byte var head *ChunkHeader - if isAudio { head = newRtmpHeader(RTMP_CSID_AUDIO, av.Timestamp, uint32(len(av.Packet.Payload)), RTMP_MSG_AUDIO, conn.streamID, 0) } else { @@ -376,19 +383,18 @@ func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, mark, need, err = encodeChunk12(head, av.Packet.Payload, conn.writeChunkSize) } else { mark, need, err = encodeChunk8(head, av.Packet.Payload, conn.writeChunkSize) + } if err != nil { return err } - _, err = conn.Write(mark) - if err != nil { + if _, err = conn.Write(mark); err != nil { return err } - err = conn.Flush() - if err != nil { + if err = conn.Flush(); err != nil { return err } @@ -396,18 +402,15 @@ func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, // 如果音视频数据太大,一次发送不完,那么在这里进行分割(data + Chunk Basic Header(1)) for need != nil && len(need) > 0 { - mark, need, err = encodeChunk1(head, need, conn.writeChunkSize) - if err != nil { + if mark, need, err = encodeChunk1(head, need, conn.writeChunkSize); err != nil { return err } - _, err = conn.Write(mark) - if err != nil { + if _, err = conn.Write(mark); err != nil { return err } - err = conn.Flush() - if err != nil { + if err = conn.Flush(); err != nil { return err } @@ -475,6 +478,7 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { if markRead == msgLen { msg := chunkMsgPool.Get().(*Chunk) + msg.MsgData = nil msg.Body = currentBody msg.ChunkHeader = chunkHead.Clone() GetRtmpMessage(msg) diff --git a/netStream.go b/netStream.go index 33f9eff..3f3b2f6 100644 --- a/netStream.go +++ b/netStream.go @@ -97,6 +97,7 @@ func processRtmp(conn net.Conn) { switch cmd.CommandName { case "createStream": nc.streamID = nc.nextStreamID(msg.ChunkStreamID) + log.Println("createStream:", nc.streamID) err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId) if MayBeError(err) { return @@ -106,17 +107,22 @@ func processRtmp(conn net.Conn) { streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0] pub := new(RTMP) if pub.Publish(streamPath, pub) { - //pub.FirstScreen = make([]*avformat.AVPacket, 0) + if config.FirstScreen { + pub.FirstScreen = make([]*avformat.AVPacket, 0) + } room = pub.Room err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status)) } else { - err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, Level_Error, NetStream_Publish_BadName)) + err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error)) } case "play": pm := msg.MsgData.(*PlayMessage) streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] nc.writeChunkSize = 512 + var lastAudioTime uint32 = 0 + var lastVideoTime uint32 = 0 + // followAVCSequence := false stream := &OutputStream{SendHandler: func(packet *avformat.SendPacket) (err error) { switch true { case packet.Packet.IsADTS: @@ -135,11 +141,23 @@ func processRtmp(conn net.Conn) { } case packet.Packet.IsAVCSequence: err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, packet) + // followAVCSequence = true case packet.Packet.Type == RTMP_MSG_VIDEO: + // if followAVCSequence { + // followAVCSequence = false + // err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, packet) + // break + // } + t := packet.Timestamp - lastVideoTime + lastVideoTime = packet.Timestamp + packet.Timestamp = t err = nc.SendMessage(SEND_VIDEO_MESSAGE, packet) case packet.Packet.IsAACSequence: err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, packet) case packet.Packet.Type == RTMP_MSG_AUDIO: + t := packet.Timestamp - lastAudioTime + lastAudioTime = packet.Timestamp + packet.Timestamp = t err = nc.SendMessage(SEND_AUDIO_MESSAGE, packet) } return