This commit is contained in:
dexter
2023-01-28 18:24:08 +08:00
parent 95125b5560
commit e45e9b2912
7 changed files with 305 additions and 455 deletions

111
chunk.go
View File

@@ -1,8 +1,6 @@
package rtmp package rtmp
import ( import (
"encoding/binary"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -27,45 +25,31 @@ const (
type Chunk struct { type Chunk struct {
ChunkHeader ChunkHeader
Body []byte AVData util.BLL
MsgData RtmpMessage MsgData RtmpMessage
} }
func (c *Chunk) Encode(msg RtmpMessage) {
c.MsgData = msg
c.Body = msg.Encode()
c.MessageLength = uint32(len(c.Body))
}
type ChunkHeader struct { type ChunkHeader struct {
ChunkBasicHeader ChunkStreamID uint32 `json:""`
ChunkMessageHeader
// Extended Timestamp (0 or 4 bytes): This field is present in certain
// circumstances depending on the encoded timestamp or timestamp
// delta field in the Chunk Message header. See Section 5.3.1.3 for
// more information
ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略
}
// Basic Header (1 to 3 bytes) : This field encodes the chunk stream ID
// and the chunk type. Chunk type determines the format of the
// encoded message header. The length(Basic Header) depends entirely on the chunk
// stream ID, which is a variable-length field.
type ChunkBasicHeader struct {
ChunkStreamID uint32 `json:""` // 6 bit. 3 ~ 65559, 0,1,2 reserved
ChunkType byte `json:""` // 2 bit.
}
// Message Header (0, 3, 7, or 11 bytes): This field encodes
// information about the message being sent (whether in whole or in
// part). The length can be determined using the chunk type
// specified in the chunk header.
type ChunkMessageHeader struct {
Timestamp uint32 `json:""` // 3 byte Timestamp uint32 `json:""` // 3 byte
MessageLength uint32 `json:""` // 3 byte MessageLength uint32 `json:""` // 3 byte
MessageTypeID byte `json:""` // 1 byte MessageTypeID byte `json:""` // 1 byte
MessageStreamID uint32 `json:""` // 4 byte MessageStreamID uint32 `json:""` // 4 byte
// Extended Timestamp (0 or 4 bytes): This field is present in certain
// circumstances depending on the encoded timestamp or timestamp
// delta field in the Chunk Message header. See Section 5.3.1.3 for
// more information
ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略
}
func (c *ChunkHeader) SetTimestamp(timestamp uint32) {
if timestamp >= 0xFFFFFF {
c.ExtendTimestamp = timestamp
c.Timestamp = 0xFFFFFF
} else {
c.ExtendTimestamp = 0
c.Timestamp = timestamp
}
} }
// ChunkBasicHeader会决定ChunkMessgaeHeader,ChunkMessgaeHeader有4种(0,3,7,11 Bytes),因此可能有4种头. // ChunkBasicHeader会决定ChunkMessgaeHeader,ChunkMessgaeHeader有4种(0,3,7,11 Bytes),因此可能有4种头.
@@ -75,49 +59,22 @@ type ChunkMessageHeader struct {
// 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7) // 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7)
// 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11) // 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11)
func (nc *NetConnection) encodeChunk12(head *ChunkHeader) []byte { func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) {
b := util.Buffer(make([]byte, 0, 16)) b.Reset()
b.WriteByte(byte(RTMP_CHUNK_HEAD_12 + head.ChunkStreamID)) csid := byte(h.ChunkStreamID)
b.WriteUint24(head.Timestamp) b.WriteByte(t + csid)
b.WriteUint24(head.MessageLength)
b.WriteByte(head.MessageTypeID) if t < RTMP_CHUNK_HEAD_1 {
binary.LittleEndian.PutUint32(b.Malloc(4), head.MessageStreamID) b.WriteUint24(h.Timestamp)
if head.ChunkMessageHeader.Timestamp == 0xffffff { if t < RTMP_CHUNK_HEAD_4 {
binary.LittleEndian.PutUint32(b.Malloc(4), head.ExtendTimestamp) b.WriteUint24(h.MessageLength)
b.WriteByte(h.MessageTypeID)
if t < RTMP_CHUNK_HEAD_8 {
b.WriteUint32(h.MessageStreamID)
}
}
}
if h.Timestamp == 0xffffff {
b.WriteUint32(h.ExtendTimestamp)
} }
return b
}
func (nc *NetConnection) encodeChunk8(head *ChunkHeader) []byte {
b := util.Buffer(make([]byte, 0, 8))
b.WriteByte(byte(RTMP_CHUNK_HEAD_8 + head.ChunkStreamID))
b.WriteUint24(head.Timestamp)
b.WriteUint24(head.MessageLength)
b.WriteByte(head.MessageTypeID)
return b
}
// func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size int) (need []byte, err error) {
// if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 {
// return nil, errors.New("chunk error")
// }
// b := make([]byte, 4)
// //chunkBasicHead
// b[0] = byte(RTMP_CHUNK_HEAD_4 + head.ChunkStreamID)
// util.PutBE(b[1:4], head.Timestamp)
// nc.Write(b)
// nc.writeSeqNum += 4
// if len(payload) > size {
// nc.Write(payload[0:size])
// nc.writeSeqNum += uint32(size)
// need = payload[size:]
// } else {
// nc.Write(payload)
// nc.writeSeqNum += uint32(len(payload))
// }
// return
// }
func (nc *NetConnection) encodeChunk1(head *ChunkHeader) []byte {
return []byte{byte(RTMP_CHUNK_HEAD_1 + head.ChunkStreamID)}
} }

View File

@@ -1,7 +1,6 @@
package rtmp package rtmp
import ( import (
"bufio"
"crypto/tls" "crypto/tls"
"errors" "errors"
"net" "net"
@@ -10,7 +9,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4" "m7s.live/engine/v4"
"m7s.live/engine/v4/util"
) )
func NewRTMPClient(addr string) (client *NetConnection, err error) { func NewRTMPClient(addr string) (client *NetConnection, err error) {
@@ -49,16 +47,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
conn.Close() conn.Close()
} }
}() }()
client = &NetConnection{ client = NewNetConnection(conn)
Conn: conn,
Reader: bufio.NewReader(conn),
writeChunkSize: conf.ChunkSize,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32]*util.Buffer),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make([]byte, 4),
}
err = client.ClientHandshake() err = client.ClientHandshake()
if err != nil { if err != nil {
RTMPPlugin.Error("handshake", zap.Error(err)) RTMPPlugin.Error("handshake", zap.Error(err))
@@ -69,6 +58,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) {
if err != nil { if err != nil {
return return
} }
client.writeChunkSize = conf.ChunkSize
err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{ err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{
CommandMessage{"connect", 1}, CommandMessage{"connect", 1},
map[string]any{ map[string]any{
@@ -135,6 +125,9 @@ func (pusher *RTMPPusher) Push() error {
_, streamPath, _ := strings.Cut(URL.Path, "/") _, streamPath, _ := strings.Cut(URL.Path, "/")
_, streamPath, _ = strings.Cut(streamPath, "/") _, streamPath, _ = strings.Cut(streamPath, "/")
pusher.Args = URL.Query() pusher.Args = URL.Query()
if len(pusher.Args) > 0 {
streamPath += "?" + pusher.Args.Encode()
}
pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &PublishMessage{ pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &PublishMessage{
CURDStreamMessage{ CURDStreamMessage{
CommandMessage{ CommandMessage{
@@ -172,13 +165,12 @@ func (puller *RTMPPuller) Connect() (err error) {
} }
func (puller *RTMPPuller) Pull() (err error) { func (puller *RTMPPuller) Pull() (err error) {
puller.absTs = make(map[uint32]uint32)
defer puller.Stop() defer puller.Stop()
err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2})
for err == nil { for err == nil {
msg, err := puller.RecvMessage() msg, err := puller.RecvMessage()
if err != nil { if err != nil {
break return err
} }
switch msg.MessageTypeID { switch msg.MessageTypeID {
case RTMP_MSG_AUDIO: case RTMP_MSG_AUDIO:
@@ -199,6 +191,9 @@ func (puller *RTMPPuller) Pull() (err error) {
ps := strings.Split(URL.Path, "/") ps := strings.Split(URL.Path, "/")
puller.Args = URL.Query() puller.Args = URL.Query()
m.StreamName = ps[len(ps)-1] m.StreamName = ps[len(ps)-1]
if len(puller.Args) > 0 {
m.StreamName += "?" + puller.Args.Encode()
}
puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m) puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m)
// if response, ok := msg.MsgData.(*ResponsePlayMessage); ok { // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok {
// if response.Object["code"] == "NetStream.Play.Start" { // if response.Object["code"] == "NetStream.Play.Start" {

View File

@@ -62,7 +62,7 @@ func (c *RTMPConfig) OnEvent(event any) {
} }
var conf = &RTMPConfig{ var conf = &RTMPConfig{
ChunkSize: 4096, ChunkSize: 65536,
TCP: config.TCP{ListenAddr: ":1935"}, TCP: config.TCP{ListenAddr: ":1935"},
} }
var RTMPPlugin = InstallPlugin(conf) var RTMPPlugin = InstallPlugin(conf)

133
media.go
View File

@@ -2,11 +2,10 @@ package rtmp
import ( import (
"errors" "errors"
"net"
"go.uber.org/zap" "go.uber.org/zap"
. "m7s.live/engine/v4" . "m7s.live/engine/v4"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/common"
) )
type RTMPSender struct { type RTMPSender struct {
@@ -14,6 +13,8 @@ type RTMPSender struct {
NetStream NetStream
firstAudioSent bool firstAudioSent bool
firstVideoSent bool firstVideoSent bool
audioChunkHeader ChunkHeader
videoChunkHeader ChunkHeader
} }
func (rtmp *RTMPSender) OnEvent(event any) { func (rtmp *RTMPSender) OnEvent(event any) {
@@ -22,24 +23,27 @@ func (rtmp *RTMPSender) OnEvent(event any) {
rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus) rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus)
case SEpublish: case SEpublish:
rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus) rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus)
case ISubscriber:
rtmp.audioChunkHeader.ChunkStreamID = RTMP_CSID_AUDIO
rtmp.videoChunkHeader.ChunkStreamID = RTMP_CSID_VIDEO
rtmp.audioChunkHeader.MessageTypeID = RTMP_MSG_AUDIO
rtmp.videoChunkHeader.MessageTypeID = RTMP_MSG_VIDEO
rtmp.audioChunkHeader.MessageStreamID = rtmp.StreamID
rtmp.videoChunkHeader.MessageStreamID = rtmp.StreamID
case AudioDeConf: case AudioDeConf:
rtmp.sendAVMessage(0, v.AVCC, true, true) rtmp.audioChunkHeader.SetTimestamp(0)
rtmp.audioChunkHeader.MessageLength = uint32(len(v))
rtmp.audioChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader)
rtmp.sendChunk(v)
case VideoDeConf: case VideoDeConf:
rtmp.sendAVMessage(0, v.AVCC, false, true) rtmp.videoChunkHeader.SetTimestamp(0)
case *AudioFrame: rtmp.videoChunkHeader.MessageLength = uint32(len(v))
if rtmp.firstAudioSent { rtmp.videoChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader)
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) rtmp.sendChunk(v)
} else { case AudioFrame:
rtmp.firstAudioSent = true rtmp.sendAVMessage(v.AVFrame, v.AbsTime, true)
rtmp.sendAVMessage(v.AbsTime, v.AVCC, true, true) case VideoFrame:
} rtmp.sendAVMessage(v.AVFrame, v.AbsTime, false)
case *VideoFrame:
if rtmp.firstVideoSent {
rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false)
} else {
rtmp.firstVideoSent = true
rtmp.sendAVMessage(v.AbsTime, v.AVCC, false, true)
}
default: default:
rtmp.Subscriber.OnEvent(event) rtmp.Subscriber.OnEvent(event)
} }
@@ -48,8 +52,8 @@ func (rtmp *RTMPSender) OnEvent(event any) {
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 // 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) { func (sender *RTMPSender) sendAVMessage(frame *common.AVFrame, absTime uint32, isAudio bool) (err error) {
payloadLen := util.SizeOfBuffers(payload) payloadLen := frame.AVCC.ByteLength
if payloadLen == 0 { if payloadLen == 0 {
err := errors.New("payload is empty") err := errors.New("payload is empty")
sender.Error("payload is empty", zap.Error(err)) sender.Error("payload is empty", zap.Error(err))
@@ -61,35 +65,42 @@ func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio
sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite)) sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite))
sender.SendStreamID(RTMP_USER_PING_REQUEST, 0) sender.SendStreamID(RTMP_USER_PING_REQUEST, 0)
} }
var head *ChunkHeader var head *ChunkHeader
if isAudio {
head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(payloadLen), RTMP_MSG_AUDIO, sender.StreamID, 0)
} else {
head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(payloadLen), RTMP_MSG_VIDEO, sender.StreamID, 0)
}
var isFirst = false
if isAudio {
head = &sender.audioChunkHeader
if isFirst = !sender.firstAudioSent; isFirst {
sender.firstAudioSent = true
}
} else {
head = &sender.videoChunkHeader
if isFirst = !sender.firstVideoSent; isFirst {
sender.firstVideoSent = true
}
}
head.MessageLength = uint32(payloadLen)
// 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括))
// 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7))
// 当Chunk Type为0时(即Chunk12), // 当Chunk Type为0时(即Chunk12),
var chunk1 net.Buffers
if isFirst { if isFirst {
chunk1 = append(chunk1, sender.encodeChunk12(head)) head.SetTimestamp(absTime)
head.WriteTo(RTMP_CHUNK_HEAD_12, &sender.chunkHeader)
} else { } else {
chunk1 = append(chunk1, sender.encodeChunk8(head)) head.SetTimestamp(frame.DeltaTime)
head.WriteTo(RTMP_CHUNK_HEAD_8, &sender.chunkHeader)
} }
chunks := util.SplitBuffers(payload, sender.writeChunkSize) r := frame.AVCC.NewReader()
chunk1 = append(chunk1, chunks[0]...) chunk := r.ReadN(sender.writeChunkSize)
sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) // payloadLen -= util.SizeOfBuffers(chunk)
_, err = chunk1.WriteTo(sender.NetConnection) sender.sendChunk(chunk...)
if r.CanRead() {
// 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1))
for _, chunk := range chunks[1:] { for head.WriteTo(RTMP_CHUNK_HEAD_1, &sender.chunkHeader); r.CanRead(); sender.sendChunk(chunk...) {
chunk1 = net.Buffers{sender.encodeChunk1(head)} chunk = r.ReadN(sender.writeChunkSize)
chunk1 = append(chunk1, chunk...) // payloadLen -= util.SizeOfBuffers(chunk)
sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) }
_, err = chunk1.WriteTo(sender.NetConnection)
} }
return nil return nil
} }
@@ -97,7 +108,7 @@ func (r *RTMPSender) Response(tid uint64, code, level string) error {
m := new(ResponsePlayMessage) m := new(ResponsePlayMessage)
m.CommandName = Response_OnStatus m.CommandName = Response_OnStatus
m.TransactionId = tid m.TransactionId = tid
m.Object = map[string]any{ m.Infomation = map[string]any{
"code": code, "code": code,
"level": level, "level": level,
"description": "", "description": "",
@@ -109,7 +120,6 @@ func (r *RTMPSender) Response(tid uint64, code, level string) error {
type RTMPReceiver struct { type RTMPReceiver struct {
Publisher Publisher
NetStream NetStream
absTs map[uint32]uint32
} }
func (r *RTMPReceiver) Response(tid uint64, code, level string) error { func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
@@ -127,43 +137,20 @@ func (r *RTMPReceiver) Response(tid uint64, code, level string) error {
func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) { func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) {
if r.AudioTrack == nil { if r.AudioTrack == nil {
r.absTs[msg.ChunkStreamID] = 0 if r.WriteAVCCAudio(0, msg.AVData); r.AudioTrack != nil {
r.WriteAVCCAudio(0, msg.Body) r.AudioTrack.SetStuff(r.bytePool)
}
return return
} }
ts := msg.Timestamp r.AudioTrack.WriteAVCC(msg.ExtendTimestamp, msg.AVData)
if ts == 0xffffff {
ts = msg.ExtendTimestamp
}
if msg.ChunkType == 0 {
if r.AudioTrack.GetBase().Name == "" {
r.absTs[msg.ChunkStreamID] = 0
} else {
r.absTs[msg.ChunkStreamID] = ts
}
} else {
r.absTs[msg.ChunkStreamID] += ts
}
r.AudioTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
} }
func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) { func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) {
if r.VideoTrack == nil { if r.VideoTrack == nil {
r.absTs[msg.ChunkStreamID] = 0 if r.WriteAVCCVideo(0, msg.AVData); r.VideoTrack != nil {
r.WriteAVCCVideo(0, msg.Body) r.VideoTrack.SetStuff(r.bytePool)
}
return return
} }
ts := msg.Timestamp r.VideoTrack.WriteAVCC(msg.ExtendTimestamp, msg.AVData)
if ts == 0xffffff {
ts = msg.ExtendTimestamp
}
if msg.ChunkType == 0 {
if r.VideoTrack.GetBase().Name == "" {
r.absTs[msg.ChunkStreamID] = 0
} else {
r.absTs[msg.ChunkStreamID] = ts
}
} else {
r.absTs[msg.ChunkStreamID] += ts
}
r.VideoTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body)
} }

190
msg.go
View File

@@ -6,7 +6,6 @@ import (
"strings" "strings"
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4/codec"
"m7s.live/engine/v4/util" "m7s.live/engine/v4/util"
) )
@@ -72,20 +71,7 @@ func newChunkHeader(messageType byte) *ChunkHeader {
if messageType == RTMP_MSG_AMF0_COMMAND { if messageType == RTMP_MSG_AMF0_COMMAND {
head.ChunkStreamID = RTMP_CSID_COMMAND head.ChunkStreamID = RTMP_CSID_COMMAND
} }
head.Timestamp = 0
head.MessageTypeID = messageType head.MessageTypeID = messageType
head.MessageStreamID = 0
head.ExtendTimestamp = 0
return head
}
func newRtmpHeader(chunkID uint32, timestamp uint32, messageLength uint32, messageType byte, messageStreamID uint32, extendTimestamp uint32) *ChunkHeader {
head := new(ChunkHeader)
head.ChunkStreamID = chunkID
head.Timestamp = timestamp
head.MessageLength = messageLength
head.MessageTypeID = messageType
head.MessageStreamID = messageStreamID
head.ExtendTimestamp = extendTimestamp
return head return head
} }
@@ -94,23 +80,22 @@ func (h ChunkHeader) Clone() *ChunkHeader {
} }
type RtmpMessage interface { type RtmpMessage interface {
Encode() []byte Encode(*util.Buffer)
} }
type HaveStreamID interface { type HaveStreamID interface {
GetStreamID() uint32 GetStreamID() uint32
} }
func GetRtmpMessage(chunk *Chunk) error { func GetRtmpMessage(chunk *Chunk, body util.Buffer) error {
body := util.Buffer(chunk.Body)
switch chunk.MessageTypeID { switch chunk.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE: case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE:
if len(chunk.Body) < 4 { if body.Len() < 4 {
return errors.New("chunk.Body < 4") return errors.New("chunk.Body < 4")
} }
chunk.MsgData = Uint32Message(body.ReadUint32()) chunk.MsgData = Uint32Message(body.ReadUint32())
case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件. case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件.
{ {
if len(chunk.Body) < 2 { if body.Len() < 2 {
return errors.New("UserControlMessage.Body < 2") return errors.New("UserControlMessage.Body < 2")
} }
base := UserControlMessage{ base := UserControlMessage{
@@ -153,7 +138,7 @@ func GetRtmpMessage(chunk *Chunk) error {
} }
} }
case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽. case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽.
if len(chunk.Body) < 4 { if body.Len() < 4 {
return errors.New("chunk.Body < 4") return errors.New("chunk.Body < 4")
} }
m := &SetPeerBandwidthMessage{ m := &SetPeerBandwidthMessage{
@@ -169,11 +154,11 @@ func GetRtmpMessage(chunk *Chunk) error {
case RTMP_MSG_AMF3_METADATA: // RTMP消息类型ID=15, 数据消息.用AMF3编码. case RTMP_MSG_AMF3_METADATA: // RTMP消息类型ID=15, 数据消息.用AMF3编码.
case RTMP_MSG_AMF3_SHARED: // RTMP消息类型ID=16, 共享对象消息.用AMF3编码. case RTMP_MSG_AMF3_SHARED: // RTMP消息类型ID=16, 共享对象消息.用AMF3编码.
case RTMP_MSG_AMF3_COMMAND: // RTMP消息类型ID=17, 命令消息.用AMF3编码. case RTMP_MSG_AMF3_COMMAND: // RTMP消息类型ID=17, 命令消息.用AMF3编码.
decodeCommandAMF3(chunk) decodeCommandAMF0(chunk, body[1:])
case RTMP_MSG_AMF0_METADATA: // RTMP消息类型ID=18, 数据消息.用AMF0编码. case RTMP_MSG_AMF0_METADATA: // RTMP消息类型ID=18, 数据消息.用AMF0编码.
case RTMP_MSG_AMF0_SHARED: // RTMP消息类型ID=19, 共享对象消息.用AMF0编码. case RTMP_MSG_AMF0_SHARED: // RTMP消息类型ID=19, 共享对象消息.用AMF0编码.
case RTMP_MSG_AMF0_COMMAND: // RTMP消息类型ID=20, 命令消息.用AMF0编码. case RTMP_MSG_AMF0_COMMAND: // RTMP消息类型ID=20, 命令消息.用AMF0编码.
decodeCommandAMF0(chunk) // 解析具体的命令消息 decodeCommandAMF0(chunk, body) // 解析具体的命令消息
case RTMP_MSG_AGGREGATE: case RTMP_MSG_AGGREGATE:
default: default:
} }
@@ -198,8 +183,8 @@ func GetRtmpMessage(chunk *Chunk) error {
// object类型要复杂点. // object类型要复杂点.
// 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束 // 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束
func decodeCommandAMF0(chunk *Chunk) { func decodeCommandAMF0(chunk *Chunk, body []byte) {
amf := codec.AMF{chunk.Body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去. amf := util.AMF{body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去.
cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型. cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型.
cmdMsg := CommandMessage{ cmdMsg := CommandMessage{
cmd, cmd,
@@ -328,7 +313,6 @@ func decodeCommandAMF0(chunk *Chunk) {
chunk.MsgData = &ResponsePlayMessage{ chunk.MsgData = &ResponsePlayMessage{
cmdMsg, cmdMsg,
response.Infomation, response.Infomation,
"",
chunk.MessageStreamID, chunk.MessageStreamID,
} }
} else { } else {
@@ -342,11 +326,6 @@ func decodeCommandAMF0(chunk *Chunk) {
} }
} }
func decodeCommandAMF3(chunk *Chunk) {
chunk.Body = chunk.Body[1:]
decodeCommandAMF0(chunk)
}
/* Command Message */ /* Command Message */
type CommandMessage struct { type CommandMessage struct {
CommandName string // 命令名. 字符串. 命令名.设置为"connect" CommandName string // 命令名. 字符串. 命令名.设置为"connect"
@@ -360,8 +339,8 @@ func (cmd *CommandMessage) GetCommand() *CommandMessage {
return cmd return cmd
} }
func (msg *CommandMessage) Encode() (b []byte) { func (msg *CommandMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil)
} }
// Protocol control message 1. // Protocol control message 1.
@@ -370,10 +349,8 @@ func (msg *CommandMessage) Encode() (b []byte) {
// chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the senders subsequent chunks until further notice // chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the senders subsequent chunks until further notice
type Uint32Message uint32 type Uint32Message uint32
func (msg Uint32Message) Encode() (b []byte) { func (msg Uint32Message) Encode(buf *util.Buffer) {
b = make([]byte, 4) binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg))
binary.BigEndian.PutUint32(b, uint32(msg))
return b
} }
// Protocol control message 4, User Control Messages. // Protocol control message 4, User Control Messages.
@@ -397,11 +374,9 @@ type SetPeerBandwidthMessage struct {
LimitType byte LimitType byte
} }
func (msg *SetPeerBandwidthMessage) Encode() (b []byte) { func (msg *SetPeerBandwidthMessage) Encode(buf *util.Buffer) {
b = make([]byte, 5) buf.WriteUint32(msg.AcknowledgementWindowsize)
binary.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize) buf.WriteByte(msg.LimitType)
b[4] = msg.LimitType
return
} }
// Message 15, 18. Data Message. The client or the server sends this message to send Metadata or any // Message 15, 18. Data Message. The client or the server sends this message to send Metadata or any
@@ -431,20 +406,18 @@ type CallMessage struct {
Optional map[string]any `json:",omitempty"` Optional map[string]any `json:",omitempty"`
} }
func (msg *CallMessage) Encode() []byte { func (msg *CallMessage) Encode(buf *util.Buffer) {
var amf codec.AMF buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object)
amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object)
if msg.Optional != nil { if msg.Optional != nil {
amf.Marshal(msg.Optional) buf.MarshalAMFs(msg.Optional)
} }
return amf.Buffer
} }
func (msg *CallMessage) Encode3() []byte { // func (msg *CallMessage) Encode3() []byte {
var amf codec.AMF // var amf util.AMF
amf.WriteByte(0) // amf.WriteByte(0)
return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional) // return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional)
} // }
// Create Stream Message. // Create Stream Message.
// The client sends this command to the server to create a logical channel for message communication The publishing of audio, // The client sends this command to the server to create a logical channel for message communication The publishing of audio,
@@ -492,8 +465,7 @@ type PlayMessage struct {
// Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束 // Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束
// Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush // Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush
func (msg *PlayMessage) Encode() []byte { func (msg *PlayMessage) Encode(buf *util.Buffer) {
var amf codec.AMF
// if msg.Start > 0 { // if msg.Start > 0 {
// amf.writeNumber(msg.Start) // amf.writeNumber(msg.Start)
// } // }
@@ -503,7 +475,7 @@ func (msg *PlayMessage) Encode() []byte {
// } // }
// amf.writeBool(msg.Reset) // amf.writeBool(msg.Reset)
return amf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000)
} }
/* /*
@@ -576,8 +548,8 @@ type PublishMessage struct {
// “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件. // “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件.
// “live”:发布直播数据而不录制到文件 // “live”:发布直播数据而不录制到文件
func (msg *PublishMessage) Encode() []byte { func (msg *PublishMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType)
} }
// Seek Message // Seek Message
@@ -618,8 +590,8 @@ type ResponseConnectMessage struct {
Infomation map[string]any `json:",omitempty"` Infomation map[string]any `json:",omitempty"`
} }
func (msg *ResponseConnectMessage) Encode() []byte { func (msg *ResponseConnectMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
} }
/* /*
@@ -633,9 +605,9 @@ type ResponseCallMessage struct {
Response map[string]any Response map[string]any
} }
func (msg *ResponseCallMessage) Encode0() []byte { // func (msg *ResponseCallMessage) Encode0() []byte {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response) // return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response)
} // }
// Response Create Stream Message // Response Create Stream Message
type ResponseCreateStreamMessage struct { type ResponseCreateStreamMessage struct {
@@ -644,55 +616,56 @@ type ResponseCreateStreamMessage struct {
StreamId uint32 StreamId uint32
} }
func (msg *ResponseCreateStreamMessage) Encode() []byte { func (msg *ResponseCreateStreamMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId)
} }
/* /*
func (msg *ResponseCreateStreamMessage) Encode3() { func (msg *ResponseCreateStreamMessage) Encode3() {
}*/ }*/
func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) { // func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) {
amf := codec.AMF{chunk.Body} // amf := util.AMF{chunk.Body}
msg.CommandName = amf.ReadShortString() // msg.CommandName = amf.ReadShortString()
msg.TransactionId = uint64(amf.ReadNumber()) // msg.TransactionId = uint64(amf.ReadNumber())
amf.Unmarshal() // amf.Unmarshal()
msg.StreamId = uint32(amf.ReadNumber()) // msg.StreamId = uint32(amf.ReadNumber())
} // }
func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) {
chunk.Body = chunk.Body[1:] // func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) {
msg.Decode0(chunk) // chunk.Body = chunk.Body[1:]
} // msg.Decode0(chunk)
// }
// Response Play Message // Response Play Message
type ResponsePlayMessage struct { type ResponsePlayMessage struct {
CommandMessage CommandMessage
Object map[string]any `json:",omitempty"` Infomation map[string]any `json:",omitempty"`
Description string
StreamID uint32 StreamID uint32
} }
func (msg *ResponsePlayMessage) GetStreamID() uint32 { func (msg *ResponsePlayMessage) GetStreamID() uint32 {
return msg.StreamID return msg.StreamID
} }
func (msg *ResponsePlayMessage) Encode() []byte { func (msg *ResponsePlayMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Object, msg.Description) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Infomation)
} }
/* /*
func (msg *ResponsePlayMessage) Encode3() { func (msg *ResponsePlayMessage) Encode3() {
}*/ }*/
func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) { // func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) {
amf := codec.AMF{chunk.Body} // amf := util.AMF{chunk.Body}
msg.CommandName = amf.ReadShortString() // msg.CommandName = amf.ReadShortString()
msg.TransactionId = uint64(amf.ReadNumber()) // msg.TransactionId = uint64(amf.ReadNumber())
msg.Object = amf.ReadObject() // msg.Infomation = amf.ReadObject()
} // }
func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) {
chunk.Body = chunk.Body[1:] // func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) {
msg.Decode0(chunk) // chunk.Body = chunk.Body[1:]
} // msg.Decode0(chunk)
// }
// Response Publish Message // Response Publish Message
type ResponsePublishMessage struct { type ResponsePublishMessage struct {
@@ -711,8 +684,8 @@ func (msg *ResponsePublishMessage) GetStreamID() uint32 {
// 属性 -> null // 属性 -> null
// 信息 -> level, code, description // 信息 -> level, code, description
func (msg *ResponsePublishMessage) Encode() []byte { func (msg *ResponsePublishMessage) Encode(buf *util.Buffer) {
return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation)
} }
/* /*
@@ -771,12 +744,10 @@ type StreamIDMessage struct {
StreamID uint32 StreamID uint32
} }
func (msg *StreamIDMessage) Encode() (b []byte) { func (msg *StreamIDMessage) Encode(buffer *util.Buffer) {
buf := util.Buffer(make([]byte, 0, 6)) buffer.WriteUint16(msg.EventType)
buf.WriteUint16(msg.EventType) msg.EventData = buffer.Malloc(4)
buf.WriteUint32(msg.StreamID) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
msg.EventData = buf[2:]
return buf
} }
// SetBuffer Length (=3) // SetBuffer Length (=3)
@@ -789,13 +760,11 @@ type SetBufferMessage struct {
Millisecond uint32 Millisecond uint32
} }
func (msg *SetBufferMessage) Encode() []byte { func (msg *SetBufferMessage) Encode(buf *util.Buffer) {
buf := util.Buffer(make([]byte, 0, 10))
buf.WriteUint16(msg.EventType) buf.WriteUint16(msg.EventType)
buf.WriteUint32(msg.StreamID) msg.EventData = buf.Malloc(8)
buf.WriteUint32(msg.Millisecond) binary.BigEndian.PutUint32(msg.EventData, msg.StreamID)
msg.EventData = buf[2:] binary.BigEndian.PutUint32(msg.EventData[4:], msg.Millisecond)
return buf
} }
// PingRequest (=6) // PingRequest (=6)
@@ -807,19 +776,12 @@ type PingRequestMessage struct {
Timestamp uint32 Timestamp uint32
} }
func (msg *PingRequestMessage) Encode() (b []byte) { func (msg *PingRequestMessage) Encode(buf *util.Buffer) {
buf := util.Buffer(make([]byte, 0, 6))
buf.WriteUint16(msg.EventType) buf.WriteUint16(msg.EventType)
buf.WriteUint32(msg.Timestamp) msg.EventData = buf.Malloc(4)
msg.EventData = buf[2:] binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp)
return buf
} }
func (msg *UserControlMessage) Encode() []byte { func (msg *UserControlMessage) Encode(buf *util.Buffer) {
return util.PutBE(make([]byte, 2), msg.EventType) buf.WriteUint16(msg.EventType)
}
type AVPack struct {
Timestamp uint32
Payload []byte
} }

View File

@@ -52,15 +52,33 @@ type NetConnection struct {
totalRead uint32 // 总共读了多少字节 totalRead uint32 // 总共读了多少字节
writeChunkSize int writeChunkSize int
readChunkSize int readChunkSize int
incompleteRtmpBody map[uint32]*util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 incommingChunks map[uint32]*Chunk
rtmpHeader map[uint32]*ChunkHeader // RtmpHeader
objectEncoding float64 objectEncoding float64
appName string appName string
tmpBuf []byte //用来接收小数据,复用内存 tmpBuf util.Buffer //用来接收/发送小数据,复用内存
chunkHeader util.Buffer
bytePool util.BytesPool
} }
func NewNetConnection(conn net.Conn) *NetConnection {
return &NetConnection{
Conn: conn,
Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
incommingChunks: make(map[uint32]*Chunk),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make(util.Buffer, 4),
chunkHeader: make(util.Buffer, 0, 16),
bytePool: make(util.BytesPool, 16),
}
}
func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) { func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) {
return io.ReadFull(conn.Reader, buf) n, err = io.ReadFull(conn.Reader, buf)
if err == nil {
conn.readSeqNum += uint32(n)
}
return
} }
func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) { func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) {
return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID}) return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID})
@@ -94,62 +112,58 @@ func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) err
func (conn *NetConnection) readChunk() (msg *Chunk, err error) { func (conn *NetConnection) readChunk() (msg *Chunk, err error) {
head, err := conn.ReadByte() head, err := conn.ReadByte()
conn.readSeqNum++
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn.readSeqNum++
ChunkStreamID := uint32(head & 0x3f) // 0011 1111 ChunkStreamID := uint32(head & 0x3f) // 0011 1111
ChunkType := head >> 6 // 1100 0000 ChunkType := head >> 6 // 1100 0000
// 如果块流ID为0,1的话,就需要计算. // 如果块流ID为0,1的话,就需要计算.
ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID) ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID)
if err != nil { if err != nil {
return nil, errors.New("get chunk stream id error :" + err.Error()) return nil, errors.New("get chunk stream id error :" + err.Error())
} }
// println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType)
chunk, ok := conn.incommingChunks[ChunkStreamID]
h, ok := conn.rtmpHeader[ChunkStreamID] if ChunkType != 3 && ok && chunk.AVData.Length > 0 {
if !ok {
h = &ChunkHeader{
ChunkBasicHeader: ChunkBasicHeader{
ChunkStreamID,
ChunkType,
},
}
conn.rtmpHeader[ChunkStreamID] = h
}
currentBody, ok := conn.incompleteRtmpBody[ChunkStreamID]
if ChunkType != 3 && ok && currentBody.Len() > 0 {
// 如果块类型不为3,那么这个rtmp的body应该为空. // 如果块类型不为3,那么这个rtmp的body应该为空.
return nil, errors.New("incompleteRtmpBody error") return nil, errors.New("incompleteRtmpBody error")
} }
if err = conn.readChunkType(h, ChunkType); err != nil { if !ok {
chunk = &Chunk{}
conn.incommingChunks[ChunkStreamID] = chunk
}
if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil {
return nil, errors.New("get chunk type error :" + err.Error()) return nil, errors.New("get chunk type error :" + err.Error())
} }
msgLen := int(h.MessageLength) msgLen := int(chunk.MessageLength)
if !ok {
newBuffer := util.Buffer(make([]byte, 0, msgLen))
currentBody = &newBuffer
conn.incompleteRtmpBody[ChunkStreamID] = currentBody
}
needRead := conn.readChunkSize needRead := conn.readChunkSize
if unRead := msgLen - currentBody.Len(); unRead < needRead { if unRead := msgLen - chunk.AVData.ByteLength; unRead < needRead {
needRead = unRead needRead = unRead
} }
if n, err := conn.ReadFull(currentBody.Malloc(needRead)); err != nil { mem := conn.bytePool.Get(needRead)
if n, err := conn.ReadFull(mem.Value); err != nil {
mem.Recycle()
return nil, err return nil, err
} else { } else {
conn.readSeqNum += uint32(n) conn.readSeqNum += uint32(n)
} }
// 如果读完了一个完整的块,那么就返回这个消息,没读完继续递归读块. chunk.AVData.Push(mem)
if currentBody.Len() == msgLen { // println("read chunk body", chunk.MessageTypeID, chunk.AVData.ByteLength, ChunkType, msgLen, chunk.Timestamp, chunk.ExtendTimestamp)
msg = &Chunk{ if chunk.AVData.ByteLength == msgLen {
ChunkHeader: *h, msg = chunk
Body: currentBody.ReadN(msgLen), switch chunk.MessageTypeID {
case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO:
default:
err = GetRtmpMessage(msg, msg.AVData.ToBytes())
msg.AVData.Recycle()
}
conn.incommingChunks[ChunkStreamID] = &Chunk{
ChunkHeader: chunk.ChunkHeader,
} }
err = GetRtmpMessage(msg)
} }
return return
} }
@@ -187,106 +201,53 @@ func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32,
} }
func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) { func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) {
b := conn.tmpBuf[:3]
switch chunkType {
case 0:
{
// Timestamp 3 bytes
if _, err := conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 3
util.GetBE(b, &h.Timestamp) //type = 0的时间戳为绝对时间,其他的都为相对时间
// Message Length 3 bytes conn.tmpBuf.Reset()
if _, err = conn.ReadFull(b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度. b4 := conn.tmpBuf.Malloc(4)
b3 := b4[:3]
if chunkType == 3 {
// 3个字节的时间戳
} else {
// Timestamp 3 bytes
if _, err = conn.ReadFull(b3); err != nil {
return err return err
} }
conn.readSeqNum += 3 util.GetBE(b3, &h.Timestamp)
util.GetBE(b, &h.MessageLength) if chunkType != 2 {
if _, err = conn.ReadFull(b3); err != nil {
return err
}
util.GetBE(b3, &h.MessageLength)
// Message Type ID 1 bytes // Message Type ID 1 bytes
v, err := conn.ReadByte() // 读取Message Type ID if h.MessageTypeID, err = conn.ReadByte(); err != nil {
if err != nil {
return err return err
} }
conn.readSeqNum++ conn.readSeqNum++
h.MessageTypeID = v if chunkType == 0 {
// Message Stream ID 4bytes // Message Stream ID 4bytes
b = conn.tmpBuf if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID
if _, err = conn.ReadFull(b); err != nil { // 读取Message Stream ID
return err return err
} }
conn.readSeqNum += 4 h.MessageStreamID = binary.LittleEndian.Uint32(b4)
h.MessageStreamID = binary.LittleEndian.Uint32(b) }
}
}
// ExtendTimestamp 4 bytes // ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求
if _, err = conn.ReadFull(b); err != nil { if _, err = conn.ReadFull(b4); err != nil {
return err return err
} }
conn.readSeqNum += 4 util.GetBE(b4, &h.ExtendTimestamp)
util.GetBE(b, &h.ExtendTimestamp) } else {
if chunkType == 0 {
h.ExtendTimestamp = h.Timestamp
// println("timestamp", h.Timestamp)
} else if chunkType != 3 {
// println("extend timestamp", chunkType, h.Timestamp, h.ExtendTimestamp)
h.ExtendTimestamp += h.Timestamp
} }
} }
case 1:
{
// Timestamp 3 bytes
if _, err = conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 3
h.ChunkType = chunkType
util.GetBE(b, &h.Timestamp)
// Message Length 3 bytes
if _, err = conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 3
util.GetBE(b, &h.MessageLength)
// Message Type ID 1 bytes
v, err := conn.ReadByte()
if err != nil {
return err
}
conn.readSeqNum++
h.MessageTypeID = v
// ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff {
b = conn.tmpBuf
if _, err := conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 4
util.GetBE(b, &h.ExtendTimestamp)
}
}
case 2:
{
// Timestamp 3 bytes
if _, err = conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 3
h.ChunkType = chunkType
util.GetBE(b, &h.Timestamp)
// ExtendTimestamp 4 bytes
if h.Timestamp == 0xffffff {
b = conn.tmpBuf
if _, err := conn.ReadFull(b); err != nil {
return err
}
conn.readSeqNum += 4
util.GetBE(b, &h.ExtendTimestamp)
}
}
case 3:
{
//h.ChunkType = chunkType
}
}
return nil return nil
} }
@@ -301,8 +262,9 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
switch msg.MessageTypeID { switch msg.MessageTypeID {
case RTMP_MSG_CHUNK_SIZE: case RTMP_MSG_CHUNK_SIZE:
conn.readChunkSize = int(msg.MsgData.(Uint32Message)) conn.readChunkSize = int(msg.MsgData.(Uint32Message))
println("read chunk size", conn.readChunkSize)
case RTMP_MSG_ABORT: case RTMP_MSG_ABORT:
delete(conn.incompleteRtmpBody, uint32(msg.MsgData.(Uint32Message))) delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message)))
case RTMP_MSG_ACK, RTMP_MSG_EDGE: case RTMP_MSG_ACK, RTMP_MSG_EDGE:
case RTMP_MSG_USER_CONTROL: case RTMP_MSG_USER_CONTROL:
if _, ok := msg.MsgData.(*PingRequestMessage); ok { if _, ok := msg.MsgData.(*PingRequestMessage); ok {
@@ -320,11 +282,8 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) {
return return
} }
func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
body := msg.Encode() if conn == nil {
head := newChunkHeader(t) return errors.New("connection is nil")
head.MessageLength = uint32(len(body))
if sid, ok := msg.(HaveStreamID); ok {
head.MessageStreamID = sid.GetStreamID()
} }
if conn.writeSeqNum > conn.bandwidth { if conn.writeSeqNum > conn.bandwidth {
conn.totalWrite += conn.writeSeqNum conn.totalWrite += conn.writeSeqNum
@@ -332,26 +291,28 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) {
err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite)) err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite))
err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0) err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0)
} }
var chunk = net.Buffers{conn.encodeChunk12(head)} conn.tmpBuf.Reset()
if len(body) > conn.writeChunkSize { msg.Encode(&conn.tmpBuf)
chunk = append(chunk, body[:conn.writeChunkSize]) head := newChunkHeader(t)
body = body[conn.writeChunkSize:] head.MessageLength = uint32(conn.tmpBuf.Len())
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) if sid, ok := msg.(HaveStreamID); ok {
_, err = chunk.WriteTo(conn) head.MessageStreamID = sid.GetStreamID()
for len(body) > conn.writeChunkSize {
chunk = append(chunk[:0], conn.encodeChunk12(head), body[:conn.writeChunkSize])
body = body[conn.writeChunkSize:]
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk))
_, err = chunk.WriteTo(conn)
} }
chunk = append(chunk[:0], conn.encodeChunk12(head), body) head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader)
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) for _, chunk := range conn.tmpBuf.Split(conn.writeChunkSize) {
_, err = chunk.WriteTo(conn) conn.sendChunk(chunk)
} else {
chunk = append(chunk, body)
conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk))
_, err = chunk.WriteTo(conn)
} }
return nil return nil
} }
func (conn *NetConnection) sendChunk(writeBuffer ...[]byte) error {
if n, err := conn.Write(conn.chunkHeader); err != nil {
return err
} else {
conn.writeSeqNum += uint32(n)
}
buf := net.Buffers(writeBuffer)
n, err := buf.WriteTo(conn)
conn.writeSeqNum += uint32(n)
return err
}

View File

@@ -1,7 +1,6 @@
package rtmp package rtmp
import ( import (
"bufio"
"context" "context"
"fmt" "fmt"
"net" "net"
@@ -9,7 +8,6 @@ import (
"go.uber.org/zap" "go.uber.org/zap"
"m7s.live/engine/v4" "m7s.live/engine/v4"
"m7s.live/engine/v4/util"
) )
type NetStream struct { type NetStream struct {
@@ -38,16 +36,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
defer conn.Close() defer conn.Close()
senders := make(map[uint32]*RTMPSubscriber) senders := make(map[uint32]*RTMPSubscriber)
receivers := make(map[uint32]*RTMPReceiver) receivers := make(map[uint32]*RTMPReceiver)
nc := &NetConnection{ nc := NewNetConnection(conn)
Conn: conn,
Reader: bufio.NewReader(conn),
writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
readChunkSize: RTMP_DEFAULT_CHUNK_SIZE,
rtmpHeader: make(map[uint32]*ChunkHeader),
incompleteRtmpBody: make(map[uint32]*util.Buffer),
bandwidth: RTMP_MAX_CHUNK_SIZE << 3,
tmpBuf: make([]byte, 4),
}
ctx, cancel := context.WithCancel(engine.Engine) ctx, cancel := context.WithCancel(engine.Engine)
defer cancel() defer cancel()
/* Handshake */ /* Handshake */
@@ -138,7 +127,6 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) {
} }
if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil { if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil {
receivers[cmd.StreamId] = receiver receivers[cmd.StreamId] = receiver
receiver.absTs = make(map[uint32]uint32)
receiver.Begin() receiver.Begin()
err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status) err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status)
} else { } else {