diff --git a/chunk.go b/chunk.go index 0a1dc50..26ea0f1 100644 --- a/chunk.go +++ b/chunk.go @@ -1,8 +1,6 @@ package rtmp import ( - "encoding/binary" - "m7s.live/engine/v4/util" ) @@ -27,45 +25,31 @@ const ( type Chunk struct { ChunkHeader - Body []byte + AVData util.BLL MsgData RtmpMessage } -func (c *Chunk) Encode(msg RtmpMessage) { - c.MsgData = msg - c.Body = msg.Encode() - c.MessageLength = uint32(len(c.Body)) -} - type ChunkHeader struct { - ChunkBasicHeader - ChunkMessageHeader - // Extended Timestamp (0 or 4 bytes): This field is present in certain - // circumstances depending on the encoded timestamp or timestamp - // delta field in the Chunk Message header. See Section 5.3.1.3 for - // more information - - ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略 -} - -// Basic Header (1 to 3 bytes) : This field encodes the chunk stream ID -// and the chunk type. Chunk type determines the format of the -// encoded message header. The length(Basic Header) depends entirely on the chunk -// stream ID, which is a variable-length field. -type ChunkBasicHeader struct { - ChunkStreamID uint32 `json:""` // 6 bit. 3 ~ 65559, 0,1,2 reserved - ChunkType byte `json:""` // 2 bit. -} - -// Message Header (0, 3, 7, or 11 bytes): This field encodes -// information about the message being sent (whether in whole or in -// part). The length can be determined using the chunk type -// specified in the chunk header. -type ChunkMessageHeader struct { + ChunkStreamID uint32 `json:""` Timestamp uint32 `json:""` // 3 byte MessageLength uint32 `json:""` // 3 byte MessageTypeID byte `json:""` // 1 byte MessageStreamID uint32 `json:""` // 4 byte + // Extended Timestamp (0 or 4 bytes): This field is present in certain + // circumstances depending on the encoded timestamp or timestamp + // delta field in the Chunk Message header. See Section 5.3.1.3 for + // more information + ExtendTimestamp uint32 `json:",omitempty"` // 标识该字段的数据可忽略 +} + +func (c *ChunkHeader) SetTimestamp(timestamp uint32) { + if timestamp >= 0xFFFFFF { + c.ExtendTimestamp = timestamp + c.Timestamp = 0xFFFFFF + } else { + c.ExtendTimestamp = 0 + c.Timestamp = timestamp + } } // ChunkBasicHeader会决定ChunkMessgaeHeader,ChunkMessgaeHeader有4种(0,3,7,11 Bytes),因此可能有4种头. @@ -75,49 +59,22 @@ type ChunkMessageHeader struct { // 8 -> ChunkBasicHeader(1) + ChunkMessageHeader(7) // 12 -> ChunkBasicHeader(1) + ChunkMessageHeader(11) -func (nc *NetConnection) encodeChunk12(head *ChunkHeader) []byte { - b := util.Buffer(make([]byte, 0, 16)) - b.WriteByte(byte(RTMP_CHUNK_HEAD_12 + head.ChunkStreamID)) - b.WriteUint24(head.Timestamp) - b.WriteUint24(head.MessageLength) - b.WriteByte(head.MessageTypeID) - binary.LittleEndian.PutUint32(b.Malloc(4), head.MessageStreamID) - if head.ChunkMessageHeader.Timestamp == 0xffffff { - binary.LittleEndian.PutUint32(b.Malloc(4), head.ExtendTimestamp) +func (h *ChunkHeader) WriteTo(t byte, b *util.Buffer) { + b.Reset() + csid := byte(h.ChunkStreamID) + b.WriteByte(t + csid) + + if t < RTMP_CHUNK_HEAD_1 { + b.WriteUint24(h.Timestamp) + if t < RTMP_CHUNK_HEAD_4 { + b.WriteUint24(h.MessageLength) + b.WriteByte(h.MessageTypeID) + if t < RTMP_CHUNK_HEAD_8 { + b.WriteUint32(h.MessageStreamID) + } + } + } + if h.Timestamp == 0xffffff { + b.WriteUint32(h.ExtendTimestamp) } - return b -} - -func (nc *NetConnection) encodeChunk8(head *ChunkHeader) []byte { - b := util.Buffer(make([]byte, 0, 8)) - b.WriteByte(byte(RTMP_CHUNK_HEAD_8 + head.ChunkStreamID)) - b.WriteUint24(head.Timestamp) - b.WriteUint24(head.MessageLength) - b.WriteByte(head.MessageTypeID) - return b -} - -// func (nc *NetConnection) encodeChunk4(head *ChunkHeader, payload []byte, size int) (need []byte, err error) { -// if size > RTMP_MAX_CHUNK_SIZE || payload == nil || len(payload) == 0 { -// return nil, errors.New("chunk error") -// } -// b := make([]byte, 4) -// //chunkBasicHead -// b[0] = byte(RTMP_CHUNK_HEAD_4 + head.ChunkStreamID) -// util.PutBE(b[1:4], head.Timestamp) -// nc.Write(b) -// nc.writeSeqNum += 4 -// if len(payload) > size { -// nc.Write(payload[0:size]) -// nc.writeSeqNum += uint32(size) -// need = payload[size:] -// } else { -// nc.Write(payload) -// nc.writeSeqNum += uint32(len(payload)) -// } -// return -// } - -func (nc *NetConnection) encodeChunk1(head *ChunkHeader) []byte { - return []byte{byte(RTMP_CHUNK_HEAD_1 + head.ChunkStreamID)} } diff --git a/client.go b/client.go index 743bf49..46d13f2 100644 --- a/client.go +++ b/client.go @@ -1,7 +1,6 @@ package rtmp import ( - "bufio" "crypto/tls" "errors" "net" @@ -10,7 +9,6 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4" - "m7s.live/engine/v4/util" ) func NewRTMPClient(addr string) (client *NetConnection, err error) { @@ -49,16 +47,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { conn.Close() } }() - client = &NetConnection{ - Conn: conn, - Reader: bufio.NewReader(conn), - writeChunkSize: conf.ChunkSize, - readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, - rtmpHeader: make(map[uint32]*ChunkHeader), - incompleteRtmpBody: make(map[uint32]*util.Buffer), - bandwidth: RTMP_MAX_CHUNK_SIZE << 3, - tmpBuf: make([]byte, 4), - } + client = NewNetConnection(conn) err = client.ClientHandshake() if err != nil { RTMPPlugin.Error("handshake", zap.Error(err)) @@ -69,6 +58,7 @@ func NewRTMPClient(addr string) (client *NetConnection, err error) { if err != nil { return } + client.writeChunkSize = conf.ChunkSize err = client.SendMessage(RTMP_MSG_AMF0_COMMAND, &CallMessage{ CommandMessage{"connect", 1}, map[string]any{ @@ -135,6 +125,9 @@ func (pusher *RTMPPusher) Push() error { _, streamPath, _ := strings.Cut(URL.Path, "/") _, streamPath, _ = strings.Cut(streamPath, "/") pusher.Args = URL.Query() + if len(pusher.Args) > 0 { + streamPath += "?" + pusher.Args.Encode() + } pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, &PublishMessage{ CURDStreamMessage{ CommandMessage{ @@ -172,13 +165,12 @@ func (puller *RTMPPuller) Connect() (err error) { } func (puller *RTMPPuller) Pull() (err error) { - puller.absTs = make(map[uint32]uint32) defer puller.Stop() err = puller.SendMessage(RTMP_MSG_AMF0_COMMAND, &CommandMessage{"createStream", 2}) for err == nil { msg, err := puller.RecvMessage() if err != nil { - break + return err } switch msg.MessageTypeID { case RTMP_MSG_AUDIO: @@ -199,6 +191,9 @@ func (puller *RTMPPuller) Pull() (err error) { ps := strings.Split(URL.Path, "/") puller.Args = URL.Query() m.StreamName = ps[len(ps)-1] + if len(puller.Args) > 0 { + m.StreamName += "?" + puller.Args.Encode() + } puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m) // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok { // if response.Object["code"] == "NetStream.Play.Start" { diff --git a/main.go b/main.go index 0f234ca..0ba6d30 100644 --- a/main.go +++ b/main.go @@ -62,7 +62,7 @@ func (c *RTMPConfig) OnEvent(event any) { } var conf = &RTMPConfig{ - ChunkSize: 4096, + ChunkSize: 65536, TCP: config.TCP{ListenAddr: ":1935"}, } var RTMPPlugin = InstallPlugin(conf) diff --git a/media.go b/media.go index 0d2cc8b..72622d0 100644 --- a/media.go +++ b/media.go @@ -2,18 +2,19 @@ package rtmp import ( "errors" - "net" "go.uber.org/zap" . "m7s.live/engine/v4" - "m7s.live/engine/v4/util" + "m7s.live/engine/v4/common" ) type RTMPSender struct { Subscriber NetStream - firstAudioSent bool - firstVideoSent bool + firstAudioSent bool + firstVideoSent bool + audioChunkHeader ChunkHeader + videoChunkHeader ChunkHeader } func (rtmp *RTMPSender) OnEvent(event any) { @@ -22,24 +23,27 @@ func (rtmp *RTMPSender) OnEvent(event any) { rtmp.Response(1, NetStream_Play_UnpublishNotify, Response_OnStatus) case SEpublish: rtmp.Response(1, NetStream_Play_PublishNotify, Response_OnStatus) + case ISubscriber: + rtmp.audioChunkHeader.ChunkStreamID = RTMP_CSID_AUDIO + rtmp.videoChunkHeader.ChunkStreamID = RTMP_CSID_VIDEO + rtmp.audioChunkHeader.MessageTypeID = RTMP_MSG_AUDIO + rtmp.videoChunkHeader.MessageTypeID = RTMP_MSG_VIDEO + rtmp.audioChunkHeader.MessageStreamID = rtmp.StreamID + rtmp.videoChunkHeader.MessageStreamID = rtmp.StreamID case AudioDeConf: - rtmp.sendAVMessage(0, v.AVCC, true, true) + rtmp.audioChunkHeader.SetTimestamp(0) + rtmp.audioChunkHeader.MessageLength = uint32(len(v)) + rtmp.audioChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader) + rtmp.sendChunk(v) case VideoDeConf: - rtmp.sendAVMessage(0, v.AVCC, false, true) - case *AudioFrame: - if rtmp.firstAudioSent { - rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) - } else { - rtmp.firstAudioSent = true - rtmp.sendAVMessage(v.AbsTime, v.AVCC, true, true) - } - case *VideoFrame: - if rtmp.firstVideoSent { - rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false) - } else { - rtmp.firstVideoSent = true - rtmp.sendAVMessage(v.AbsTime, v.AVCC, false, true) - } + rtmp.videoChunkHeader.SetTimestamp(0) + rtmp.videoChunkHeader.MessageLength = uint32(len(v)) + rtmp.videoChunkHeader.WriteTo(RTMP_CHUNK_HEAD_12, &rtmp.chunkHeader) + rtmp.sendChunk(v) + case AudioFrame: + rtmp.sendAVMessage(v.AVFrame, v.AbsTime, true) + case VideoFrame: + rtmp.sendAVMessage(v.AVFrame, v.AbsTime, false) default: rtmp.Subscriber.OnEvent(event) } @@ -48,8 +52,8 @@ func (rtmp *RTMPSender) OnEvent(event any) { // 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 -func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) { - payloadLen := util.SizeOfBuffers(payload) +func (sender *RTMPSender) sendAVMessage(frame *common.AVFrame, absTime uint32, isAudio bool) (err error) { + payloadLen := frame.AVCC.ByteLength if payloadLen == 0 { err := errors.New("payload is empty") sender.Error("payload is empty", zap.Error(err)) @@ -61,35 +65,42 @@ func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite)) sender.SendStreamID(RTMP_USER_PING_REQUEST, 0) } - var head *ChunkHeader - if isAudio { - head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(payloadLen), RTMP_MSG_AUDIO, sender.StreamID, 0) - } else { - head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(payloadLen), RTMP_MSG_VIDEO, sender.StreamID, 0) - } + var isFirst = false + if isAudio { + head = &sender.audioChunkHeader + if isFirst = !sender.firstAudioSent; isFirst { + sender.firstAudioSent = true + } + } else { + head = &sender.videoChunkHeader + if isFirst = !sender.firstVideoSent; isFirst { + sender.firstVideoSent = true + } + } + head.MessageLength = uint32(payloadLen) // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) // 当Chunk Type为0时(即Chunk12), - var chunk1 net.Buffers if isFirst { - chunk1 = append(chunk1, sender.encodeChunk12(head)) + head.SetTimestamp(absTime) + head.WriteTo(RTMP_CHUNK_HEAD_12, &sender.chunkHeader) } else { - chunk1 = append(chunk1, sender.encodeChunk8(head)) + head.SetTimestamp(frame.DeltaTime) + head.WriteTo(RTMP_CHUNK_HEAD_8, &sender.chunkHeader) } - chunks := util.SplitBuffers(payload, sender.writeChunkSize) - chunk1 = append(chunk1, chunks[0]...) - sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) - _, err = chunk1.WriteTo(sender.NetConnection) - // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) - for _, chunk := range chunks[1:] { - chunk1 = net.Buffers{sender.encodeChunk1(head)} - chunk1 = append(chunk1, chunk...) - sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) - _, err = chunk1.WriteTo(sender.NetConnection) + r := frame.AVCC.NewReader() + chunk := r.ReadN(sender.writeChunkSize) + // payloadLen -= util.SizeOfBuffers(chunk) + sender.sendChunk(chunk...) + if r.CanRead() { + // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) + for head.WriteTo(RTMP_CHUNK_HEAD_1, &sender.chunkHeader); r.CanRead(); sender.sendChunk(chunk...) { + chunk = r.ReadN(sender.writeChunkSize) + // payloadLen -= util.SizeOfBuffers(chunk) + } } - return nil } @@ -97,7 +108,7 @@ func (r *RTMPSender) Response(tid uint64, code, level string) error { m := new(ResponsePlayMessage) m.CommandName = Response_OnStatus m.TransactionId = tid - m.Object = map[string]any{ + m.Infomation = map[string]any{ "code": code, "level": level, "description": "", @@ -109,7 +120,6 @@ func (r *RTMPSender) Response(tid uint64, code, level string) error { type RTMPReceiver struct { Publisher NetStream - absTs map[uint32]uint32 } func (r *RTMPReceiver) Response(tid uint64, code, level string) error { @@ -127,43 +137,20 @@ func (r *RTMPReceiver) Response(tid uint64, code, level string) error { func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) { if r.AudioTrack == nil { - r.absTs[msg.ChunkStreamID] = 0 - r.WriteAVCCAudio(0, msg.Body) + if r.WriteAVCCAudio(0, msg.AVData); r.AudioTrack != nil { + r.AudioTrack.SetStuff(r.bytePool) + } return } - ts := msg.Timestamp - if ts == 0xffffff { - ts = msg.ExtendTimestamp - } - if msg.ChunkType == 0 { - if r.AudioTrack.GetBase().Name == "" { - r.absTs[msg.ChunkStreamID] = 0 - } else { - r.absTs[msg.ChunkStreamID] = ts - } - } else { - r.absTs[msg.ChunkStreamID] += ts - } - r.AudioTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) + r.AudioTrack.WriteAVCC(msg.ExtendTimestamp, msg.AVData) } + func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) { if r.VideoTrack == nil { - r.absTs[msg.ChunkStreamID] = 0 - r.WriteAVCCVideo(0, msg.Body) + if r.WriteAVCCVideo(0, msg.AVData); r.VideoTrack != nil { + r.VideoTrack.SetStuff(r.bytePool) + } return } - ts := msg.Timestamp - if ts == 0xffffff { - ts = msg.ExtendTimestamp - } - if msg.ChunkType == 0 { - if r.VideoTrack.GetBase().Name == "" { - r.absTs[msg.ChunkStreamID] = 0 - } else { - r.absTs[msg.ChunkStreamID] = ts - } - } else { - r.absTs[msg.ChunkStreamID] += ts - } - r.VideoTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) + r.VideoTrack.WriteAVCC(msg.ExtendTimestamp, msg.AVData) } diff --git a/msg.go b/msg.go index 55d78ce..2474185 100644 --- a/msg.go +++ b/msg.go @@ -6,7 +6,6 @@ import ( "strings" "go.uber.org/zap" - "m7s.live/engine/v4/codec" "m7s.live/engine/v4/util" ) @@ -72,20 +71,7 @@ func newChunkHeader(messageType byte) *ChunkHeader { if messageType == RTMP_MSG_AMF0_COMMAND { head.ChunkStreamID = RTMP_CSID_COMMAND } - head.Timestamp = 0 head.MessageTypeID = messageType - head.MessageStreamID = 0 - head.ExtendTimestamp = 0 - return head -} -func newRtmpHeader(chunkID uint32, timestamp uint32, messageLength uint32, messageType byte, messageStreamID uint32, extendTimestamp uint32) *ChunkHeader { - head := new(ChunkHeader) - head.ChunkStreamID = chunkID - head.Timestamp = timestamp - head.MessageLength = messageLength - head.MessageTypeID = messageType - head.MessageStreamID = messageStreamID - head.ExtendTimestamp = extendTimestamp return head } @@ -94,23 +80,22 @@ func (h ChunkHeader) Clone() *ChunkHeader { } type RtmpMessage interface { - Encode() []byte + Encode(*util.Buffer) } type HaveStreamID interface { GetStreamID() uint32 } -func GetRtmpMessage(chunk *Chunk) error { - body := util.Buffer(chunk.Body) +func GetRtmpMessage(chunk *Chunk, body util.Buffer) error { switch chunk.MessageTypeID { case RTMP_MSG_CHUNK_SIZE, RTMP_MSG_ABORT, RTMP_MSG_ACK, RTMP_MSG_ACK_SIZE: - if len(chunk.Body) < 4 { + if body.Len() < 4 { return errors.New("chunk.Body < 4") } chunk.MsgData = Uint32Message(body.ReadUint32()) case RTMP_MSG_USER_CONTROL: // RTMP消息类型ID=4, 用户控制消息.客户端或服务端发送本消息通知对方用户的控制事件. { - if len(chunk.Body) < 2 { + if body.Len() < 2 { return errors.New("UserControlMessage.Body < 2") } base := UserControlMessage{ @@ -153,7 +138,7 @@ func GetRtmpMessage(chunk *Chunk) error { } } case RTMP_MSG_BANDWIDTH: // RTMP消息类型ID=6, 置对等端带宽.客户端或服务端发送本消息更新对等端的输出带宽. - if len(chunk.Body) < 4 { + if body.Len() < 4 { return errors.New("chunk.Body < 4") } m := &SetPeerBandwidthMessage{ @@ -169,11 +154,11 @@ func GetRtmpMessage(chunk *Chunk) error { case RTMP_MSG_AMF3_METADATA: // RTMP消息类型ID=15, 数据消息.用AMF3编码. case RTMP_MSG_AMF3_SHARED: // RTMP消息类型ID=16, 共享对象消息.用AMF3编码. case RTMP_MSG_AMF3_COMMAND: // RTMP消息类型ID=17, 命令消息.用AMF3编码. - decodeCommandAMF3(chunk) + decodeCommandAMF0(chunk, body[1:]) case RTMP_MSG_AMF0_METADATA: // RTMP消息类型ID=18, 数据消息.用AMF0编码. case RTMP_MSG_AMF0_SHARED: // RTMP消息类型ID=19, 共享对象消息.用AMF0编码. case RTMP_MSG_AMF0_COMMAND: // RTMP消息类型ID=20, 命令消息.用AMF0编码. - decodeCommandAMF0(chunk) // 解析具体的命令消息 + decodeCommandAMF0(chunk, body) // 解析具体的命令消息 case RTMP_MSG_AGGREGATE: default: } @@ -198,8 +183,8 @@ func GetRtmpMessage(chunk *Chunk) error { // object类型要复杂点. // 第一个byte是03表示object,其后跟的是N个(key+value).最后以00 00 09表示object结束 -func decodeCommandAMF0(chunk *Chunk) { - amf := codec.AMF{chunk.Body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去. +func decodeCommandAMF0(chunk *Chunk, body []byte) { + amf := util.AMF{body} // rtmp_amf.go, amf 是 bytes类型, 将rtmp body(payload)放到bytes.Buffer(amf)中去. cmd := amf.ReadShortString() // rtmp_amf.go, 将payload的bytes类型转换成string类型. cmdMsg := CommandMessage{ cmd, @@ -328,7 +313,6 @@ func decodeCommandAMF0(chunk *Chunk) { chunk.MsgData = &ResponsePlayMessage{ cmdMsg, response.Infomation, - "", chunk.MessageStreamID, } } else { @@ -342,11 +326,6 @@ func decodeCommandAMF0(chunk *Chunk) { } } -func decodeCommandAMF3(chunk *Chunk) { - chunk.Body = chunk.Body[1:] - decodeCommandAMF0(chunk) -} - /* Command Message */ type CommandMessage struct { CommandName string // 命令名. 字符串. 命令名.设置为"connect" @@ -360,8 +339,8 @@ func (cmd *CommandMessage) GetCommand() *CommandMessage { return cmd } -func (msg *CommandMessage) Encode() (b []byte) { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil) +func (msg *CommandMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil) } // Protocol control message 1. @@ -370,10 +349,8 @@ func (msg *CommandMessage) Encode() (b []byte) { // chunk size (31 bits): This field holds the new maximum chunk size,in bytes, which will be used for all of the sender’s subsequent chunks until further notice type Uint32Message uint32 -func (msg Uint32Message) Encode() (b []byte) { - b = make([]byte, 4) - binary.BigEndian.PutUint32(b, uint32(msg)) - return b +func (msg Uint32Message) Encode(buf *util.Buffer) { + binary.BigEndian.PutUint32(buf.Malloc(4), uint32(msg)) } // Protocol control message 4, User Control Messages. @@ -397,11 +374,9 @@ type SetPeerBandwidthMessage struct { LimitType byte } -func (msg *SetPeerBandwidthMessage) Encode() (b []byte) { - b = make([]byte, 5) - binary.BigEndian.PutUint32(b, msg.AcknowledgementWindowsize) - b[4] = msg.LimitType - return +func (msg *SetPeerBandwidthMessage) Encode(buf *util.Buffer) { + buf.WriteUint32(msg.AcknowledgementWindowsize) + buf.WriteByte(msg.LimitType) } // Message 15, 18. Data Message. The client or the server sends this message to send Metadata or any @@ -431,20 +406,18 @@ type CallMessage struct { Optional map[string]any `json:",omitempty"` } -func (msg *CallMessage) Encode() []byte { - var amf codec.AMF - amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object) +func (msg *CallMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object) if msg.Optional != nil { - amf.Marshal(msg.Optional) + buf.MarshalAMFs(msg.Optional) } - return amf.Buffer } -func (msg *CallMessage) Encode3() []byte { - var amf codec.AMF - amf.WriteByte(0) - return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional) -} +// func (msg *CallMessage) Encode3() []byte { +// var amf util.AMF +// amf.WriteByte(0) +// return amf.Marshals(msg.CommandName, msg.TransactionId, msg.Object, msg.Optional) +// } // Create Stream Message. // The client sends this command to the server to create a logical channel for message communication The publishing of audio, @@ -492,8 +465,7 @@ type PlayMessage struct { // Duration -> 可选的参数,以秒为单位定义了回放的持续时间.默认值为 -1.-1 值意味着一个直播流会一直播放直到它不再可用或者一个录制流一直播放直到结束 // Reset -> 可选的布尔值或者数字定义了是否对以前的播放列表进行 flush -func (msg *PlayMessage) Encode() []byte { - var amf codec.AMF +func (msg *PlayMessage) Encode(buf *util.Buffer) { // if msg.Start > 0 { // amf.writeNumber(msg.Start) // } @@ -503,7 +475,7 @@ func (msg *PlayMessage) Encode() []byte { // } // amf.writeBool(msg.Reset) - return amf.Marshals(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamName, -2000) } /* @@ -576,8 +548,8 @@ type PublishMessage struct { // “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件. // “live”:发布直播数据而不录制到文件 -func (msg *PublishMessage) Encode() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) +func (msg *PublishMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.PublishingName, msg.PublishingType) } // Seek Message @@ -618,8 +590,8 @@ type ResponseConnectMessage struct { Infomation map[string]any `json:",omitempty"` } -func (msg *ResponseConnectMessage) Encode() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) +func (msg *ResponseConnectMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) } /* @@ -633,9 +605,9 @@ type ResponseCallMessage struct { Response map[string]any } -func (msg *ResponseCallMessage) Encode0() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response) -} +// func (msg *ResponseCallMessage) Encode0() []byte { +// return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Object, msg.Response) +// } // Response Create Stream Message type ResponseCreateStreamMessage struct { @@ -644,55 +616,56 @@ type ResponseCreateStreamMessage struct { StreamId uint32 } -func (msg *ResponseCreateStreamMessage) Encode() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId) +func (msg *ResponseCreateStreamMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.StreamId) } /* func (msg *ResponseCreateStreamMessage) Encode3() { }*/ -func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) { - amf := codec.AMF{chunk.Body} - msg.CommandName = amf.ReadShortString() - msg.TransactionId = uint64(amf.ReadNumber()) - amf.Unmarshal() - msg.StreamId = uint32(amf.ReadNumber()) -} -func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) { - chunk.Body = chunk.Body[1:] - msg.Decode0(chunk) -} +// func (msg *ResponseCreateStreamMessage) Decode0(chunk *Chunk) { +// amf := util.AMF{chunk.Body} +// msg.CommandName = amf.ReadShortString() +// msg.TransactionId = uint64(amf.ReadNumber()) +// amf.Unmarshal() +// msg.StreamId = uint32(amf.ReadNumber()) +// } + +// func (msg *ResponseCreateStreamMessage) Decode3(chunk *Chunk) { +// chunk.Body = chunk.Body[1:] +// msg.Decode0(chunk) +// } // Response Play Message type ResponsePlayMessage struct { CommandMessage - Object map[string]any `json:",omitempty"` - Description string - StreamID uint32 + Infomation map[string]any `json:",omitempty"` + StreamID uint32 } func (msg *ResponsePlayMessage) GetStreamID() uint32 { return msg.StreamID } -func (msg *ResponsePlayMessage) Encode() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Object, msg.Description) +func (msg *ResponsePlayMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, nil, msg.Infomation) } /* func (msg *ResponsePlayMessage) Encode3() { }*/ -func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) { - amf := codec.AMF{chunk.Body} - msg.CommandName = amf.ReadShortString() - msg.TransactionId = uint64(amf.ReadNumber()) - msg.Object = amf.ReadObject() -} -func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) { - chunk.Body = chunk.Body[1:] - msg.Decode0(chunk) -} +// func (msg *ResponsePlayMessage) Decode0(chunk *Chunk) { +// amf := util.AMF{chunk.Body} +// msg.CommandName = amf.ReadShortString() +// msg.TransactionId = uint64(amf.ReadNumber()) +// msg.Infomation = amf.ReadObject() +// } + +// func (msg *ResponsePlayMessage) Decode3(chunk *Chunk) { +// chunk.Body = chunk.Body[1:] +// msg.Decode0(chunk) +// } // Response Publish Message type ResponsePublishMessage struct { @@ -711,8 +684,8 @@ func (msg *ResponsePublishMessage) GetStreamID() uint32 { // 属性 -> null // 信息 -> level, code, description -func (msg *ResponsePublishMessage) Encode() []byte { - return codec.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) +func (msg *ResponsePublishMessage) Encode(buf *util.Buffer) { + buf.MarshalAMFs(msg.CommandName, msg.TransactionId, msg.Properties, msg.Infomation) } /* @@ -771,12 +744,10 @@ type StreamIDMessage struct { StreamID uint32 } -func (msg *StreamIDMessage) Encode() (b []byte) { - buf := util.Buffer(make([]byte, 0, 6)) - buf.WriteUint16(msg.EventType) - buf.WriteUint32(msg.StreamID) - msg.EventData = buf[2:] - return buf +func (msg *StreamIDMessage) Encode(buffer *util.Buffer) { + buffer.WriteUint16(msg.EventType) + msg.EventData = buffer.Malloc(4) + binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) } // SetBuffer Length (=3) @@ -789,13 +760,11 @@ type SetBufferMessage struct { Millisecond uint32 } -func (msg *SetBufferMessage) Encode() []byte { - buf := util.Buffer(make([]byte, 0, 10)) +func (msg *SetBufferMessage) Encode(buf *util.Buffer) { buf.WriteUint16(msg.EventType) - buf.WriteUint32(msg.StreamID) - buf.WriteUint32(msg.Millisecond) - msg.EventData = buf[2:] - return buf + msg.EventData = buf.Malloc(8) + binary.BigEndian.PutUint32(msg.EventData, msg.StreamID) + binary.BigEndian.PutUint32(msg.EventData[4:], msg.Millisecond) } // PingRequest (=6) @@ -807,19 +776,12 @@ type PingRequestMessage struct { Timestamp uint32 } -func (msg *PingRequestMessage) Encode() (b []byte) { - buf := util.Buffer(make([]byte, 0, 6)) +func (msg *PingRequestMessage) Encode(buf *util.Buffer) { buf.WriteUint16(msg.EventType) - buf.WriteUint32(msg.Timestamp) - msg.EventData = buf[2:] - return buf + msg.EventData = buf.Malloc(4) + binary.BigEndian.PutUint32(msg.EventData, msg.Timestamp) } -func (msg *UserControlMessage) Encode() []byte { - return util.PutBE(make([]byte, 2), msg.EventType) -} - -type AVPack struct { - Timestamp uint32 - Payload []byte +func (msg *UserControlMessage) Encode(buf *util.Buffer) { + buf.WriteUint16(msg.EventType) } diff --git a/netConnection.go b/netConnection.go index 93a04c3..34b2858 100644 --- a/netConnection.go +++ b/netConnection.go @@ -43,24 +43,42 @@ const ( ) type NetConnection struct { - *bufio.Reader `json:"-"` - net.Conn `json:"-"` - bandwidth uint32 - readSeqNum uint32 // 当前读的字节 - writeSeqNum uint32 // 当前写的字节 - totalWrite uint32 // 总共写了多少字节 - totalRead uint32 // 总共读了多少字节 - writeChunkSize int - readChunkSize int - incompleteRtmpBody map[uint32]*util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 - rtmpHeader map[uint32]*ChunkHeader // RtmpHeader - objectEncoding float64 - appName string - tmpBuf []byte //用来接收小数据,复用内存 + *bufio.Reader `json:"-"` + net.Conn `json:"-"` + bandwidth uint32 + readSeqNum uint32 // 当前读的字节 + writeSeqNum uint32 // 当前写的字节 + totalWrite uint32 // 总共写了多少字节 + totalRead uint32 // 总共读了多少字节 + writeChunkSize int + readChunkSize int + incommingChunks map[uint32]*Chunk + objectEncoding float64 + appName string + tmpBuf util.Buffer //用来接收/发送小数据,复用内存 + chunkHeader util.Buffer + bytePool util.BytesPool } +func NewNetConnection(conn net.Conn) *NetConnection { + return &NetConnection{ + Conn: conn, + Reader: bufio.NewReader(conn), + writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, + readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, + incommingChunks: make(map[uint32]*Chunk), + bandwidth: RTMP_MAX_CHUNK_SIZE << 3, + tmpBuf: make(util.Buffer, 4), + chunkHeader: make(util.Buffer, 0, 16), + bytePool: make(util.BytesPool, 16), + } +} func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) { - return io.ReadFull(conn.Reader, buf) + n, err = io.ReadFull(conn.Reader, buf) + if err == nil { + conn.readSeqNum += uint32(n) + } + return } func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) { return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID}) @@ -94,62 +112,58 @@ func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) err func (conn *NetConnection) readChunk() (msg *Chunk, err error) { head, err := conn.ReadByte() - conn.readSeqNum++ if err != nil { return nil, err } - + conn.readSeqNum++ ChunkStreamID := uint32(head & 0x3f) // 0011 1111 ChunkType := head >> 6 // 1100 0000 - // 如果块流ID为0,1的话,就需要计算. ChunkStreamID, err = conn.readChunkStreamID(ChunkStreamID) if err != nil { return nil, errors.New("get chunk stream id error :" + err.Error()) } + // println("ChunkStreamID:", ChunkStreamID, "ChunkType:", ChunkType) + chunk, ok := conn.incommingChunks[ChunkStreamID] - h, ok := conn.rtmpHeader[ChunkStreamID] - if !ok { - h = &ChunkHeader{ - ChunkBasicHeader: ChunkBasicHeader{ - ChunkStreamID, - ChunkType, - }, - } - conn.rtmpHeader[ChunkStreamID] = h - } - currentBody, ok := conn.incompleteRtmpBody[ChunkStreamID] - if ChunkType != 3 && ok && currentBody.Len() > 0 { + if ChunkType != 3 && ok && chunk.AVData.Length > 0 { // 如果块类型不为3,那么这个rtmp的body应该为空. return nil, errors.New("incompleteRtmpBody error") } - if err = conn.readChunkType(h, ChunkType); err != nil { + if !ok { + chunk = &Chunk{} + conn.incommingChunks[ChunkStreamID] = chunk + } + + if err = conn.readChunkType(&chunk.ChunkHeader, ChunkType); err != nil { return nil, errors.New("get chunk type error :" + err.Error()) } - msgLen := int(h.MessageLength) - - if !ok { - newBuffer := util.Buffer(make([]byte, 0, msgLen)) - currentBody = &newBuffer - conn.incompleteRtmpBody[ChunkStreamID] = currentBody - } + msgLen := int(chunk.MessageLength) needRead := conn.readChunkSize - if unRead := msgLen - currentBody.Len(); unRead < needRead { + if unRead := msgLen - chunk.AVData.ByteLength; unRead < needRead { needRead = unRead } - if n, err := conn.ReadFull(currentBody.Malloc(needRead)); err != nil { + mem := conn.bytePool.Get(needRead) + if n, err := conn.ReadFull(mem.Value); err != nil { + mem.Recycle() return nil, err } else { conn.readSeqNum += uint32(n) } - // 如果读完了一个完整的块,那么就返回这个消息,没读完继续递归读块. - if currentBody.Len() == msgLen { - msg = &Chunk{ - ChunkHeader: *h, - Body: currentBody.ReadN(msgLen), + chunk.AVData.Push(mem) + // println("read chunk body", chunk.MessageTypeID, chunk.AVData.ByteLength, ChunkType, msgLen, chunk.Timestamp, chunk.ExtendTimestamp) + if chunk.AVData.ByteLength == msgLen { + msg = chunk + switch chunk.MessageTypeID { + case RTMP_MSG_AUDIO, RTMP_MSG_VIDEO: + default: + err = GetRtmpMessage(msg, msg.AVData.ToBytes()) + msg.AVData.Recycle() + } + conn.incommingChunks[ChunkStreamID] = &Chunk{ + ChunkHeader: chunk.ChunkHeader, } - err = GetRtmpMessage(msg) } return } @@ -187,106 +201,53 @@ func (conn *NetConnection) readChunkStreamID(csid uint32) (chunkStreamID uint32, } func (conn *NetConnection) readChunkType(h *ChunkHeader, chunkType byte) (err error) { - b := conn.tmpBuf[:3] - switch chunkType { - case 0: - { - // Timestamp 3 bytes - if _, err := conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 3 - util.GetBE(b, &h.Timestamp) //type = 0的时间戳为绝对时间,其他的都为相对时间 - // Message Length 3 bytes - if _, err = conn.ReadFull(b); err != nil { // 读取Message Length,这里的长度指的是一条信令或者一帧视频数据或音频数据的长度,而不是Chunk data的长度. + conn.tmpBuf.Reset() + b4 := conn.tmpBuf.Malloc(4) + b3 := b4[:3] + if chunkType == 3 { + // 3个字节的时间戳 + } else { + // Timestamp 3 bytes + if _, err = conn.ReadFull(b3); err != nil { + return err + } + util.GetBE(b3, &h.Timestamp) + if chunkType != 2 { + if _, err = conn.ReadFull(b3); err != nil { return err } - conn.readSeqNum += 3 - util.GetBE(b, &h.MessageLength) + util.GetBE(b3, &h.MessageLength) // Message Type ID 1 bytes - v, err := conn.ReadByte() // 读取Message Type ID - if err != nil { + if h.MessageTypeID, err = conn.ReadByte(); err != nil { return err } conn.readSeqNum++ - h.MessageTypeID = v - - // Message Stream ID 4bytes - b = conn.tmpBuf - if _, err = conn.ReadFull(b); err != nil { // 读取Message Stream ID - return err - } - conn.readSeqNum += 4 - h.MessageStreamID = binary.LittleEndian.Uint32(b) - - // ExtendTimestamp 4 bytes - if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 - if _, err = conn.ReadFull(b); err != nil { + if chunkType == 0 { + // Message Stream ID 4bytes + if _, err = conn.ReadFull(b4); err != nil { // 读取Message Stream ID return err } - conn.readSeqNum += 4 - util.GetBE(b, &h.ExtendTimestamp) + h.MessageStreamID = binary.LittleEndian.Uint32(b4) } } - case 1: - { - // Timestamp 3 bytes - if _, err = conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 3 - h.ChunkType = chunkType - util.GetBE(b, &h.Timestamp) - // Message Length 3 bytes - if _, err = conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 3 - util.GetBE(b, &h.MessageLength) - // Message Type ID 1 bytes - v, err := conn.ReadByte() - if err != nil { - return err - } - conn.readSeqNum++ - h.MessageTypeID = v - - // ExtendTimestamp 4 bytes - if h.Timestamp == 0xffffff { - b = conn.tmpBuf - if _, err := conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 4 - util.GetBE(b, &h.ExtendTimestamp) - } - } - case 2: - { - // Timestamp 3 bytes - if _, err = conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 3 - h.ChunkType = chunkType - util.GetBE(b, &h.Timestamp) - // ExtendTimestamp 4 bytes - if h.Timestamp == 0xffffff { - b = conn.tmpBuf - if _, err := conn.ReadFull(b); err != nil { - return err - } - conn.readSeqNum += 4 - util.GetBE(b, &h.ExtendTimestamp) - } - } - case 3: - { - //h.ChunkType = chunkType - } } + // ExtendTimestamp 4 bytes + if h.Timestamp == 0xffffff { // 对于type 0的chunk,绝对时间戳在这里表示,如果时间戳值大于等于0xffffff(16777215),该值必须是0xffffff,且时间戳扩展字段必须发送,其他情况没有要求 + if _, err = conn.ReadFull(b4); err != nil { + return err + } + util.GetBE(b4, &h.ExtendTimestamp) + } else { + if chunkType == 0 { + h.ExtendTimestamp = h.Timestamp + // println("timestamp", h.Timestamp) + } else if chunkType != 3 { + // println("extend timestamp", chunkType, h.Timestamp, h.ExtendTimestamp) + h.ExtendTimestamp += h.Timestamp + } + } return nil } @@ -301,8 +262,9 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) { switch msg.MessageTypeID { case RTMP_MSG_CHUNK_SIZE: conn.readChunkSize = int(msg.MsgData.(Uint32Message)) + println("read chunk size", conn.readChunkSize) case RTMP_MSG_ABORT: - delete(conn.incompleteRtmpBody, uint32(msg.MsgData.(Uint32Message))) + delete(conn.incommingChunks, uint32(msg.MsgData.(Uint32Message))) case RTMP_MSG_ACK, RTMP_MSG_EDGE: case RTMP_MSG_USER_CONTROL: if _, ok := msg.MsgData.(*PingRequestMessage); ok { @@ -320,11 +282,8 @@ func (conn *NetConnection) RecvMessage() (msg *Chunk, err error) { return } func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { - body := msg.Encode() - head := newChunkHeader(t) - head.MessageLength = uint32(len(body)) - if sid, ok := msg.(HaveStreamID); ok { - head.MessageStreamID = sid.GetStreamID() + if conn == nil { + return errors.New("connection is nil") } if conn.writeSeqNum > conn.bandwidth { conn.totalWrite += conn.writeSeqNum @@ -332,26 +291,28 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite)) err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0) } - var chunk = net.Buffers{conn.encodeChunk12(head)} - if len(body) > conn.writeChunkSize { - chunk = append(chunk, body[:conn.writeChunkSize]) - body = body[conn.writeChunkSize:] - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) - _, err = chunk.WriteTo(conn) - for len(body) > conn.writeChunkSize { - chunk = append(chunk[:0], conn.encodeChunk12(head), body[:conn.writeChunkSize]) - body = body[conn.writeChunkSize:] - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) - _, err = chunk.WriteTo(conn) - } - chunk = append(chunk[:0], conn.encodeChunk12(head), body) - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) - _, err = chunk.WriteTo(conn) - } else { - chunk = append(chunk, body) - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk)) - _, err = chunk.WriteTo(conn) + conn.tmpBuf.Reset() + msg.Encode(&conn.tmpBuf) + head := newChunkHeader(t) + head.MessageLength = uint32(conn.tmpBuf.Len()) + if sid, ok := msg.(HaveStreamID); ok { + head.MessageStreamID = sid.GetStreamID() + } + head.WriteTo(RTMP_CHUNK_HEAD_12, &conn.chunkHeader) + for _, chunk := range conn.tmpBuf.Split(conn.writeChunkSize) { + conn.sendChunk(chunk) } - return nil } + +func (conn *NetConnection) sendChunk(writeBuffer ...[]byte) error { + if n, err := conn.Write(conn.chunkHeader); err != nil { + return err + } else { + conn.writeSeqNum += uint32(n) + } + buf := net.Buffers(writeBuffer) + n, err := buf.WriteTo(conn) + conn.writeSeqNum += uint32(n) + return err +} diff --git a/server.go b/server.go index e7bc6c7..9b1ac51 100644 --- a/server.go +++ b/server.go @@ -1,7 +1,6 @@ package rtmp import ( - "bufio" "context" "fmt" "net" @@ -9,7 +8,6 @@ import ( "go.uber.org/zap" "m7s.live/engine/v4" - "m7s.live/engine/v4/util" ) type NetStream struct { @@ -38,16 +36,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { defer conn.Close() senders := make(map[uint32]*RTMPSubscriber) receivers := make(map[uint32]*RTMPReceiver) - nc := &NetConnection{ - Conn: conn, - Reader: bufio.NewReader(conn), - writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, - readChunkSize: RTMP_DEFAULT_CHUNK_SIZE, - rtmpHeader: make(map[uint32]*ChunkHeader), - incompleteRtmpBody: make(map[uint32]*util.Buffer), - bandwidth: RTMP_MAX_CHUNK_SIZE << 3, - tmpBuf: make([]byte, 4), - } + nc := NewNetConnection(conn) ctx, cancel := context.WithCancel(engine.Engine) defer cancel() /* Handshake */ @@ -138,7 +127,6 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { } if RTMPPlugin.Publish(nc.appName+"/"+cmd.PublishingName, receiver) == nil { receivers[cmd.StreamId] = receiver - receiver.absTs = make(map[uint32]uint32) receiver.Begin() err = receiver.Response(cmd.TransactionId, NetStream_Publish_Start, Level_Status) } else {