rtmp迁移新架构

This commit is contained in:
langhuihui
2021-01-25 20:33:41 +08:00
parent 64a2b5884c
commit 6084f8b96d
7 changed files with 235 additions and 162 deletions

View File

@@ -3,8 +3,7 @@ package rtmp
import ( import (
"errors" "errors"
"github.com/Monibuca/engine/v2/pool" "github.com/Monibuca/utils/v3"
"github.com/Monibuca/engine/v2/util"
) )
// RTMP协议中基本的数据单元称为消息(Message). // RTMP协议中基本的数据单元称为消息(Message).
@@ -86,21 +85,21 @@ func (nc *NetConnection) encodeChunk12(head *ChunkHeader, payload []byte, size i
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
return nil, errors.New("chunk error") return nil, errors.New("chunk error")
} }
b := pool.GetSlice(12) b := utils.GetSlice(12)
//chunkBasicHead //chunkBasicHead
b[0] = byte(RTMP_CHUNK_HEAD_12 + head.ChunkBasicHeader.ChunkStreamID) b[0] = byte(RTMP_CHUNK_HEAD_12 + head.ChunkBasicHeader.ChunkStreamID)
util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp)
util.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) utils.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength)
b[7] = head.ChunkMessageHeader.MessageTypeID b[7] = head.ChunkMessageHeader.MessageTypeID
util.LittleEndian.PutUint32(b[8:], uint32(head.ChunkMessageHeader.MessageStreamID)) utils.LittleEndian.PutUint32(b[8:], uint32(head.ChunkMessageHeader.MessageStreamID))
nc.Write(b) nc.Write(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
nc.writeSeqNum += 12 nc.writeSeqNum += 12
if head.ChunkMessageHeader.Timestamp == 0xffffff { if head.ChunkMessageHeader.Timestamp == 0xffffff {
b := pool.GetSlice(4) b := utils.GetSlice(4)
util.LittleEndian.PutUint32(b, head.ChunkExtendedTimestamp.ExtendTimestamp) utils.LittleEndian.PutUint32(b, head.ChunkExtendedTimestamp.ExtendTimestamp)
nc.Write(b) nc.Write(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
nc.writeSeqNum += 4 nc.writeSeqNum += 4
} }
if len(payload) > size { if len(payload) > size {
@@ -118,14 +117,14 @@ func (nc *NetConnection) encodeChunk8(head *ChunkHeader, payload []byte, size in
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
return nil, errors.New("chunk error") return nil, errors.New("chunk error")
} }
b := pool.GetSlice(8) b := utils.GetSlice(8)
//chunkBasicHead //chunkBasicHead
b[0] = byte(RTMP_CHUNK_HEAD_8 + head.ChunkBasicHeader.ChunkStreamID) b[0] = byte(RTMP_CHUNK_HEAD_8 + head.ChunkBasicHeader.ChunkStreamID)
util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp)
util.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength) utils.BigEndian.PutUint24(b[4:], head.ChunkMessageHeader.MessageLength)
b[7] = head.ChunkMessageHeader.MessageTypeID b[7] = head.ChunkMessageHeader.MessageTypeID
nc.Write(b) nc.Write(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
nc.writeSeqNum += 8 nc.writeSeqNum += 8
if len(payload) > size { if len(payload) > size {
nc.Write(payload[0:size]) nc.Write(payload[0:size])
@@ -142,12 +141,12 @@ func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size in
if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
return nil, errors.New("chunk error") return nil, errors.New("chunk error")
} }
b := pool.GetSlice(4) b := utils.GetSlice(4)
//chunkBasicHead //chunkBasicHead
b[0] = byte(RTMP_CHUNK_HEAD_4 + head.ChunkBasicHeader.ChunkStreamID) b[0] = byte(RTMP_CHUNK_HEAD_4 + head.ChunkBasicHeader.ChunkStreamID)
util.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp) utils.BigEndian.PutUint24(b[1:], head.ChunkMessageHeader.Timestamp)
nc.Write(b) nc.Write(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
nc.writeSeqNum += 4 nc.writeSeqNum += 4
if len(payload) > size { if len(payload) > size {
nc.Write(payload[0:size]) nc.Write(payload[0:size])

11
go.mod
View File

@@ -1,8 +1,13 @@
module github.com/Monibuca/plugin-rtmp module github.com/Monibuca/plugin-rtmp/v3
go 1.13 go 1.13
require ( require (
github.com/Monibuca/engine/v2 v2.0.0 github.com/Monibuca/engine/v3 v3.0.1
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 github.com/Monibuca/utils/v3 v3.0.0-alpha2
github.com/logrusorgru/aurora v2.0.3+incompatible
) )
replace github.com/Monibuca/engine/v3 => ../engine
replace github.com/Monibuca/utils/v3 v3.0.0-alpha2 => ../utils

6
go.sum
View File

@@ -18,8 +18,12 @@ github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI=
github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs=
github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
@@ -31,6 +35,8 @@ github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

View File

@@ -1,10 +1,10 @@
package rtmp package rtmp
import ( import (
"log" . "github.com/Monibuca/engine/v3"
. "github.com/Monibuca/utils/v3"
. "github.com/Monibuca/engine/v2"
. "github.com/logrusorgru/aurora" . "github.com/logrusorgru/aurora"
"log"
) )
var config = struct { var config = struct {
@@ -15,7 +15,6 @@ var config = struct {
func init() { func init() {
InstallPlugin(&PluginConfig{ InstallPlugin(&PluginConfig{
Name: "RTMP", Name: "RTMP",
Type: PLUGIN_SUBSCRIBER | PLUGIN_PUBLISHER,
Config: &config, Config: &config,
Run: run, Run: run,
}) })

43
msg.go
View File

@@ -5,7 +5,7 @@ import (
"log" "log"
"sync" "sync"
"github.com/Monibuca/engine/v2/util" "github.com/Monibuca/utils/v3"
) )
const ( const (
@@ -121,11 +121,11 @@ type HaveStreamID interface {
func GetRtmpMessage(chunk *Chunk) { func GetRtmpMessage(chunk *Chunk) {
switch chunk.MessageTypeID { switch chunk.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE: case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE:
chunk.MsgData = Uint32Message(util.BigEndian.Uint32(chunk.Body)) chunk.MsgData = Uint32Message(utils.BigEndian.Uint32(chunk.Body))
case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件. case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件.
{ {
base := UserControlMessage{ base := UserControlMessage{
EventType: util.BigEndian.Uint16(chunk.Body), EventType: utils.BigEndian.Uint16(chunk.Body),
EventData: chunk.Body[2:], EventData: chunk.Body[2:],
} }
switch base.EventType { switch base.EventType {
@@ -136,28 +136,28 @@ func GetRtmpMessage(chunk *Chunk) {
} }
if len(base.EventData) >= 4 { if len(base.EventData) >= 4 {
//服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID. //服务端在成功地从客户端接收连接命令之后发送本事件,事件ID为0.事件数据是表示开始起作用的流的ID.
m.StreamID = util.BigEndian.Uint32(base.EventData) m.StreamID = utils.BigEndian.Uint32(base.EventData)
} }
chunk.MsgData = m chunk.MsgData = m
case RTMP_USER_STREAM_EOF, RTMP_USER_STREAM_DRY, RTMP_USER_STREAM_IS_RECORDED: // 服务端向客户端发送本事件通知客户端,数据回放完成.果没有发行额外的命令,就不再发送数据.客户端丢弃从流中接收的消息.4字节的事件数据表示,回放结束的流的ID. case RTMP_USER_STREAM_EOF, RTMP_USER_STREAM_DRY, RTMP_USER_STREAM_IS_RECORDED: // 服务端向客户端发送本事件通知客户端,数据回放完成.果没有发行额外的命令,就不再发送数据.客户端丢弃从流中接收的消息.4字节的事件数据表示,回放结束的流的ID.
m := &StreamIDMessage{ m := &StreamIDMessage{
UserControlMessage: base, UserControlMessage: base,
StreamID: util.BigEndian.Uint32(base.EventData), StreamID: utils.BigEndian.Uint32(base.EventData),
} }
chunk.MsgData = m chunk.MsgData = m
case RTMP_USER_SET_BUFFLEN: // 客户端向服务端发送本事件,告知对方自己存储一个流的数据的缓存的长度(毫秒单位).当服务端开始处理一个流得时候发送本事件.事件数据的头四个字节表示流ID,后4个字节表示缓存长度(毫秒单位). case RTMP_USER_SET_BUFFLEN: // 客户端向服务端发送本事件,告知对方自己存储一个流的数据的缓存的长度(毫秒单位).当服务端开始处理一个流得时候发送本事件.事件数据的头四个字节表示流ID,后4个字节表示缓存长度(毫秒单位).
m := &SetBufferMessage{ m := &SetBufferMessage{
StreamIDMessage: StreamIDMessage{ StreamIDMessage: StreamIDMessage{
UserControlMessage: base, UserControlMessage: base,
StreamID: util.BigEndian.Uint32(base.EventData), StreamID: utils.BigEndian.Uint32(base.EventData),
}, },
Millisecond: util.BigEndian.Uint32(base.EventData[4:]), Millisecond: utils.BigEndian.Uint32(base.EventData[4:]),
} }
chunk.MsgData = m chunk.MsgData = m
case RTMP_USER_PING_REQUEST: // 服务端通过本事件测试客户端是否可达.事件数据是4个字节的事件戳.代表服务调用本命令的本地时间.客户端在接收到kMsgPingRequest之后返回kMsgPingResponse事件 case RTMP_USER_PING_REQUEST: // 服务端通过本事件测试客户端是否可达.事件数据是4个字节的事件戳.代表服务调用本命令的本地时间.客户端在接收到kMsgPingRequest之后返回kMsgPingResponse事件
m := &PingRequestMessage{ m := &PingRequestMessage{
UserControlMessage: base, UserControlMessage: base,
Timestamp: util.BigEndian.Uint32(base.EventData), Timestamp: utils.BigEndian.Uint32(base.EventData),
} }
chunk.MsgData = m chunk.MsgData = m
case RTMP_USER_PING_RESPONSE, RTMP_USER_EMPTY: // 客户端向服务端发送本消息响应ping请求.事件数据是接kMsgPingRequest请求的时间. case RTMP_USER_PING_RESPONSE, RTMP_USER_EMPTY: // 客户端向服务端发送本消息响应ping请求.事件数据是接kMsgPingRequest请求的时间.
@@ -168,7 +168,7 @@ func GetRtmpMessage(chunk *Chunk) {
} }
case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽. case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽.
m := &SetPeerBandwidthMessage{ m := &SetPeerBandwidthMessage{
AcknowledgementWindowsize: util.BigEndian.Uint32(chunk.Body), AcknowledgementWindowsize: utils.BigEndian.Uint32(chunk.Body),
} }
if len(chunk.Body) > 4 { if len(chunk.Body) > 4 {
m.LimitType = chunk.Body[4] m.LimitType = chunk.Body[4]
@@ -351,7 +351,7 @@ type Uint32Message uint32
func (msg Uint32Message) Encode() (b []byte) { func (msg Uint32Message) Encode() (b []byte) {
b = make([]byte, 4) b = make([]byte, 4)
util.BigEndian.PutUint32(b, uint32(msg)) utils.BigEndian.PutUint32(b, uint32(msg))
return b return b
} }
@@ -378,7 +378,7 @@ type SetPeerBandwidthMessage struct {
func (msg *SetPeerBandwidthMessage) Encode() (b []byte) { func (msg *SetPeerBandwidthMessage) Encode() (b []byte) {
b = make([]byte, 5) b = make([]byte, 5)
util.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize) utils.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize)
b[4] = msg.LimitType b[4] = msg.LimitType
return return
} }
@@ -869,8 +869,8 @@ type StreamIDMessage struct {
func (msg *StreamIDMessage) Encode() (b []byte) { func (msg *StreamIDMessage) Encode() (b []byte) {
b = make([]byte, 6) b = make([]byte, 6)
util.BigEndian.PutUint16(b, msg.EventType) utils.BigEndian.PutUint16(b, msg.EventType)
util.BigEndian.PutUint32(b[2:], msg.StreamID) utils.BigEndian.PutUint32(b[2:], msg.StreamID)
msg.EventData = b[2:] msg.EventData = b[2:]
return return
} }
@@ -887,9 +887,9 @@ type SetBufferMessage struct {
func (msg *SetBufferMessage) Encode() []byte { func (msg *SetBufferMessage) Encode() []byte {
b := make([]byte, 10) b := make([]byte, 10)
util.BigEndian.PutUint16(b, msg.EventType) utils.BigEndian.PutUint16(b, msg.EventType)
util.BigEndian.PutUint32(b[2:], msg.StreamID) utils.BigEndian.PutUint32(b[2:], msg.StreamID)
util.BigEndian.PutUint32(b[6:], msg.Millisecond) utils.BigEndian.PutUint32(b[6:], msg.Millisecond)
msg.EventData = b[2:] msg.EventData = b[2:]
return b return b
} }
@@ -905,15 +905,20 @@ type PingRequestMessage struct {
func (msg *PingRequestMessage) Encode() (b []byte) { func (msg *PingRequestMessage) Encode() (b []byte) {
b = make([]byte, 6) b = make([]byte, 6)
util.BigEndian.PutUint16(b, msg.EventType) utils.BigEndian.PutUint16(b, msg.EventType)
util.BigEndian.PutUint32(b[2:], msg.Timestamp) utils.BigEndian.PutUint32(b[2:], msg.Timestamp)
msg.EventData = b[2:] msg.EventData = b[2:]
return return
} }
func (msg *UserControlMessage) Encode() []byte { func (msg *UserControlMessage) Encode() []byte {
b := make([]byte, 2) b := make([]byte, 2)
util.BigEndian.PutUint16(b, msg.EventType) utils.BigEndian.PutUint16(b, msg.EventType)
msg.EventData = b[2:] msg.EventData = b[2:]
return b return b
} }
type AVPack struct {
Timestamp uint32
Payload []byte
}

View File

@@ -3,13 +3,10 @@ package rtmp
import ( import (
"bufio" "bufio"
"errors" "errors"
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3"
"io" "io"
"log" "log"
"github.com/Monibuca/engine/v2"
"github.com/Monibuca/engine/v2/avformat"
"github.com/Monibuca/engine/v2/pool"
"github.com/Monibuca/engine/v2/util"
) )
const ( const (
@@ -86,7 +83,7 @@ type NetConnection struct {
writeChunkSize int writeChunkSize int
readChunkSize int readChunkSize int
incompleteRtmpBody map[uint32][]byte // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 incompleteRtmpBody map[uint32][]byte // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来
nextStreamID func(uint32) uint32 // 下一个流ID nextStreamID func() uint32 // 下一个流ID
streamID uint32 // 流ID streamID uint32 // 流ID
rtmpHeader map[uint32]*ChunkHeader // RtmpHeader rtmpHeader map[uint32]*ChunkHeader // RtmpHeader
objectEncoding float64 objectEncoding float64
@@ -331,33 +328,33 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
m.CommandName = "releaseStream" + data["level"].(string) m.CommandName = "releaseStream" + data["level"].(string)
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_FULL_AUDIO_MESSAGE: case SEND_FULL_AUDIO_MESSAGE:
audio, ok := args.(*avformat.SendPacket) audio, ok := args.(*AVPack)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(audio, true, true) return conn.sendAVMessage(audio.Timestamp, audio.Payload, true, true)
case SEND_AUDIO_MESSAGE: case SEND_AUDIO_MESSAGE:
audio, ok := args.(*avformat.SendPacket) audio, ok := args.(*AVPack)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(audio, true, false) return conn.sendAVMessage(audio.Timestamp, audio.Payload, true, false)
case SEND_FULL_VDIEO_MESSAGE: case SEND_FULL_VDIEO_MESSAGE:
video, ok := args.(*avformat.SendPacket) video, ok := args.(*AVPack)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(video, false, true) return conn.sendAVMessage(video.Timestamp, video.Payload, false, true)
case SEND_VIDEO_MESSAGE: case SEND_VIDEO_MESSAGE:
video, ok := args.(*avformat.SendPacket) video, ok := args.(*AVPack)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(video, false, false) return conn.sendAVMessage(video.Timestamp, video.Payload, false, false)
} }
return errors.New("send message no exist") return errors.New("send message no exist")
@@ -366,7 +363,7 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 // 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, isFirst bool) error { func (conn *NetConnection) sendAVMessage(ts uint32, payload []byte, isAudio bool, isFirst bool) error {
if conn.writeSeqNum > conn.bandwidth { if conn.writeSeqNum > conn.bandwidth {
conn.totalWrite += conn.writeSeqNum conn.totalWrite += conn.writeSeqNum
conn.writeSeqNum = 0 conn.writeSeqNum = 0
@@ -378,19 +375,18 @@ func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool,
var need []byte var need []byte
var head *ChunkHeader var head *ChunkHeader
if isAudio { if isAudio {
head = newRtmpHeader(RTMP_CSID_AUDIO, av.Timestamp, uint32(len(av.Payload)), RTMP_MSG_AUDIO, conn.streamID, 0) head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(len(payload)), RTMP_MSG_AUDIO, conn.streamID, 0)
} else { } else {
head = newRtmpHeader(RTMP_CSID_VIDEO, av.Timestamp, uint32(len(av.Payload)), RTMP_MSG_VIDEO, conn.streamID, 0) head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(len(payload)), RTMP_MSG_VIDEO, conn.streamID, 0)
} }
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12), // 当Chunk Type为0时(即Chunk12),
if isFirst { if isFirst {
need, err = conn.encodeChunk12(head, av.Payload, conn.writeChunkSize) need, err = conn.encodeChunk12(head, payload, conn.writeChunkSize)
} else { } else {
need, err = conn.encodeChunk8(head, av.Payload, conn.writeChunkSize) need, err = conn.encodeChunk8(head, payload, conn.writeChunkSize)
} }
if err != nil { if err != nil {
return err return err
@@ -518,20 +514,20 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head *
case 0: case 0:
{ {
// Timestamp 3 bytes // Timestamp 3 bytes
b := pool.GetSlice(3) b := utils.GetSlice(3)
if _, err := io.ReadFull(conn, b); err != nil { if _, err := io.ReadFull(conn, b); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 3 conn.readSeqNum += 3
h.Timestamp = util.BigEndian.Uint24(b) //type = 0的时间戳为绝对时间,其他的都为相对时间 h.Timestamp = utils.BigEndian.Uint24(b) //type = 0的时间戳为绝对时间,其他的都为相对时间
// Message Length 3 bytes // Message Length 3 bytes
if _, err = io.ReadFull(conn, b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度. if _, err = io.ReadFull(conn, b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度.
return nil, err return nil, err
} }
conn.readSeqNum += 3 conn.readSeqNum += 3
h.MessageLength = util.BigEndian.Uint24(b) h.MessageLength = utils.BigEndian.Uint24(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
// Message Type ID 1 bytes // Message Type ID 1 bytes
v, err := conn.ReadByte() // 读取Message Type ID v, err := conn.ReadByte() // 读取Message Type ID
if err != nil { if err != nil {
@@ -541,12 +537,12 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head *
h.MessageTypeID = v h.MessageTypeID = v
// Message Stream ID 4bytes // Message Stream ID 4bytes
bb := pool.GetSlice(4) bb := utils.GetSlice(4)
if _, err = io.ReadFull(conn, bb); err != nil { // 读取Message Stream ID if _, err = io.ReadFull(conn, bb); err != nil { // 读取Message Stream ID
return nil, err return nil, err
} }
conn.readSeqNum += 4 conn.readSeqNum += 4
h.MessageStreamID = util.LittleEndian.Uint32(bb) h.MessageStreamID = utils.LittleEndian.Uint32(bb)
// ExtendTimestamp 4 bytes // ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
@@ -554,28 +550,28 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head *
return nil, err return nil, err
} }
conn.readSeqNum += 4 conn.readSeqNum += 4
h.ExtendTimestamp = util.BigEndian.Uint32(bb) h.ExtendTimestamp = utils.BigEndian.Uint32(bb)
} }
pool.RecycleSlice(bb) utils.RecycleSlice(bb)
} }
case 1: case 1:
{ {
// Timestamp 3 bytes // Timestamp 3 bytes
b := pool.GetSlice(3) b := utils.GetSlice(3)
if _, err = io.ReadFull(conn, b); err != nil { if _, err = io.ReadFull(conn, b); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 3 conn.readSeqNum += 3
h.ChunkType = chunkType h.ChunkType = chunkType
h.Timestamp = util.BigEndian.Uint24(b) h.Timestamp = utils.BigEndian.Uint24(b)
// Message Length 3 bytes // Message Length 3 bytes
if _, err = io.ReadFull(conn, b); err != nil { if _, err = io.ReadFull(conn, b); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 3 conn.readSeqNum += 3
h.MessageLength = util.BigEndian.Uint24(b) h.MessageLength = utils.BigEndian.Uint24(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
// Message Type ID 1 bytes // Message Type ID 1 bytes
v, err := conn.ReadByte() v, err := conn.ReadByte()
if err != nil { if err != nil {
@@ -586,35 +582,35 @@ func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (head *
// ExtendTimestamp 4 bytes // ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { if h.Timestamp == 0xffffff {
bb := pool.GetSlice(4) bb := utils.GetSlice(4)
if _, err := io.ReadFull(conn, bb); err != nil { if _, err := io.ReadFull(conn, bb); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 4 conn.readSeqNum += 4
h.ExtendTimestamp = util.BigEndian.Uint32(bb) h.ExtendTimestamp = utils.BigEndian.Uint32(bb)
pool.RecycleSlice(bb) utils.RecycleSlice(bb)
} }
} }
case 2: case 2:
{ {
// Timestamp 3 bytes // Timestamp 3 bytes
b := pool.GetSlice(3) b := utils.GetSlice(3)
if _, err = io.ReadFull(conn, b); err != nil { if _, err = io.ReadFull(conn, b); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 3 conn.readSeqNum += 3
h.ChunkType = chunkType h.ChunkType = chunkType
h.Timestamp = util.BigEndian.Uint24(b) h.Timestamp = utils.BigEndian.Uint24(b)
pool.RecycleSlice(b) utils.RecycleSlice(b)
// ExtendTimestamp 4 bytes // ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { if h.Timestamp == 0xffffff {
bb := pool.GetSlice(4) bb := utils.GetSlice(4)
if _, err := io.ReadFull(conn, bb); err != nil { if _, err := io.ReadFull(conn, bb); err != nil {
return nil, err return nil, err
} }
conn.readSeqNum += 4 conn.readSeqNum += 4
h.ExtendTimestamp = util.BigEndian.Uint32(bb) h.ExtendTimestamp = utils.BigEndian.Uint32(bb)
pool.RecycleSlice(bb) utils.RecycleSlice(bb)
} }
} }
case 3: case 3:

View File

@@ -3,19 +3,17 @@ package rtmp
import ( import (
"bufio" "bufio"
"fmt" "fmt"
"github.com/Monibuca/engine/v3"
"github.com/Monibuca/utils/v3"
"github.com/Monibuca/utils/v3/codec"
"log" "log"
"net" "net"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
. "github.com/Monibuca/engine/v2"
"github.com/Monibuca/engine/v2/avformat"
) )
type RTMP struct {
Publisher
}
func ListenRtmp(addr string) error { func ListenRtmp(addr string) error {
defer log.Println("rtmp server start!") defer log.Println("rtmp server start!")
// defer fmt.Println("server start!") // defer fmt.Println("server start!")
@@ -53,37 +51,104 @@ func ListenRtmp(addr string) error {
var gstreamid = uint32(64) var gstreamid = uint32(64)
func processRtmp(conn net.Conn) { func processRtmp(conn net.Conn) {
var stream *Stream var stream *engine.Stream
streams := make(map[uint32]*Subscriber) streams := make(map[uint32]*engine.Subscriber)
defer func() { defer func() {
conn.Close() conn.Close()
if stream != nil { if stream != nil {
stream.Cancel() stream.Close()
} }
for _, s := range streams { for _, s := range streams {
s.Close() s.Close()
} }
}() }()
var totalDuration uint32 nc := NetConnection{
nc := &NetConnection{
ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)), ReadWriter: bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader), rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32][]byte), incompleteRtmpBody: make(map[uint32][]byte),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3, bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
nextStreamID: func(u uint32) uint32 { nextStreamID: func() uint32 {
gstreamid++ return atomic.AddUint32(&gstreamid, 1)
return gstreamid
}, },
} }
/* Handshake */ /* Handshake */
if MayBeError(Handshake(nc.ReadWriter)) { if utils.MayBeError(Handshake(nc.ReadWriter)) {
return return
} }
if MayBeError(nc.OnConnect()) { if utils.MayBeError(nc.OnConnect()) {
return return
} }
var rec_audio, rec_video func(*Chunk)
rec_audio = func(msg *Chunk) {
var ts_audio uint32
va := stream.AudioTracks[0]
tmp := msg.Body[0]
if va.SoundFormat = tmp >> 4; va.SoundFormat == 10 {
if msg.Body[1] != 0 {
return
}
va.ASC = msg.Body[2:]
config1, config2 := msg.Body[2], msg.Body[3]
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
va.SoundRate = codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]
va.SoundType = (config2 >> 3) & 0x0F //声道
//frameLengthFlag = (config2 >> 2) & 0x01
//dependsOnCoreCoder = (config2 >> 1) & 0x01
//extensionFlag = config2 & 0x01
} else {
va.SoundRate = codec.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz
va.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
va.SoundType = tmp & 0x01 // 0 单声道1立体声
}
va.RtmpTag = msg.Body
rec_audio = func(msg *Chunk) {
if msg.Timestamp == 0xffffff {
ts_audio += msg.ExtendTimestamp
} else {
ts_audio += msg.Timestamp // 绝对时间戳
}
stream.PushAudio(ts_audio, msg.Body[2:])
}
}
rec_video = func(msg *Chunk) {
// 等待AVC序列帧
if msg.Body[1] != 0 {
return
}
vt := stream.VideoTracks[0]
var ts_video uint32
var info codec.AVCDecoderConfigurationRecord
//0:codec,1:IsAVCSequence,2~4:compositionTime
if _, err := info.Unmarshal(msg.Body[5:]); err == nil {
vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.SPS = info.SequenceParameterSetNALUnit
vt.PPS = info.PictureParameterSetNALUnit
}
vt.RtmpTag = msg.Body
nalulenSize := int(info.LengthSizeMinusOne&3 + 1)
rec_video = func(msg *Chunk) {
nalus := msg.Body[5:]
if msg.Timestamp == 0xffffff {
ts_video += msg.ExtendTimestamp
} else {
ts_video += msg.Timestamp // 绝对时间戳
}
for len(nalus) > nalulenSize {
nalulen := 0
for i := 0; i < nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1))
}
vt.Push(ts_video, nalus[nalulenSize:nalulen+nalulenSize])
nalus = nalus[nalulen+nalulenSize:]
}
}
}
for { for {
if msg, err := nc.RecvMessage(); err == nil { if msg, err := nc.RecvMessage(); err == nil {
if msg.MessageLength <= 0 { if msg.MessageLength <= 0 {
@@ -97,16 +162,16 @@ func processRtmp(conn net.Conn) {
cmd := msg.MsgData.(Commander).GetCommand() cmd := msg.MsgData.(Commander).GetCommand()
switch cmd.CommandName { switch cmd.CommandName {
case "createStream": case "createStream":
nc.streamID = nc.nextStreamID(msg.ChunkStreamID) nc.streamID = nc.nextStreamID()
log.Println("createStream:", nc.streamID) log.Println("createStream:", nc.streamID)
err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId) err = nc.SendMessage(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId)
if MayBeError(err) { if utils.MayBeError(err) {
return return
} }
case "publish": case "publish":
pm := msg.MsgData.(*PublishMessage) pm := msg.MsgData.(*PublishMessage)
streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0] streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0]
if pub := new(RTMP); pub.Publish(streamPath) { if pub := new(engine.Publisher); pub.Publish(streamPath) {
pub.Type = "RTMP" pub.Type = "RTMP"
stream = pub.Stream stream = pub.Stream
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
@@ -118,46 +183,7 @@ func processRtmp(conn net.Conn) {
pm := msg.MsgData.(*PlayMessage) pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0]
nc.writeChunkSize = config.ChunkSize nc.writeChunkSize = config.ChunkSize
var lastAudioTime uint32 = 0 stream := engine.Subscriber{}
var lastVideoTime uint32 = 0
// followAVCSequence := false
stream := &Subscriber{OnData: func(packet *avformat.SendPacket) (err error) {
switch true {
// case packet.IsADTS:
// tagPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
// tagPacket.Payload = avformat.ADTSToAudioSpecificConfig(packet.Payload)
// err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, tagPacket)
// ADTSLength := 7 + (int(packet.Payload[1]&1) << 1)
// if len(packet.Payload) > ADTSLength {
// contentPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
// contentPacket.Timestamp = packet.Timestamp
// contentPacket.Payload = make([]byte, len(packet.Payload)-ADTSLength+2)
// contentPacket.Payload[0] = 0xAF
// contentPacket.Payload[1] = 0x01 //raw AAC
// copy(contentPacket.Payload[2:], packet.Payload[ADTSLength:])
// err = nc.SendMessage(SEND_AUDIO_MESSAGE, contentPacket)
// }
case packet.Type == RTMP_MSG_VIDEO:
if packet.IsSequence {
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, packet)
} else {
t := packet.Timestamp - lastVideoTime
lastVideoTime = packet.Timestamp
packet.Timestamp = t
err = nc.SendMessage(SEND_VIDEO_MESSAGE, packet)
}
case packet.Type == RTMP_MSG_AUDIO:
if packet.IsSequence {
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, packet)
} else {
t := packet.Timestamp - lastAudioTime
lastAudioTime = packet.Timestamp
packet.Timestamp = t
err = nc.SendMessage(SEND_AUDIO_MESSAGE, packet)
}
}
return
}}
stream.Type = "RTMP" stream.Type = "RTMP"
stream.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID) stream.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID)
err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize)) err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize))
@@ -166,22 +192,71 @@ func processRtmp(conn net.Conn) {
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
if err == nil { if err == nil {
streams[nc.streamID] = stream streams[nc.streamID] = &stream
go stream.Subscribe(streamPath) if err = stream.Subscribe(streamPath); err == nil {
vt, at := stream.VideoTracks[0], stream.AudioTracks[0]
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag})
if at.SoundFormat == 10 {
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag})
}
var lastAudioTime, lastVideoTime uint32
var lock sync.Mutex
go vt.Play(stream.Context, func(pack engine.VideoPack) {
if lastVideoTime == 0 {
lastVideoTime = pack.Timestamp
}
t := pack.Timestamp - lastVideoTime
lastVideoTime = pack.Timestamp
payload := utils.GetSlice(9 + len(pack.Payload))
defer utils.RecycleSlice(payload)
if pack.NalType == codec.NALU_IDR_Picture {
payload[0] = 0x17
} else {
payload[0] = 0x27
}
payload[1] = 0x01
utils.BigEndian.PutUint32(payload[5:], uint32(len(pack.Payload)))
copy(payload[9:], pack.Payload)
lock.Lock()
defer lock.Unlock()
err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
})
go at.Play(stream.Context, func(pack engine.AudioPack) {
if lastAudioTime == 0 {
lastAudioTime = pack.Timestamp
}
t := pack.Timestamp - lastAudioTime
lastAudioTime = pack.Timestamp
l := len(pack.Payload) + 1
if at.SoundFormat == 10 {
l++
}
payload := utils.GetSlice(l)
defer utils.RecycleSlice(payload)
payload[0] = at.RtmpTag[0]
if at.SoundFormat == 10 {
payload[1] = 1
}
copy(payload[2:], pack.Payload)
lock.Lock()
defer lock.Unlock()
err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
})
}
} else { } else {
return return
} }
case "closeStream": case "closeStream":
cm := msg.MsgData.(*CURDStreamMessage) cm := msg.MsgData.(*CURDStreamMessage)
if stream, ok := streams[cm.StreamId]; ok { if stream, ok := streams[cm.StreamId]; ok {
stream.Cancel() stream.Close()
delete(streams, cm.StreamId) delete(streams, cm.StreamId)
} }
case "releaseStream": case "releaseStream":
cm := msg.MsgData.(*ReleaseStreamMessage) cm := msg.MsgData.(*ReleaseStreamMessage)
streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0] streamPath := nc.appName + "/" + strings.Split(cm.StreamName, "?")[0]
amfobj := newAMFObjects() amfobj := newAMFObjects()
if s := FindStream(streamPath); s != nil { if s := engine.FindStream(streamPath); s != nil {
amfobj["level"] = "_result" amfobj["level"] = "_result"
s.Close() s.Close()
} else { } else {
@@ -191,21 +266,9 @@ func processRtmp(conn net.Conn) {
err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj) err = nc.SendMessage(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj)
} }
case RTMP_MSG_AUDIO: case RTMP_MSG_AUDIO:
// pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO) rec_audio(msg)
if msg.Timestamp == 0xffffff {
totalDuration += msg.ExtendTimestamp
} else {
totalDuration += msg.Timestamp // 绝对时间戳
}
stream.PushAudio(totalDuration, msg.Body)
case RTMP_MSG_VIDEO: case RTMP_MSG_VIDEO:
// pkt := avformat.NewAVPacket(RTMP_MSG_VIDEO) rec_video(msg)
if msg.Timestamp == 0xffffff {
totalDuration += msg.ExtendTimestamp
} else {
totalDuration += msg.Timestamp // 绝对时间戳
}
stream.PushVideo(totalDuration, msg.Body)
} }
msg.Recycle() msg.Recycle()
} else { } else {