diff --git a/chunk.go b/chunk.go index 42d719c..7b67af7 100644 --- a/chunk.go +++ b/chunk.go @@ -3,8 +3,7 @@ package rtmp import ( "errors" - "github.com/Monibuca/engine/v2/pool" - "github.com/Monibuca/engine/v2/util" + "github.com/Monibuca/utils/v3" ) // RTMP协议中基本的数据单元称为消息(Message). @@ -86,21 +85,21 @@ func (nc *NetConnection) encodeChunk12(head *ChunkHeader, payload []byte, size i if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { return nil, errors.New("chunk error") } - b := pool.GetSlice(12) + b := utils.GetSlice(12) //chunkBasicHead b[0] = byte(RTMP_CHUNK_HEAD_12 + head.ChunkBasicHeader.ChunkStreamID) - util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) - util.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) + utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) + utils.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) b[7] = head.ChunkMessageHeader.MessageTypeID - util.LittleEndian.PutUint32(b[8:], uint32(head.ChunkMessageHeader.MessageStreamID)) + utils.LittleEndian.PutUint32(b[8:], uint32(head.ChunkMessageHeader.MessageStreamID)) nc.Write(b) - pool.RecycleSlice(b) + utils.RecycleSlice(b) nc.writeSeqNum += 12 if head.ChunkMessageHeader.Timestamp == 0xffffff { - b := pool.GetSlice(4) - util.LittleEndian.PutUint32(b, head.ChunkExtendedTimestamp.ExtendTimestamp) + b := utils.GetSlice(4) + utils.LittleEndian.PutUint32(b, head.ChunkExtendedTimestamp.ExtendTimestamp) nc.Write(b) - pool.RecycleSlice(b) + utils.RecycleSlice(b) nc.writeSeqNum += 4 } if len(payload) > size { @@ -118,14 +117,14 @@ func (nc *NetConnection) encodeChunk8(head *ChunkHeader, payload []byte, size in if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { return nil, errors.New("chunk error") } - b := pool.GetSlice(8) + b := utils.GetSlice(8) //chunkBasicHead b[0] = byte(RTMP_CHUNK_HEAD_8 + head.ChunkBasicHeader.ChunkStreamID) - util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) - util.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) + utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) + utils.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) b[7] = head.ChunkMessageHeader.MessageTypeID nc.Write(b) - pool.RecycleSlice(b) + utils.RecycleSlice(b) nc.writeSeqNum += 8 if len(payload) > size { nc.Write(payload[0:size]) @@ -142,12 +141,12 @@ func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size in if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { return nil, errors.New("chunk error") } - b := pool.GetSlice(4) + b := utils.GetSlice(4) //chunkBasicHead b[0] = byte(RTMP_CHUNK_HEAD_4 + head.ChunkBasicHeader.ChunkStreamID) - util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) + utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) nc.Write(b) - pool.RecycleSlice(b) + utils.RecycleSlice(b) nc.writeSeqNum += 4 if len(payload) > size { nc.Write(payload[0:size]) diff --git a/go.mod b/go.mod index 1060e92..7c69019 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,13 @@ -module github.com/Monibuca/plugin-rtmp +module github.com/Monibuca/plugin-rtmp/v3 go 1.13 require ( - github.com/Monibuca/engine/v2 v2.0.0 - github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 + github.com/Monibuca/engine/v3 v3.0.1 + github.com/Monibuca/utils/v3 v3.0.0-alpha2 + github.com/logrusorgru/aurora v2.0.3+incompatible ) + +replace github.com/Monibuca/engine/v3 => ../engine + +replace github.com/Monibuca/utils/v3 v3.0.0-alpha2 => ../utils diff --git a/go.sum b/go.sum index 8f7043f..32a32df 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,12 @@ github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= +github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8= +github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -31,6 +35,8 @@ github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/main.go b/main.go index 5f4adb7..83966e4 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,10 @@ package rtmp import ( - "log" - - . "github.com/Monibuca/engine/v2" + . "github.com/Monibuca/engine/v3" + . "github.com/Monibuca/utils/v3" . "github.com/logrusorgru/aurora" + "log" ) var config = struct { @@ -15,7 +15,6 @@ var config = struct { func init() { InstallPlugin(&PluginConfig{ Name: "RTMP", - Type: PLUGIN_SUBSCRIBER | PLUGIN_PUBLISHER, Config: &config, Run: run, }) diff --git a/msg.go b/msg.go index 3ed2991..663bf20 100644 --- a/msg.go +++ b/msg.go @@ -5,7 +5,7 @@ import ( "log" "sync" - "github.com/Monibuca/engine/v2/util" + "github.com/Monibuca/utils/v3" ) const ( @@ -121,11 +121,11 @@ type HaveStreamID interface { func GetRtmpMessage(chunk *Chunk) { switch chunk.MessageTypeID { case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE: - chunk.MsgData = Uint32Message(util.BigEndian.Uint32(chunk.Body)) + chunk.MsgData = Uint32Message(utils.BigEndian.Uint32(chunk.Body)) case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件. { base := UserControlMessage{ - EventType: util.BigEndian.Uint16(chunk.Body), + EventType: utils.BigEndian.Uint16(chunk.Body), EventData: chunk.Body[2:], } switch base.EventType { @@ -136,28 +136,28 @@ func GetRtmpMessage(chunk *Chunk) { } if len(base.EventData) >= 4 { //服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID. - m.StreamID = util.BigEndian.Uint32(base.EventData) + m.StreamID = utils.BigEndian.Uint32(base.EventData) } chunk.MsgData = m case RTMP_USER_STREAM_EOF, RTMP_USER_STREAM_DRY, RTMP_USER_STREAM_IS_RECORDED: // 服务端向客户端发送本事件通知客户端,数据回放完成.果没有发行额外的命令,就不再发送数据.客户端丢弃从流中接收的消息.4字节的事件数据表示,回放结束的流的ID. m := &StreamIDMessage{ UserControlMessage: base, - StreamID: util.BigEndian.Uint32(base.EventData), + StreamID: utils.BigEndian.Uint32(base.EventData), } chunk.MsgData = m case RTMP_USER_SET_BUFFLEN: // 客户端向服务端发送本事件,告知对方自己存储一个流的数据的缓存的长度(毫秒单位).当服务端开始处理一个流得时候发送本事件.事件数据的头四个字节表示流ID,后4个字节表示缓存长度(毫秒单位). m := &SetBufferMessage{ StreamIDMessage: StreamIDMessage{ UserControlMessage: base, - StreamID: util.BigEndian.Uint32(base.EventData), + StreamID: utils.BigEndian.Uint32(base.EventData), }, - Millisecond: util.BigEndian.Uint32(base.EventData[4:]), + Millisecond: utils.BigEndian.Uint32(base.EventData[4:]), } chunk.MsgData = m case RTMP_USER_PING_REQUEST: // 服务端通过本事件测试客户端是否可达.事件数据是4个字节的事件戳.代表服务调用本命令的本地时间.客户端在接收到kMsgPingRequest之后返回kMsgPingResponse事件 m := &PingRequestMessage{ UserControlMessage: base, - Timestamp: util.BigEndian.Uint32(base.EventData), + Timestamp: utils.BigEndian.Uint32(base.EventData), } chunk.MsgData = m case RTMP_USER_PING_RESPONSE, RTMP_USER_EMPTY: // 客户端向服务端发送本消息响应ping请求.事件数据是接kMsgPingRequest请求的时间. @@ -168,7 +168,7 @@ func GetRtmpMessage(chunk *Chunk) { } case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽. m := &SetPeerBandwidthMessage{ - AcknowledgementWindowsize: util.BigEndian.Uint32(chunk.Body), + AcknowledgementWindowsize: utils.BigEndian.Uint32(chunk.Body), } if len(chunk.Body) > 4 { m.LimitType = chunk.Body[4] @@ -351,7 +351,7 @@ type Uint32Message uint32 func (msg Uint32Message) Encode() (b []byte) { b = make([]byte, 4) - util.BigEndian.PutUint32(b, uint32(msg)) + utils.BigEndian.PutUint32(b, uint32(msg)) return b } @@ -378,7 +378,7 @@ type SetPeerBandwidthMessage struct { func (msg *SetPeerBandwidthMessage) Encode() (b []byte) { b = make([]byte, 5) - util.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize) + utils.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize) b[4] = msg.LimitType return } @@ -869,8 +869,8 @@ type StreamIDMessage struct { func (msg *StreamIDMessage) Encode() (b []byte) { b = make([]byte, 6) - util.BigEndian.PutUint16(b, msg.EventType) - util.BigEndian.PutUint32(b[2:], msg.StreamID) + utils.BigEndian.PutUint16(b, msg.EventType) + utils.BigEndian.PutUint32(b[2:], msg.StreamID) msg.EventData = b[2:] return } @@ -887,9 +887,9 @@ type SetBufferMessage struct { func (msg *SetBufferMessage) Encode() []byte { b := make([]byte, 10) - util.BigEndian.PutUint16(b, msg.EventType) - util.BigEndian.PutUint32(b[2:], msg.StreamID) - util.BigEndian.PutUint32(b[6:], msg.Millisecond) + utils.BigEndian.PutUint16(b, msg.EventType) + utils.BigEndian.PutUint32(b[2:], msg.StreamID) + utils.BigEndian.PutUint32(b[6:], msg.Millisecond) msg.EventData = b[2:] return b } @@ -905,15 +905,20 @@ type PingRequestMessage struct { func (msg *PingRequestMessage) Encode() (b []byte) { b = make([]byte, 6) - util.BigEndian.PutUint16(b, msg.EventType) - util.BigEndian.PutUint32(b[2:], msg.Timestamp) + utils.BigEndian.PutUint16(b, msg.EventType) + utils.BigEndian.PutUint32(b[2:], msg.Timestamp) msg.EventData = b[2:] return } func (msg *UserControlMessage) Encode() []byte { b := make([]byte, 2) - util.BigEndian.PutUint16(b, msg.EventType) + utils.BigEndian.PutUint16(b, msg.EventType) msg.EventData = b[2:] return b } + +type AVPack struct { + Timestamp uint32 + Payload []byte +} diff --git a/netConnection.go b/netConnection.go index 24ce158..0fd2376 100644 --- a/netConnection.go +++ b/netConnection.go @@ -3,13 +3,10 @@ package rtmp import ( "bufio" "errors" + "github.com/Monibuca/engine/v3" + "github.com/Monibuca/utils/v3" "io" "log" - - "github.com/Monibuca/engine/v2" - "github.com/Monibuca/engine/v2/avformat" - "github.com/Monibuca/engine/v2/pool" - "github.com/Monibuca/engine/v2/util" ) const ( @@ -86,7 +83,7 @@ type NetConnection struct { writeChunkSize int readChunkSize int incompleteRtmpBody map[uint32][]byte // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 - nextStreamID func(uint32) uint32 // 下一个流ID + nextStreamID func() uint32 // 下一个流ID streamID uint32 // 流ID rtmpHeader map[uint32]*ChunkHeader // RtmpHeader objectEncoding float64 @@ -331,33 +328,33 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { m.CommandName = "releaseStream" + data["level"].(string) return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_FULL_AUDIO_MESSAGE: - audio, ok := args.(*avformat.SendPacket) + audio, ok := args.(*AVPack) if !ok { errors.New(message + ", The parameter is AVPacket") } - return conn.sendAVMessage(audio, true, true) + return conn.sendAVMessage(audio.Timestamp, audio.Payload, true, true) case SEND_AUDIO_MESSAGE: - audio, ok := args.(*avformat.SendPacket) + audio, ok := args.(*AVPack) if !ok { errors.New(message + ", The parameter is AVPacket") } - return conn.sendAVMessage(audio, true, false) + return conn.sendAVMessage(audio.Timestamp, audio.Payload, true, false) case SEND_FULL_VDIEO_MESSAGE: - video, ok := args.(*avformat.SendPacket) + video, ok := args.(*AVPack) if !ok { errors.New(message + ", The parameter is AVPacket") } - return conn.sendAVMessage(video, false, true) + return conn.sendAVMessage(video.Timestamp, video.Payload, false, true) case SEND_VIDEO_MESSAGE: - video, ok := args.(*avformat.SendPacket) + video, ok := args.(*AVPack) if !ok { errors.New(message + ", The parameter is AVPacket") } - return conn.sendAVMessage(video, false, false) + return conn.sendAVMessage(video.Timestamp, video.Payload, false, false) } return errors.New("send message no exist") @@ -366,7 +363,7 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error { // 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 -func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, isFirst bool) error { +func (conn *NetConnection) sendAVMessage(ts uint32, payload []byte, isAudio bool, isFirst bool) error { if conn.writeSeqNum > conn.bandwidth { conn.totalWrite += conn.writeSeqNum conn.writeSeqNum = 0 @@ -378,19 +375,18 @@ func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, var need []byte var head *ChunkHeader if isAudio { - head = newRtmpHeader(RTMP_CSID_AUDIO, av.Timestamp, uint32(len(av.Payload)), RTMP_MSG_AUDIO, conn.streamID, 0) + head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(len(payload)), RTMP_MSG_AUDIO, conn.streamID, 0) } else { - head = newRtmpHeader(RTMP_CSID_VIDEO, av.Timestamp, uint32(len(av.Payload)), RTMP_MSG_VIDEO, conn.streamID, 0) + head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(len(payload)), RTMP_MSG_VIDEO, conn.streamID, 0) } // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 当Chunk Type为0时(即Chunk12), if isFirst { - need, err = conn.encodeChunk12(head, av.Payload, conn.writeChunkSize) + need, err = conn.encodeChunk12(head, payload, conn.writeChunkSize) } else { - need, err = conn.encodeChunk8(head, av.Payload, conn.writeChunkSize) - + need, err = conn.encodeChunk8(head, payload, conn.writeChunkSize) } if err != nil { return err @@ -518,20 +514,20 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head * case 0: { // Timestamp 3 bytes - b := pool.GetSlice(3) + b := utils.GetSlice(3) if _, err := io.ReadFull(conn, b); err != nil { return nil, err } conn.readSeqNum += 3 - h.Timestamp = util.BigEndian.Uint24(b) //type = 0的时间戳为绝对时间,其他的都为相对时间 + h.Timestamp = utils.BigEndian.Uint24(b) //type = 0的时间戳为绝对时间,其他的都为相对时间 // Message Length 3 bytes if _, err = io.ReadFull(conn, b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度. return nil, err } conn.readSeqNum += 3 - h.MessageLength = util.BigEndian.Uint24(b) - pool.RecycleSlice(b) + h.MessageLength = utils.BigEndian.Uint24(b) + utils.RecycleSlice(b) // Message Type ID 1 bytes v, err := conn.ReadByte() // 读取Message Type ID if err != nil { @@ -541,12 +537,12 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head * h.MessageTypeID = v // Message Stream ID 4bytes - bb := pool.GetSlice(4) + bb := utils.GetSlice(4) if _, err = io.ReadFull(conn, bb); err != nil { // 读取Message Stream ID return nil, err } conn.readSeqNum += 4 - h.MessageStreamID = util.LittleEndian.Uint32(bb) + h.MessageStreamID = utils.LittleEndian.Uint32(bb) // ExtendTimestamp 4 bytes if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 @@ -554,28 +550,28 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head * return nil, err } conn.readSeqNum += 4 - h.ExtendTimestamp = util.BigEndian.Uint32(bb) + h.ExtendTimestamp = utils.BigEndian.Uint32(bb) } - pool.RecycleSlice(bb) + utils.RecycleSlice(bb) } case 1: { // Timestamp 3 bytes - b := pool.GetSlice(3) + b := utils.GetSlice(3) if _, err = io.ReadFull(conn, b); err != nil { return nil, err } conn.readSeqNum += 3 h.ChunkType = chunkType - h.Timestamp = util.BigEndian.Uint24(b) + h.Timestamp = utils.BigEndian.Uint24(b) // Message Length 3 bytes if _, err = io.ReadFull(conn, b); err != nil { return nil, err } conn.readSeqNum += 3 - h.MessageLength = util.BigEndian.Uint24(b) - pool.RecycleSlice(b) + h.MessageLength = utils.BigEndian.Uint24(b) + utils.RecycleSlice(b) // Message Type ID 1 bytes v, err := conn.ReadByte() if err != nil { @@ -586,35 +582,35 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head * // ExtendTimestamp 4 bytes if h.Timestamp == 0xffffff { - bb := pool.GetSlice(4) + bb := utils.GetSlice(4) if _, err := io.ReadFull(conn, bb); err != nil { return nil, err } conn.readSeqNum += 4 - h.ExtendTimestamp = util.BigEndian.Uint32(bb) - pool.RecycleSlice(bb) + h.ExtendTimestamp = utils.BigEndian.Uint32(bb) + utils.RecycleSlice(bb) } } case 2: { // Timestamp 3 bytes - b := pool.GetSlice(3) + b := utils.GetSlice(3) if _, err = io.ReadFull(conn, b); err != nil { return nil, err } conn.readSeqNum += 3 h.ChunkType = chunkType - h.Timestamp = util.BigEndian.Uint24(b) - pool.RecycleSlice(b) + h.Timestamp = utils.BigEndian.Uint24(b) + utils.RecycleSlice(b) // ExtendTimestamp 4 bytes if h.Timestamp == 0xffffff { - bb := pool.GetSlice(4) + bb := utils.GetSlice(4) if _, err := io.ReadFull(conn, bb); err != nil { return nil, err } conn.readSeqNum += 4 - h.ExtendTimestamp = util.BigEndian.Uint32(bb) - pool.RecycleSlice(bb) + h.ExtendTimestamp = utils.BigEndian.Uint32(bb) + utils.RecycleSlice(bb) } } case 3: diff --git a/netStream.go b/netStream.go index 055ef62..2d0231f 100644 --- a/netStream.go +++ b/netStream.go @@ -3,19 +3,17 @@ package rtmp import ( "bufio" "fmt" + "github.com/Monibuca/engine/v3" + "github.com/Monibuca/utils/v3" + "github.com/Monibuca/utils/v3/codec" "log" "net" "strings" + "sync" + "sync/atomic" "time" - - . "github.com/Monibuca/engine/v2" - "github.com/Monibuca/engine/v2/avformat" ) -type RTMP struct { - Publisher -} - func ListenRtmp(addr string) error { defer log.Println("rtmp server start!") // defer fmt.Println("server start!") @@ -53,37 +51,104 @@ func ListenRtmp(addr string) error { var gstreamid = uint32(64) func processRtmp(conn net.Conn) { - var stream *Stream - streams := make(map[uint32]*Subscriber) + var stream *engine.Stream + streams := make(map[uint32]*engine.Subscriber) defer func() { conn.Close() if stream != nil { - stream.Cancel() + stream.Close() } for _, s := range streams { s.Close() } }() - var totalDuration uint32 - nc := &NetConnection{ + nc := NetConnection{ ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, rtmpHeader: make(map[uint32]*ChunkHeader), incompleteRtmpBody: make(map[uint32][]byte), bandwidth: RTMP_MAX_CHUNK_SIZE << 3, - nextStreamID: func(u uint32) uint32 { - gstreamid++ - return gstreamid + nextStreamID: func() uint32 { + return atomic.AddUint32(&gstreamid, 1) }, } /* Handshake */ - if MayBeError(Handshake(nc.ReadWriter)) { + if utils.MayBeError(Handshake(nc.ReadWriter)) { return } - if MayBeError(nc.OnConnect()) { + if utils.MayBeError(nc.OnConnect()) { return } + var rec_audio, rec_video func(*Chunk) + rec_audio = func(msg *Chunk) { + var ts_audio uint32 + va := stream.AudioTracks[0] + tmp := msg.Body[0] + if va.SoundFormat = tmp >> 4; va.SoundFormat == 10 { + if msg.Body[1] != 0 { + return + } + va.ASC = msg.Body[2:] + config1, config2 := msg.Body[2], msg.Body[3] + //audioObjectType = (config1 & 0xF8) >> 3 + // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 + // 2 AAC LC ISO/IEC 14496-3 subpart 4 + // 3 AAC SSR ISO/IEC 14496-3 subpart 4 + // 4 AAC LTP ISO/IEC 14496-3 subpart 4 + va.SoundRate = codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)] + va.SoundType = (config2 >> 3) & 0x0F //声道 + //frameLengthFlag = (config2 >> 2) & 0x01 + //dependsOnCoreCoder = (config2 >> 1) & 0x01 + //extensionFlag = config2 & 0x01 + } else { + va.SoundRate = codec.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz + va.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples + va.SoundType = tmp & 0x01 // 0 单声道,1立体声 + } + va.RtmpTag = msg.Body + rec_audio = func(msg *Chunk) { + if msg.Timestamp == 0xffffff { + ts_audio += msg.ExtendTimestamp + } else { + ts_audio += msg.Timestamp // 绝对时间戳 + } + stream.PushAudio(ts_audio, msg.Body[2:]) + } + } + rec_video = func(msg *Chunk) { + // 等待AVC序列帧 + if msg.Body[1] != 0 { + return + } + vt := stream.VideoTracks[0] + var ts_video uint32 + var info codec.AVCDecoderConfigurationRecord + //0:codec,1:IsAVCSequence,2~4:compositionTime + if _, err := info.Unmarshal(msg.Body[5:]); err == nil { + vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit) + vt.SPS = info.SequenceParameterSetNALUnit + vt.PPS = info.PictureParameterSetNALUnit + } + vt.RtmpTag = msg.Body + nalulenSize := int(info.LengthSizeMinusOne&3 + 1) + rec_video = func(msg *Chunk) { + nalus := msg.Body[5:] + if msg.Timestamp == 0xffffff { + ts_video += msg.ExtendTimestamp + } else { + ts_video += msg.Timestamp // 绝对时间戳 + } + for len(nalus) > nalulenSize { + nalulen := 0 + for i := 0; i < nalulenSize; i++ { + nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1)) + } + vt.Push(ts_video, nalus[nalulenSize:nalulen+nalulenSize]) + nalus = nalus[nalulen+nalulenSize:] + } + } + } for { if msg, err := nc.RecvMessage(); err == nil { if msg.MessageLength <= 0 { @@ -97,16 +162,16 @@ func processRtmp(conn net.Conn) { cmd := msg.MsgData.(Commander).GetCommand() switch cmd.CommandName { case "createStream": - nc.streamID = nc.nextStreamID(msg.ChunkStreamID) + nc.streamID = nc.nextStreamID() log.Println("createStream:", nc.streamID) err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId) - if MayBeError(err) { + if utils.MayBeError(err) { return } case "publish": pm := msg.MsgData.(*PublishMessage) streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0] - if pub := new(RTMP); pub.Publish(streamPath) { + if pub := new(engine.Publisher); pub.Publish(streamPath) { pub.Type = "RTMP" stream = pub.Stream err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) @@ -118,46 +183,7 @@ func processRtmp(conn net.Conn) { pm := msg.MsgData.(*PlayMessage) streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] nc.writeChunkSize = config.ChunkSize - var lastAudioTime uint32 = 0 - var lastVideoTime uint32 = 0 - // followAVCSequence := false - stream := &Subscriber{OnData: func(packet *avformat.SendPacket) (err error) { - switch true { - // case packet.IsADTS: - // tagPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO) - // tagPacket.Payload = avformat.ADTSToAudioSpecificConfig(packet.Payload) - // err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, tagPacket) - // ADTSLength := 7 + (int(packet.Payload[1]&1) << 1) - // if len(packet.Payload) > ADTSLength { - // contentPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO) - // contentPacket.Timestamp = packet.Timestamp - // contentPacket.Payload = make([]byte, len(packet.Payload)-ADTSLength+2) - // contentPacket.Payload[0] = 0xAF - // contentPacket.Payload[1] = 0x01 //raw AAC - // copy(contentPacket.Payload[2:], packet.Payload[ADTSLength:]) - // err = nc.SendMessage(SEND_AUDIO_MESSAGE, contentPacket) - // } - case packet.Type == RTMP_MSG_VIDEO: - if packet.IsSequence { - err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, packet) - } else { - t := packet.Timestamp - lastVideoTime - lastVideoTime = packet.Timestamp - packet.Timestamp = t - err = nc.SendMessage(SEND_VIDEO_MESSAGE, packet) - } - case packet.Type == RTMP_MSG_AUDIO: - if packet.IsSequence { - err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, packet) - } else { - t := packet.Timestamp - lastAudioTime - lastAudioTime = packet.Timestamp - packet.Timestamp = t - err = nc.SendMessage(SEND_AUDIO_MESSAGE, packet) - } - } - return - }} + stream := engine.Subscriber{} stream.Type = "RTMP" stream.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID) err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize)) @@ -166,22 +192,71 @@ func processRtmp(conn net.Conn) { err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) if err == nil { - streams[nc.streamID] = stream - go stream.Subscribe(streamPath) + streams[nc.streamID] = &stream + if err = stream.Subscribe(streamPath); err == nil { + vt, at := stream.VideoTracks[0], stream.AudioTracks[0] + err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag}) + if at.SoundFormat == 10 { + err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag}) + } + var lastAudioTime, lastVideoTime uint32 + var lock sync.Mutex + go vt.Play(stream.Context, func(pack engine.VideoPack) { + if lastVideoTime == 0 { + lastVideoTime = pack.Timestamp + } + t := pack.Timestamp - lastVideoTime + lastVideoTime = pack.Timestamp + payload := utils.GetSlice(9 + len(pack.Payload)) + defer utils.RecycleSlice(payload) + if pack.NalType == codec.NALU_IDR_Picture { + payload[0] = 0x17 + } else { + payload[0] = 0x27 + } + payload[1] = 0x01 + utils.BigEndian.PutUint32(payload[5:], uint32(len(pack.Payload))) + copy(payload[9:], pack.Payload) + lock.Lock() + defer lock.Unlock() + err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) + }) + go at.Play(stream.Context, func(pack engine.AudioPack) { + if lastAudioTime == 0 { + lastAudioTime = pack.Timestamp + } + t := pack.Timestamp - lastAudioTime + lastAudioTime = pack.Timestamp + l := len(pack.Payload) + 1 + if at.SoundFormat == 10 { + l++ + } + payload := utils.GetSlice(l) + defer utils.RecycleSlice(payload) + payload[0] = at.RtmpTag[0] + if at.SoundFormat == 10 { + payload[1] = 1 + } + copy(payload[2:], pack.Payload) + lock.Lock() + defer lock.Unlock() + err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) + }) + } } else { return } case "closeStream": cm := msg.MsgData.(*CURDStreamMessage) if stream, ok := streams[cm.StreamId]; ok { - stream.Cancel() + stream.Close() delete(streams, cm.StreamId) } case "releaseStream": cm := msg.MsgData.(*ReleaseStreamMessage) streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0] amfobj := newAMFObjects() - if s := FindStream(streamPath); s != nil { + if s := engine.FindStream(streamPath); s != nil { amfobj["level"] = "_result" s.Close() } else { @@ -191,21 +266,9 @@ func processRtmp(conn net.Conn) { err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj) } case RTMP_MSG_AUDIO: - // pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO) - if msg.Timestamp == 0xffffff { - totalDuration += msg.ExtendTimestamp - } else { - totalDuration += msg.Timestamp // 绝对时间戳 - } - stream.PushAudio(totalDuration, msg.Body) + rec_audio(msg) case RTMP_MSG_VIDEO: - // pkt := avformat.NewAVPacket(RTMP_MSG_VIDEO) - if msg.Timestamp == 0xffffff { - totalDuration += msg.ExtendTimestamp - } else { - totalDuration += msg.Timestamp // 绝对时间戳 - } - stream.PushVideo(totalDuration, msg.Body) + rec_video(msg) } msg.Recycle() } else {