From 3818e39b1925c9376c1fad8abb8378480026c6ce Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Mon, 20 Dec 2021 13:38:43 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- chunk.go | 54 ++++++++++++++---------------------------------- go.mod | 4 ++-- go.sum | 32 ++++++++-------------------- main.go | 5 +++-- msg.go | 13 +++++++++++- netConnection.go | 18 +++++++++------- netStream.go | 49 +++++++------------------------------------ 7 files changed, 59 insertions(+), 116 deletions(-) diff --git a/chunk.go b/chunk.go index 7b67af7..241c2a8 100644 --- a/chunk.go +++ b/chunk.go @@ -81,10 +81,7 @@ type ChunkExtendedTimestamp struct { // 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7) // 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11) -func (nc *NetConnection) encodeChunk12(head *ChunkHeader, payload []byte, size int) (need []byte, err error) { - if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { - return nil, errors.New("chunk error") - } +func (nc *NetConnection) encodeChunk12(head *ChunkHeader, payload []byte) (need []byte, err error) { b := utils.GetSlice(12) //chunkBasicHead b[0] = byte(RTMP_CHUNK_HEAD_12 + head.ChunkBasicHeader.ChunkStreamID) @@ -102,21 +99,10 @@ func (nc *NetConnection) encodeChunk12(head *ChunkHeader, payload []byte, size i utils.RecycleSlice(b) nc.writeSeqNum += 4 } - if len(payload) > size { - nc.Write(payload[0:size]) - nc.writeSeqNum += uint32(size) - need = payload[size:] - } else { - nc.Write(payload) - nc.writeSeqNum += uint32(len(payload)) - } - return + return nc.writeChunk(payload) } -func (nc *NetConnection) encodeChunk8(head *ChunkHeader, payload []byte, size int) (need []byte, err error) { - if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { - return nil, errors.New("chunk error") - } +func (nc *NetConnection) encodeChunk8(head *ChunkHeader, payload []byte) (need []byte, err error) { b := utils.GetSlice(8) //chunkBasicHead b[0] = byte(RTMP_CHUNK_HEAD_8 + head.ChunkBasicHeader.ChunkStreamID) @@ -126,15 +112,7 @@ func (nc *NetConnection) encodeChunk8(head *ChunkHeader, payload []byte, size in nc.Write(b) utils.RecycleSlice(b) nc.writeSeqNum += 8 - if len(payload) > size { - nc.Write(payload[0:size]) - nc.writeSeqNum += uint32(size) - need = payload[size:] - } else { - nc.Write(payload) - nc.writeSeqNum += uint32(len(payload)) - } - return + return nc.writeChunk(payload) } func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size int) (need []byte, err error) { @@ -159,20 +137,20 @@ func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size in return } -func (nc *NetConnection) encodeChunk1(head *ChunkHeader, payload []byte, size int) (need []byte, err error) { - if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { - return nil, errors.New("chunk error") - } - chunkBasicHead := byte(RTMP_CHUNK_HEAD_1 + head.ChunkBasicHeader.ChunkStreamID) - nc.WriteByte(chunkBasicHead) +func (nc *NetConnection) encodeChunk1(head *ChunkHeader, payload []byte) (need []byte, err error) { + err = nc.WriteByte(byte(RTMP_CHUNK_HEAD_1 + head.ChunkBasicHeader.ChunkStreamID)) nc.writeSeqNum++ - if len(payload) > size { - nc.Write(payload[0:size]) - nc.writeSeqNum += uint32(size) - need = payload[size:] + return nc.writeChunk(payload) +} + +func (nc *NetConnection) writeChunk(payload []byte) (need []byte, err error) { + if payloadLen := len(payload); payloadLen > nc.writeChunkSize { + _, err = nc.Write(payload[:nc.writeChunkSize]) + nc.writeSeqNum += uint32(nc.writeChunkSize) + need = payload[nc.writeChunkSize:] } else { - nc.Write(payload) - nc.writeSeqNum += uint32(len(payload)) + _, err = nc.Write(payload) + nc.writeSeqNum += uint32(payloadLen) } return } diff --git a/go.mod b/go.mod index 2b3a0fb..92e9f59 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/Monibuca/plugin-rtmp/v3 go 1.13 require ( - github.com/Monibuca/engine/v3 v3.3.0 - github.com/Monibuca/utils/v3 v3.0.2 + github.com/Monibuca/engine/v3 v3.3.9 + github.com/Monibuca/utils/v3 v3.0.3 github.com/logrusorgru/aurora v2.0.3+incompatible ) diff --git a/go.sum b/go.sum index 7057ae2..bee94e6 100644 --- a/go.sum +++ b/go.sum @@ -1,28 +1,10 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/engine/v3 v3.0.0-alpha11 h1:Z/ViugwmK+ECBK5MS9bUltrUU1UlcDjFTuJh4nxouK0= -github.com/Monibuca/engine/v3 v3.0.0-alpha11/go.mod h1:eonu3UFn3W7NpHzSrACipxdAyOBCUwzlFUe1R7JjttE= -github.com/Monibuca/engine/v3 v3.0.0-beta3 h1:/co+L2qCRZUq55S0LtYpY9xzOJiUUG3VGytYGFf1RD4= -github.com/Monibuca/engine/v3 v3.0.0-beta3/go.mod h1:SMgnlwih4pBA/HkTLjKXZFYkv3ukRzFjv65CARRLVIk= -github.com/Monibuca/engine/v3 v3.0.0-beta5 h1:b27ZQDfvf5dBMZbCSIUXItUwVIFs95fpkAV4xjN7BNE= -github.com/Monibuca/engine/v3 v3.0.0-beta5/go.mod h1:SMgnlwih4pBA/HkTLjKXZFYkv3ukRzFjv65CARRLVIk= -github.com/Monibuca/engine/v3 v3.0.0-beta8 h1:bJ3VHKAd8eJO7qOrSQp8Byve2xfCN7/fBDhz4Nz+AM8= -github.com/Monibuca/engine/v3 v3.0.0-beta8/go.mod h1:ckcxVangFrP8uC7UlhuLJabN4A4NMLYCKEmYHud1Tbk= -github.com/Monibuca/engine/v3 v3.1.0 h1:/U1YV6dYu4nYdLUHJgDhXzbmp1/X0Vwi8xrx1WfyVNA= -github.com/Monibuca/engine/v3 v3.1.0/go.mod h1:yz6cssED2VlYu+g/LrxseBB9pcvsLM/o2QXa4gVY650= -github.com/Monibuca/engine/v3 v3.3.0 h1:7zwYsLEHdeVZy6+JjVlaDhl/asr0HG6jirBL4uynj0s= -github.com/Monibuca/engine/v3 v3.3.0/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g= -github.com/Monibuca/utils/v3 v3.0.0-alpha5 h1:IOyW/KJSRdRg+TPcgwkHLBynqfNQOV6p3iP7LgXEMFc= -github.com/Monibuca/utils/v3 v3.0.0-alpha5/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= -github.com/Monibuca/utils/v3 v3.0.0-beta h1:z4p/BSH5J9Ja/gwoDmj1RyN+b0q28Nmn/fqXiwq2hGY= -github.com/Monibuca/utils/v3 v3.0.0-beta/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI= -github.com/Monibuca/utils/v3 v3.0.0-beta1 h1:M+miUm9+ojr6AahACOvVaFs+jc5jHmcUi38Dpe1QGgQ= -github.com/Monibuca/utils/v3 v3.0.0-beta1/go.mod h1:mQYP/OMox1tkWP6Qut7pBfARr1TXSRkK662dexQl6kI= -github.com/Monibuca/utils/v3 v3.0.0 h1:i8qCXQPQpRPgjuXKu5C2PYiL5LYzB6GW4xE162mB2ug= -github.com/Monibuca/utils/v3 v3.0.0/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/Monibuca/engine/v3 v3.3.9 h1:s/jH+z845DhBsLlwt247xbQkhaba4oWGZLKjPUjEmvE= +github.com/Monibuca/engine/v3 v3.3.9/go.mod h1:odyqD/VTQDN4qgzajsgn7kW7MWDIzTHt+j+BcI8i+4g= github.com/Monibuca/utils/v3 v3.0.1/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= -github.com/Monibuca/utils/v3 v3.0.2 h1:n2vr67DHanav8wBC9IENk8xrKzeGJnBsxYUu69s8TrQ= -github.com/Monibuca/utils/v3 v3.0.2/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= +github.com/Monibuca/utils/v3 v3.0.3 h1:kSULFYDaMe7dXc1wO+5JHjP8HwkbA1lNUBvwl4yEV/M= +github.com/Monibuca/utils/v3 v3.0.3/go.mod h1:RpNS95gapWs6gimwh8Xn2x72FN5tO7Powabj7dTFyvE= github.com/cnotch/apirouter v0.0.0-20200731232942-89e243a791f3/go.mod h1:5deJPLON/x/s2dLOQfuKS0lenhOIT4xX0pvtN/OEIuY= github.com/cnotch/ipchub v1.1.0 h1:hH0lh2mU3AZXPiqMwA0pdtqrwo7PFIMRGush9OobMUs= github.com/cnotch/ipchub v1.1.0/go.mod h1:2PbeBs2q2VxxTVCn1eYCDwpAWuVXbq1+N0FU7GimOH4= @@ -31,6 +13,7 @@ github.com/cnotch/queue v0.0.0-20200326024423-6e88bdbf2ad4/go.mod h1:zOssjAlNusO github.com/cnotch/queue v0.0.0-20201224060551-4191569ce8f6/go.mod h1:zOssjAlNusOxvtaqT+EMA+Iyi8rrtKr4/XfzN1Fgoeg= github.com/cnotch/scheduler v0.0.0-20200522024700-1d2da93eefc5/go.mod h1:F4GE3SZkJZ8an1Y0ZCqvSM3jeozNuKzoC67erG1PhIo= github.com/cnotch/xlog v0.0.0-20201208005456-cfda439cd3a0/go.mod h1:RW9oHsR79ffl3sR3yMGgxYupMn2btzdtJUwoxFPUE5E= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emitter-io/address v1.0.0/go.mod h1:GfZb5+S/o8694B1GMGK2imUYQyn2skszMvGNA5D84Ug= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -55,19 +38,20 @@ github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= -github.com/pion/rtp v1.6.2 h1:iGBerLX6JiDjB9NXuaPzHyxHFG9JsIEdgwTC0lp5n/U= github.com/pion/rtp v1.6.2/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pion/rtp v1.6.5 h1:o2cZf8OascA5HF/b0PAbTxRKvOWxTQxWYt7SlToxFGI= github.com/pion/rtp v1.6.5/go.mod h1:bDb5n+BFZxXx0Ea7E5qe+klMuqiBrP+w8XSjiWtCUko= github.com/pixelbender/go-sdp v1.1.0/go.mod h1:6IBlz9+BrUHoFTea7gcp4S54khtOhjCW/nVDLhmZBAs= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/q191201771/naza v0.19.1 h1:4KLcxT2CHztO+7miPRtBG3FFgadSQYQw1gPPPKN7rnY= github.com/q191201771/naza v0.19.1/go.mod h1:5LeGupZZFtYP1g/S203n9vXoUNVdlRnPIfM6rExjqt0= github.com/sqs/goreturns v0.0.0-20181028201513-538ac6014518/go.mod h1:CKI4AZ4XmGV240rTHfO0hfE83S6/a3/Q1siZJ/vXf7A= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= @@ -84,10 +68,12 @@ golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepx golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/main.go b/main.go index 0f50bd0..f9c8d23 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,11 @@ package rtmp import ( + "log" + "github.com/Monibuca/engine/v3" . "github.com/Monibuca/utils/v3" . "github.com/logrusorgru/aurora" - "log" ) var config = struct { @@ -21,5 +22,5 @@ func init() { } func run() { Print(Green("server rtmp start at"), BrightBlue(config.ListenAddr)) - log.Fatal(ListenRtmp(config.ListenAddr)) + log.Fatal(ListenTCP(config.ListenAddr, processRtmp)) } diff --git a/msg.go b/msg.go index a2d3bfa..b12e3e8 100644 --- a/msg.go +++ b/msg.go @@ -2,6 +2,7 @@ package rtmp import ( "bytes" + "errors" "log" "sync" @@ -105,12 +106,18 @@ type HaveStreamID interface { GetStreamID() uint32 } -func GetRtmpMessage(chunk *Chunk) { +func GetRtmpMessage(chunk *Chunk) error { switch chunk.MessageTypeID { case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE: + if len(chunk.Body) < 4 { + return errors.New("chunk.Body < 4") + } chunk.MsgData = Uint32Message(utils.BigEndian.Uint32(chunk.Body)) case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件. { + if len(chunk.Body) < 4 { + return errors.New("chunk.Body < 4") + } base := UserControlMessage{ EventType: utils.BigEndian.Uint16(chunk.Body), EventData: chunk.Body[2:], @@ -154,6 +161,9 @@ func GetRtmpMessage(chunk *Chunk) { } } case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽. + if len(chunk.Body) < 4 { + return errors.New("chunk.Body < 4") + } m := &SetPeerBandwidthMessage{ AcknowledgementWindowsize: utils.BigEndian.Uint32(chunk.Body), } @@ -175,6 +185,7 @@ func GetRtmpMessage(chunk *Chunk) { case RTMP_MSG_AGGREGATE: default: } + return nil } // 03 00 00 00 00 01 02 14 00 00 00 00 02 00 07 63 6F 6E 6E 65 63 74 00 3F F0 00 00 00 00 00 00 08 diff --git a/netConnection.go b/netConnection.go index 57b0299..66020e5 100644 --- a/netConnection.go +++ b/netConnection.go @@ -111,6 +111,8 @@ func (conn *NetConnection) OnConnect() (err error) { return } } + } else if msg != nil { + utils.Printf("recv MessageTypeID:%d error:%v", msg.MessageTypeID, err) } return } @@ -385,9 +387,9 @@ func (conn *NetConnection) sendAVMessage(ts uint32, payload []byte, isAudio bool // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 当Chunk Type为0时(即Chunk12), if isFirst { - need, err = conn.encodeChunk12(head, payload, conn.writeChunkSize) + need, err = conn.encodeChunk12(head, payload) } else { - need, err = conn.encodeChunk8(head, payload, conn.writeChunkSize) + need, err = conn.encodeChunk8(head, payload) } if err != nil { return err @@ -397,8 +399,8 @@ func (conn *NetConnection) sendAVMessage(ts uint32, payload []byte, isAudio bool } // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) - for need != nil && len(need) > 0 { - if need, err = conn.encodeChunk1(head, need, conn.writeChunkSize); err != nil { + for len(need) > 0 { + if need, err = conn.encodeChunk1(head, need); err != nil { return err } if err = conn.Flush(); err != nil { @@ -471,9 +473,9 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { msg.MsgData = nil msg.Body = currentBody msg.ChunkHeader = chunkHead.Clone() - GetRtmpMessage(msg) + err = GetRtmpMessage(msg) delete(conn.incompleteRtmpBody, ChunkStreamID) - return msg, nil + return msg, err } return conn.readChunk() @@ -684,7 +686,7 @@ func (conn *NetConnection) writeMessage(t byte, msg RtmpMessage) error { conn.SendMessage(SEND_PING_REQUEST_MESSAGE, nil) } - need, err := conn.encodeChunk12(head, body, conn.writeChunkSize) + need, err := conn.encodeChunk12(head, body) if err != nil { return err } @@ -692,7 +694,7 @@ func (conn *NetConnection) writeMessage(t byte, msg RtmpMessage) error { return err } for need != nil && len(need) > 0 { - if need, err = conn.encodeChunk1(head, need, conn.writeChunkSize); err != nil { + if need, err = conn.encodeChunk1(head, need); err != nil { return err } if err = conn.Flush(); err != nil { diff --git a/netStream.go b/netStream.go index fe6980f..04779d0 100644 --- a/netStream.go +++ b/netStream.go @@ -7,46 +7,11 @@ import ( "net" "strings" "sync/atomic" - "time" "github.com/Monibuca/engine/v3" "github.com/Monibuca/utils/v3" ) -func ListenRtmp(addr string) error { - defer log.Println("rtmp server start!") - // defer fmt.Println("server start!") - listener, err := net.Listen("tcp", addr) - if err != nil { - return err - } - var tempDelay time.Duration - for { - conn, err := listener.Accept() - conn.(*net.TCPConn).SetNoDelay(false) - if err != nil { - if ne, ok := err.(net.Error); ok && ne.Temporary() { - if tempDelay == 0 { - tempDelay = 5 * time.Millisecond - } else { - tempDelay *= 2 - } - if max := 1 * time.Second; tempDelay > max { - tempDelay = max - } - fmt.Printf("rtmp: Accept error: %v; retrying in %v", err, tempDelay) - time.Sleep(tempDelay) - continue - } - return err - } - - tempDelay = 0 - go processRtmp(conn) - } - return nil -} - var gstreamid = uint32(64) func processRtmp(conn net.Conn) { @@ -140,8 +105,8 @@ func processRtmp(conn net.Conn) { streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] nc.writeChunkSize = config.ChunkSize subscriber := engine.Subscriber{ - Type: "RTMP", - ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID), + Type: "RTMP", + ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID), } if err = subscriber.Subscribe(streamPath); err == nil { streams[nc.streamID] = &subscriber @@ -164,9 +129,9 @@ func processRtmp(conn net.Conn) { return } err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.ExtraData.Payload}) - subscriber.OnVideo = func(ts uint32,pack *engine.VideoPack) { + subscriber.OnVideo = func(ts uint32, pack *engine.VideoPack) { err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Timestamp: 0, Payload: pack.Payload}) - subscriber.OnVideo = func(ts uint32,pack *engine.VideoPack) { + subscriber.OnVideo = func(ts uint32, pack *engine.VideoPack) { err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: getDeltaTime(ts), Payload: pack.Payload}) } } @@ -183,14 +148,14 @@ func processRtmp(conn net.Conn) { } return } - subscriber.OnAudio = func(ts uint32,pack *engine.AudioPack) { + subscriber.OnAudio = func(ts uint32, pack *engine.AudioPack) { if at.CodecID == 10 { err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.ExtraData}) } - subscriber.OnAudio = func(ts uint32,pack *engine.AudioPack) { + subscriber.OnAudio = func(ts uint32, pack *engine.AudioPack) { err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: getDeltaTime(ts), Payload: pack.Payload}) } - subscriber.OnAudio(ts,pack) + subscriber.OnAudio(ts, pack) } } go subscriber.Play(at, vt)