diff --git a/amf.go b/amf.go index a5acfdc..b72b93c 100644 --- a/amf.go +++ b/amf.go @@ -1,8 +1,8 @@ package rtmp import ( - "github.com/Monibuca/engine/v4/util" + "go.uber.org/zap" ) // Action Message Format -- AMF 0 @@ -97,14 +97,14 @@ func (amf *AMF) decodeObject() (obj AMFValue) { case AMF0_OBJECT: return amf.readObject() case AMF0_MOVIECLIP: - plugin.Println("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)") + plugin.Error("This type is not supported and is reserved for future use.(AMF0_MOVIECLIP)") case AMF0_NULL: return amf.readNull() case AMF0_UNDEFINED: amf.ReadByte() return Undefined case AMF0_REFERENCE: - plugin.Println("reference-type.(AMF0_REFERENCE)") + plugin.Error("reference-type.(AMF0_REFERENCE)") case AMF0_ECMA_ARRAY: return amf.readECMAArray() case AMF0_END_OBJECT: @@ -118,15 +118,15 @@ func (amf *AMF) decodeObject() (obj AMFValue) { AMF0_XML_DOCUMENT: return amf.readLongString() case AMF0_UNSUPPORTED: - plugin.Println("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)") + plugin.Error("If a type cannot be serialized a special unsupported marker can be used in place of the type.(AMF0_UNSUPPORTED)") case AMF0_RECORDSET: - plugin.Println("This type is not supported and is reserved for future use.(AMF0_RECORDSET)") + plugin.Error("This type is not supported and is reserved for future use.(AMF0_RECORDSET)") case AMF0_TYPED_OBJECT: - plugin.Println("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)") + plugin.Error("If a strongly typed object has an alias registered for its class then the type name will also be serialized. Typed objects are considered complex types and reoccurring instances can be sent by reference.(AMF0_TYPED_OBJECT)") case AMF0_AVMPLUS_OBJECT: - plugin.Println("AMF0_AVMPLUS_OBJECT") + plugin.Error("AMF0_AVMPLUS_OBJECT") default: - plugin.Warnf("Unsupported type %v", t) + plugin.Error("Unsupported type", zap.Uint8("type", t)) } return nil } @@ -280,4 +280,4 @@ func (amf *AMF) writeObjectBool(key string, f bool) { func (amf *AMF) writeObjectNumber(key string, value float64) { amf.writeKey(key) amf.writeNumber(value) -} \ No newline at end of file +} diff --git a/client.go b/client.go index 49e0f40..51c2e04 100644 --- a/client.go +++ b/client.go @@ -8,24 +8,21 @@ import ( "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/util" + "go.uber.org/zap" ) -type RTMPClient struct { - NetConnection -} - -func (client *RTMPClient) Connect(addr string) bool { +func NewRTMPClient(addr string) (client *NetConnection) { u, err := url.Parse(addr) if err != nil { - plugin.Error(err) - return false + plugin.Error("connect url parse", zap.Error(err)) + return } conn, err := net.Dial("tcp", u.Host) if err != nil { - plugin.Error(err) - return false + plugin.Error("dial tcp", zap.String("host", u.Host), zap.Error(err)) + return } - client.NetConnection = NetConnection{ + client = &NetConnection{ TCPConn: conn.(*net.TCPConn), Reader: bufio.NewReader(conn), writeChunkSize: RTMP_DEFAULT_CHUNK_SIZE, @@ -36,10 +33,10 @@ func (client *RTMPClient) Connect(addr string) bool { tmpBuf: make([]byte, 4), // subscribers: make(map[uint32]*engine.Subscriber), } - err = client.Handshake() + err = client.ClientHandshake() if err != nil { - plugin.Error(err) - return false + plugin.Error("handshake", zap.Error(err)) + return nil } connectArg := make(AMFObject) connectArg["swfUrl"] = addr @@ -51,7 +48,7 @@ func (client *RTMPClient) Connect(addr string) bool { for { msg, err := client.RecvMessage() if err != nil { - return false + return nil } switch msg.MessageTypeID { case RTMP_MSG_AMF0_COMMAND: @@ -60,96 +57,140 @@ func (client *RTMPClient) Connect(addr string) bool { case "_result": response := msg.MsgData.(*ResponseMessage) if response.Infomation["code"] == NetConnection_Connect_Success { - return true + return } else { - return false + return nil } } } } } -var _ engine.IPusher = (*RTMPPusher)(nil) -var _ engine.IPuller = (*RTMPPuller)(nil) - type RTMPPusher struct { + RTMPSender engine.Pusher - RTMPClient +} + +func (pusher *RTMPPusher) OnEvent(event any) any { + pusher.RTMPSender.OnEvent(event) + switch event.(type) { + case *engine.Stream: + pusher.NetConnection = NewRTMPClient(pusher.RemoteURL) + if pusher.NetConnection != nil { + pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) + go pusher.push() + } + case engine.PushEvent: + pusher.PushCount++ + if pusher.Stream == nil { + if plugin.Subscribe(pusher.StreamPath, pusher) { + } + } + } + return event } func (pusher *RTMPPusher) push() { - SendMedia(&pusher.NetConnection, &pusher.Subscriber) - pusher.NetConnection.Close() -} + defer pusher.Unsubscribe() + for { + msg, err := pusher.RecvMessage() + if err != nil { + break + } + switch msg.MessageTypeID { + case RTMP_MSG_AMF0_COMMAND: + cmd := msg.MsgData.(Commander).GetCommand() + switch cmd.CommandName { + case "_result": + if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok { + pusher.StreamID = response.StreamId + m := &PublishMessage{ + CURDStreamMessage{ + CommandMessage{ + "publish", + 0, + }, + response.StreamId, + }, + pusher.Stream.StreamName, + "live", + } + pusher.SendMessage(RTMP_MSG_AMF0_COMMAND, m) + } else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok { + if response.Infomation["code"] == "NetStream.Publish.Start" { -func (pusher *RTMPPusher) Push(count int) { - if pusher.Connect(pusher.RemoteURL) { - pusher.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) - for { - msg, err := pusher.RecvMessage() - if err != nil { - return - } - switch msg.MessageTypeID { - case RTMP_MSG_AMF0_COMMAND: - cmd := msg.MsgData.(Commander).GetCommand() - switch cmd.CommandName { - case "_result": - if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok { - arg := make(AMFObject) - arg["streamid"] = response.StreamId - pusher.SendCommand(SEND_PUBLISH_START_MESSAGE, arg) - } else if response, ok := msg.MsgData.(*ResponsePublishMessage); ok { - if response.Infomation["code"] == "NetStream.Publish.Start" { - go pusher.push() - } else { - return - } + } else { + return } } } } } + if !pusher.Stream.IsClosed() && pusher.Reconnect() { + pusher.OnEvent(engine.PullEvent(pusher.PushCount)) + } } type RTMPPuller struct { + RTMPReceiver engine.Puller - RTMPClient +} + +func (puller *RTMPPuller) OnEvent(event any) any { + puller.RTMPReceiver.OnEvent(event) + switch event.(type) { + case *engine.Stream: + puller.NetConnection = NewRTMPClient(puller.RemoteURL) + if puller.NetConnection != nil { + puller.absTs = make(map[uint32]uint32) + puller.SendCommand(SEND_CREATE_STREAM_MESSAGE, nil) + go puller.pull() + break + } + case engine.PullEvent: + puller.PullCount++ + if puller.Stream == nil { + if plugin.Publish(puller.StreamPath, puller) { + break + } + } + } + return event } func (puller *RTMPPuller) pull() { + defer puller.Unpublish() for { msg, err := puller.RecvMessage() if err != nil { - return + break } switch msg.MessageTypeID { case RTMP_MSG_AUDIO: puller.ReceiveAudio(msg) case RTMP_MSG_VIDEO: puller.ReceiveVideo(msg) - // case RTMP_MSG_AMF0_COMMAND: - // cmd := msg.MsgData.(Commander).GetCommand() - // switch cmd.CommandName { - // case "_result": - // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok { - // if response.Object["code"] == "NetStream.Play.Start" { + case RTMP_MSG_AMF0_COMMAND: + cmd := msg.MsgData.(Commander).GetCommand() + switch cmd.CommandName { + case "_result": + if response, ok := msg.MsgData.(*ResponseCreateStreamMessage); ok { + puller.StreamID = response.StreamId + m := &PlayMessage{} + m.CommandMessage.CommandName = "play" + m.StreamName = puller.Stream.StreamName + puller.SendMessage(RTMP_MSG_AMF0_COMMAND, m) + // if response, ok := msg.MsgData.(*ResponsePlayMessage); ok { + // if response.Object["code"] == "NetStream.Play.Start" { - // } else if response.Object["level"] == Level_Error { - // return errors.New(response.Object["code"].(string)) - // } - // } else { - // return errors.New("pull faild") - // } - // } - // } + // } else if response.Object["level"] == Level_Error { + // return errors.New(response.Object["code"].(string)) + // } + // } else { + // return errors.New("pull faild") + // } + } + } } } } -func (puller *RTMPPuller) Pull(count int) { - if puller.Connect(puller.RemoteURL) { - puller.SendCommand(SEND_PLAY_MESSAGE, AMFObject{"StreamPath": puller.StreamName}) - puller.MediaReceiver = NewMediaReceiver(&puller.Publisher) - go puller.pull() - } -} diff --git a/handshake.go b/handshake.go index 21669ae..f0af9f4 100644 --- a/handshake.go +++ b/handshake.go @@ -90,7 +90,7 @@ func (nc *NetConnection) Handshake() error { return nc.complex_handshake(C1) } -func (client *RTMPClient) Handshake() error { +func (client *NetConnection) ClientHandshake() error { C0C1 := make([]byte, 1536+1) C0C1[0] = RTMP_HANDSHAKE_VERSION client.Write(C0C1) diff --git a/main.go b/main.go index ad69ce7..65fd4a4 100644 --- a/main.go +++ b/main.go @@ -2,11 +2,10 @@ package rtmp import ( "context" - "errors" . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/config" - . "github.com/logrusorgru/aurora" + "go.uber.org/zap" ) type RTMPConfig struct { @@ -18,13 +17,15 @@ type RTMPConfig struct { ChunkSize int } +var _ PullPlugin = (*RTMPConfig)(nil) + func (config *RTMPConfig) Update(override config.Config) { - plugin.Info(Green("server rtmp start at"), BrightBlue(config.ListenAddr)) + plugin.Info("server rtmp start at", zap.String("listen addr", config.ListenAddr)) err := config.Listen(plugin, config) if err == context.Canceled { - plugin.Println(err) + plugin.Info("rtmp listen shutdown") } else { - plugin.Fatal(err) + plugin.Fatal("rtmp server", zap.Error(err)) } } @@ -33,22 +34,16 @@ var plugin = InstallPlugin(&RTMPConfig{ TCP: config.TCP{ListenAddr: ":1935"}, }) -func (config *RTMPConfig) PullStream(streamPath string, puller Puller) error { - var client RTMPPuller - client.Puller = puller - if client.Publish(streamPath, &client, config.Publish) { - return nil - } else { - return errors.New("publish faild") +func (config *RTMPConfig) PullStream(puller Puller) { + client := RTMPPuller{ + Puller: puller, } + client.OnEvent(PullEvent(0)) } -func (config *RTMPConfig) PushStream(stream *Stream, pusher Pusher) error { - var client RTMPPusher - client.ID = "RTMPPusher" - client.Pusher = pusher - if client.Subscribe(stream.Path,config.Subscribe) { - client.Pusher.Push(&client, config.Push) +func (config *RTMPConfig) PushStream(pusher Pusher) { + client := RTMPPusher{ + Pusher: pusher, } - return nil + client.OnEvent(PushEvent(0)) } diff --git a/media.go b/media.go index fab7910..6686f13 100644 --- a/media.go +++ b/media.go @@ -3,55 +3,125 @@ package rtmp import ( "net" - "github.com/Monibuca/engine/v4" . "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/codec" "github.com/Monibuca/engine/v4/track" + "github.com/Monibuca/engine/v4/util" ) -func SendMedia(nc *NetConnection, sub *Subscriber) (err error) { - vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack() - if vt != nil { - frame := vt.DecoderConfiguration - err = nc.sendAVMessage(0, net.Buffers(frame.AVCC), false, true) - sub.OnVideo = func(frame *engine.VideoFrame) error { - return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, false, false) - } - } - if at != nil { - sub.OnAudio = func(frame *engine.AudioFrame) (err error) { - if at.CodecID == codec.CodecID_AAC { - frame := at.DecoderConfiguration - err = nc.sendAVMessage(0, net.Buffers{frame.AVCC}, true, true) - } else { - err = nc.sendAVMessage(0, frame.AVCC, true, true) +type RTMPSender struct { + Subscriber + NetStream +} + +func (rtmp *RTMPSender) OnEvent(event any) any { + rtmp.Subscriber.OnEvent(event) + switch v := event.(type) { + case TrackRemoved: + //TODO + case *track.Audio: + isPlaying := rtmp.IsPlaying() + if rtmp.AddTrack(v) { + if v.CodecID == codec.CodecID_AAC { + rtmp.sendAVMessage(0, net.Buffers{rtmp.Subscriber.AudioTrack.DecoderConfiguration.AVCC}, false, true) } - sub.OnAudio = func(frame *engine.AudioFrame) error { - return nc.sendAVMessage(frame.DeltaTime, frame.AVCC, true, false) + // 如果不订阅视频则遇到音频也播放,否则需要等视频先播放 + if !isPlaying && !rtmp.Config.SubVideo { + go rtmp.Play() } - return } + case *track.Video: + isPlaying := rtmp.IsPlaying() + if rtmp.AddTrack(v) { + rtmp.sendAVMessage(0, net.Buffers(rtmp.Subscriber.VideoTrack.DecoderConfiguration.AVCC), true, true) + if !isPlaying { + go rtmp.Play() + } + } + case *AudioFrame: + rtmp.sendAVMessage(v.DeltaTime, v.AVCC, true, false) + case *VideoFrame: + rtmp.sendAVMessage(v.DeltaTime, v.AVCC, false, false) } - sub.Play(at, vt) + return event +} + +// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 +// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 +// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 +func (sender *RTMPSender) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) { + if sender.writeSeqNum > sender.bandwidth { + sender.totalWrite += sender.writeSeqNum + sender.writeSeqNum = 0 + sender.SendMessage(RTMP_MSG_ACK, Uint32Message(sender.totalWrite)) + sender.SendStreamID(RTMP_USER_PING_REQUEST, 0) + } + + var head *ChunkHeader + if isAudio { + head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_AUDIO, sender.StreamID, 0) + } else { + head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_VIDEO, sender.StreamID, 0) + } + + // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) + // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) + // 当Chunk Type为0时(即Chunk12), + var chunk1 net.Buffers + if isFirst { + chunk1 = append(chunk1, sender.encodeChunk12(head)) + } else { + chunk1 = append(chunk1, sender.encodeChunk8(head)) + } + chunks := util.SplitBuffers(payload, sender.writeChunkSize) + chunk1 = append(chunk1, chunks[0]...) + sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) + _, err = chunk1.WriteTo(sender.NetConnection) + // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) + for _, chunk := range chunks[1:] { + chunk1 = net.Buffers{sender.encodeChunk1(head)} + chunk1 = append(chunk1, chunk...) + sender.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) + _, err = chunk1.WriteTo(sender.NetConnection) + } + return nil } -func NewMediaReceiver(pub *Publisher) *MediaReceiver { - return &MediaReceiver{ - pub, - make(map[uint32]uint32), pub.NewVideoTrack(), pub.NewAudioTrack(), +func (r *RTMPSender) Response(code, level string) error { + m := new(ResponsePlayMessage) + m.CommandName = Response_OnStatus + m.TransactionId = 0 + m.Object = AMFObject{ + "code": code, + "level": level, + "clientid": 1, } + m.StreamID = r.StreamID + return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m) } -type MediaReceiver struct { - *Publisher - absTs map[uint32]uint32 - vt *track.UnknowVideo - at *track.UnknowAudio +type RTMPReceiver struct { + Publisher + NetStream + absTs map[uint32]uint32 } -func (r *MediaReceiver) ReceiveAudio(msg *Chunk) { - plugin.Tracef("rec_audio chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp) +func (r *RTMPReceiver) Response(code, level string) error { + m := new(ResponsePublishMessage) + m.CommandName = Response_OnStatus + m.TransactionId = 0 + m.Infomation = AMFObject{ + "code": code, + "level": level, + "clientid": 1, + } + m.StreamID = r.StreamID + return r.SendMessage(RTMP_MSG_AMF0_COMMAND, m) +} + +func (r *RTMPReceiver) ReceiveAudio(msg *Chunk) { + // plugin.Tracef("rec_audio chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp) if msg.ChunkType == 0 { r.absTs[msg.ChunkStreamID] = 0 } @@ -60,10 +130,10 @@ func (r *MediaReceiver) ReceiveAudio(msg *Chunk) { } else { r.absTs[msg.ChunkStreamID] += msg.Timestamp } - r.at.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) + r.AudioTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) } -func (r *MediaReceiver) ReceiveVideo(msg *Chunk) { - plugin.Tracef("rev_video chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp) +func (r *RTMPReceiver) ReceiveVideo(msg *Chunk) { + // plugin.Tracef("rev_video chunkType:%d chunkStreamID:%d ts:%d", msg.ChunkType, msg.ChunkStreamID, msg.Timestamp) if msg.ChunkType == 0 { r.absTs[msg.ChunkStreamID] = 0 } @@ -72,5 +142,5 @@ func (r *MediaReceiver) ReceiveVideo(msg *Chunk) { } else { r.absTs[msg.ChunkStreamID] += msg.Timestamp } - r.vt.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) + r.VideoTrack.WriteAVCC(r.absTs[msg.ChunkStreamID], msg.Body) } diff --git a/msg.go b/msg.go index f5ab6bc..101e39f 100644 --- a/msg.go +++ b/msg.go @@ -6,6 +6,7 @@ import ( "strings" "github.com/Monibuca/engine/v4/util" + "go.uber.org/zap" ) const ( @@ -217,6 +218,7 @@ func decodeCommandAMF0(chunk *Chunk) { chunk.MsgData = &CreateStreamMessage{ cmdMsg, amf.readObject(), } + case "play": amf.readNull() chunk.MsgData = &PlayMessage{ @@ -239,7 +241,10 @@ func decodeCommandAMF0(chunk *Chunk) { case "publish": amf.readNull() chunk.MsgData = &PublishMessage{ - cmdMsg, + CURDStreamMessage{ + cmdMsg, + chunk.MessageStreamID, + }, amf.readString(), amf.readString(), } @@ -286,13 +291,14 @@ func decodeCommandAMF0(chunk *Chunk) { amf.readObject(), amf.readObject(), "", } + codef := zap.String("code", response.Infomation["code"].(string)) switch response.Infomation["level"] { case Level_Status: - plugin.Infof("_result :", response.Infomation["code"]) + plugin.Info("_result :", codef) case Level_Warning: - plugin.Warnf("_result :", response.Infomation["code"]) + plugin.Warn("_result :", codef) case Level_Error: - plugin.Errorf("_result :", response.Infomation["code"]) + plugin.Error("_result :", codef) } if strings.HasPrefix(response.Infomation["code"].(string), "NetStream.Publish") { chunk.MsgData = &ResponsePublishMessage{ @@ -313,7 +319,7 @@ func decodeCommandAMF0(chunk *Chunk) { } case "FCPublish", "FCUnpublish": default: - plugin.Println("decode command amf0 cmd:", cmd) + plugin.Info("decode command amf0 ", zap.String("cmd", cmd)) } } @@ -477,7 +483,7 @@ type PlayMessage struct { StreamName string Start uint64 Duration uint64 - Rest bool + Reset bool } // 命令名 -> 命令名,设置为”play” @@ -502,7 +508,7 @@ func (msg *PlayMessage) Encode() []byte { amf.writeNumber(float64(msg.Duration)) } - amf.writeBool(msg.Rest) + amf.writeBool(msg.Reset) return amf.Buffer } @@ -532,6 +538,10 @@ type CURDStreamMessage struct { StreamId uint32 } +func (msg *CURDStreamMessage) GetStreamID() uint32 { + return msg.StreamId +} + func (msg *CURDStreamMessage) Encode0() { } @@ -557,7 +567,7 @@ func (msg *ReceiveAVMessage) Encode0() { // The client sends the publish command to publish a named stream to the server. Using this name, // any client can play this stream and receive the published audio, video, and data messages type PublishMessage struct { - CommandMessage + CURDStreamMessage PublishingName string PublishingType string } @@ -572,7 +582,14 @@ type PublishMessage struct { // “append”:流被发布并且附加到一个文件之后.如果没有发现文件则创建一个文件. // “live”:发布直播数据而不录制到文件 -func (msg *PublishMessage) Encode0() { +func (msg *PublishMessage) Encode0() []byte { + var amf AMF + amf.writeString(msg.CommandName) + amf.writeNumber(float64(msg.TransactionId)) + amf.writeNull() + amf.writeString(msg.PublishingName) + amf.writeString(msg.PublishingType) + return amf.Buffer } // Seek Message diff --git a/netConnection.go b/netConnection.go index 75e53fe..73ba6e1 100644 --- a/netConnection.go +++ b/netConnection.go @@ -28,7 +28,6 @@ const ( SEND_CONNECT_RESPONSE_MESSAGE = "Send Connect Response Message" SEND_CREATE_STREAM_MESSAGE = "Send Create Stream Message" - SEND_CREATE_STREAM_RESPONSE_MESSAGE = "Send Create Stream Response Message" SEND_PLAY_MESSAGE = "Send Play Message" SEND_PLAY_RESPONSE_MESSAGE = "Send Play Response Message" @@ -76,8 +75,6 @@ func newPlayResponseMessageData(streamid uint32, code, level string) (amfobj AMF } type NetConnection struct { - *MediaReceiver - subscribers map[uint32]*Subscriber *bufio.Reader *net.TCPConn bandwidth uint32 @@ -88,35 +85,30 @@ type NetConnection struct { writeChunkSize int readChunkSize int incompleteRtmpBody map[uint32]util.Buffer // 完整的RtmpBody,在网络上是被分成一块一块的,需要将其组装起来 - streamID uint32 // 流ID rtmpHeader map[uint32]*ChunkHeader // RtmpHeader objectEncoding float64 appName string tmpBuf []byte //用来接收小数据,复用内存 } -func (conn *NetConnection) Close() { - if conn.MediaReceiver != nil { - conn.UnPublish() - } - conn.TCPConn.Close() - for _, s := range conn.subscribers { - s.Close() - } -} - func (conn *NetConnection) ReadFull(buf []byte) (n int, err error) { return io.ReadFull(conn.Reader, buf) } -func (conn *NetConnection) SendStreamID0(eventType uint16) (err error) { - return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, 0}) -} -func (conn *NetConnection) SendStreamID(eventType uint16) (err error) { - return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, conn.streamID}) +func (conn *NetConnection) SendStreamID(eventType uint16, streamID uint32) (err error) { + return conn.SendMessage(RTMP_MSG_USER_CONTROL, &StreamIDMessage{UserControlMessage{EventType: eventType}, streamID}) } func (conn *NetConnection) SendUserControl(eventType uint16) error { return conn.SendMessage(RTMP_MSG_USER_CONTROL, &UserControlMessage{EventType: eventType}) } + +func (conn *NetConnection) ResponseCreateStream(tid uint64, streamID uint32) error { + m := &ResponseCreateStreamMessage{} + m.CommandName = Response_Result + m.TransactionId = tid + m.StreamId = streamID + return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) +} + func (conn *NetConnection) SendCommand(message string, args any) error { switch message { // case SEND_SET_BUFFER_LENGTH_MESSAGE: @@ -137,16 +129,6 @@ func (conn *NetConnection) SendCommand(message string, args any) error { m.CommandName = "createStream" m.TransactionId = 2 return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_CREATE_STREAM_RESPONSE_MESSAGE: - tid, ok := args.(uint64) - if !ok { - return errors.New(SEND_CREATE_STREAM_RESPONSE_MESSAGE + ", The parameter only one(TransactionId uint64)!") - } - m := &ResponseCreateStreamMessage{} - m.CommandName = Response_Result - m.TransactionId = tid - m.StreamId = conn.streamID - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) case SEND_PLAY_MESSAGE: data, ok := args.(AMFObject) if !ok { @@ -156,14 +138,14 @@ func (conn *NetConnection) SendCommand(message string, args any) error { m.CommandName = "play" m.TransactionId = 1 for i, v := range data { - if i == "StreamPath" { + if i == "StreamName" { m.StreamName = v.(string) } else if i == "Start" { m.Start = v.(uint64) } else if i == "Duration" { m.Duration = v.(uint64) - } else if i == "Rest" { - m.Rest = v.(bool) + } else if i == "Reset" { + m.Reset = v.(bool) } } return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) @@ -246,16 +228,7 @@ func (conn *NetConnection) SendCommand(message string, args any) error { m.Object = obj m.Optional = info return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) - case SEND_PUBLISH_RESPONSE_MESSAGE, SEND_PUBLISH_START_MESSAGE: - info := args.(AMFObject) - info["clientid"] = 1 - m := new(ResponsePublishMessage) - m.CommandName = Response_OnStatus - m.TransactionId = 0 - m.Infomation = info - m.StreamID = info["streamid"].(uint32) - delete(info, "streamid") - return conn.SendMessage(RTMP_MSG_AMF0_COMMAND, m) + case SEND_UNPUBLISH_RESPONSE_MESSAGE: data, ok := args.(AMFObject) if !ok { @@ -269,48 +242,6 @@ func (conn *NetConnection) SendCommand(message string, args any) error { return errors.New("send message no exist") } -// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 -// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 -// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 -func (conn *NetConnection) sendAVMessage(ts uint32, payload net.Buffers, isAudio bool, isFirst bool) (err error) { - if conn.writeSeqNum > conn.bandwidth { - conn.totalWrite += conn.writeSeqNum - conn.writeSeqNum = 0 - conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite)) - conn.SendStreamID0(RTMP_USER_PING_REQUEST) - } - - var head *ChunkHeader - if isAudio { - head = newRtmpHeader(RTMP_CSID_AUDIO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_AUDIO, conn.streamID, 0) - } else { - head = newRtmpHeader(RTMP_CSID_VIDEO, ts, uint32(util.SizeOfBuffers(payload)), RTMP_MSG_VIDEO, conn.streamID, 0) - } - - // 第一次是发送关键帧,需要完整的消息头(Chunk Basic Header(1) + Chunk Message Header(11) + Extended Timestamp(4)(可能会要包括)) - // 后面开始,就是直接发送音视频数据,那么直接发送,不需要完整的块(Chunk Basic Header(1) + Chunk Message Header(7)) - // 当Chunk Type为0时(即Chunk12), - var chunk1 net.Buffers - if isFirst { - chunk1 = append(chunk1, conn.encodeChunk12(head)) - } else { - chunk1 = append(chunk1, conn.encodeChunk8(head)) - } - chunks := util.SplitBuffers(payload, conn.writeChunkSize) - chunk1 = append(chunk1, chunks[0]...) - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) - _, err = chunk1.WriteTo(conn) - // 如果在音视频数据太大,一次发送不完,那么这里进行分割(data + Chunk Basic Header(1)) - for _, chunk := range chunks[1:] { - chunk1 = net.Buffers{conn.encodeChunk1(head)} - chunk1 = append(chunk1, chunk...) - conn.writeSeqNum += uint32(util.SizeOfBuffers(chunk1)) - _, err = chunk1.WriteTo(conn) - } - - return nil -} - func (conn *NetConnection) readChunk() (msg *Chunk, err error) { head, err := conn.ReadByte() conn.readSeqNum++ @@ -357,7 +288,6 @@ func (conn *NetConnection) readChunk() (msg *Chunk, err error) { needRead = unRead } if n, err := conn.ReadFull(currentBody.Malloc(needRead)); err != nil { - plugin.Error(err) return nil, err } else { conn.readSeqNum += uint32(n) @@ -550,7 +480,7 @@ func (conn *NetConnection) SendMessage(t byte, msg RtmpMessage) (err error) { conn.totalWrite += conn.writeSeqNum conn.writeSeqNum = 0 err = conn.SendMessage(RTMP_MSG_ACK, Uint32Message(conn.totalWrite)) - err = conn.SendStreamID0(RTMP_USER_PING_REQUEST) + err = conn.SendStreamID(RTMP_USER_PING_REQUEST, 0) } var chunk = net.Buffers{conn.encodeChunk12(head)} if len(body) > conn.writeChunkSize { diff --git a/netStream.go b/netStream.go index bb5565b..5492451 100644 --- a/netStream.go +++ b/netStream.go @@ -2,17 +2,30 @@ package rtmp import ( "bufio" + "context" "fmt" "net" "sync/atomic" "github.com/Monibuca/engine/v4" "github.com/Monibuca/engine/v4/util" + "go.uber.org/zap" ) +type NetStream struct { + *NetConnection + StreamID uint32 +} + +func (ns *NetStream) Begin() { + ns.SendStreamID(RTMP_USER_STREAM_BEGIN, ns.StreamID) +} + var gstreamid = uint32(64) func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { + senders := make(map[uint32]*RTMPSender) + receivers := make(map[uint32]*RTMPReceiver) nc := NetConnection{ TCPConn: conn, Reader: bufio.NewReader(conn), @@ -22,12 +35,13 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { incompleteRtmpBody: make(map[uint32]util.Buffer), bandwidth: RTMP_MAX_CHUNK_SIZE << 3, tmpBuf: make([]byte, 4), - subscribers: make(map[uint32]*engine.Subscriber), } + ctx, cancel := context.WithCancel(engine.Engine) defer nc.Close() + defer cancel() /* Handshake */ if err := nc.Handshake(); err != nil { - plugin.Error("handshake", err) + plugin.Error("handshake", zap.Error(err)) return } for { @@ -41,7 +55,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { break } cmd := msg.MsgData.(Commander).GetCommand() - plugin.Debugf("recv cmd '%s'", cmd.CommandName) + plugin.Debug("recv cmd", zap.String("commandName", cmd.CommandName), zap.Uint32("streamID", msg.MessageStreamID)) switch cmd.CommandName { case "connect": connect := msg.MsgData.(*CallMessage) @@ -51,7 +65,7 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { nc.objectEncoding = objectEncoding.(float64) } nc.appName = app.(string) - plugin.Infof("connect app:'%s',objectEncoding:%v", nc.appName, objectEncoding) + plugin.Info("connect", zap.String("appName", nc.appName), zap.Float64("objectEncoding", nc.objectEncoding)) err = nc.SendMessage(RTMP_MSG_ACK_SIZE, Uint32Message(512<<10)) nc.writeChunkSize = config.ChunkSize err = nc.SendMessage(RTMP_MSG_CHUNK_SIZE, Uint32Message(config.ChunkSize)) @@ -59,59 +73,61 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { AcknowledgementWindowsize: uint32(512 << 10), LimitType: byte(2), }) - err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN) + err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN, 0) err = nc.SendCommand(SEND_CONNECT_RESPONSE_MESSAGE, nc.objectEncoding) case "createStream": - nc.streamID = atomic.AddUint32(&gstreamid, 1) - plugin.Info("createStream:", nc.streamID) - err = nc.SendCommand(SEND_CREATE_STREAM_RESPONSE_MESSAGE, cmd.TransactionId) - if err != nil { - plugin.Error(err) - return - } + streamId := atomic.AddUint32(&gstreamid, 1) + plugin.Info("createStream:", zap.Uint32("streamId", streamId)) + nc.ResponseCreateStream(cmd.TransactionId, streamId) case "publish": pm := msg.MsgData.(*PublishMessage) - var puber engine.Publisher - if puber.Publish(nc.appName+"/"+pm.PublishingName, &nc, config.Publish) { - nc.MediaReceiver = NewMediaReceiver(&puber) - nc.SendStreamID(RTMP_USER_STREAM_BEGIN) - err = nc.SendCommand(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status)) + receiver := &RTMPReceiver{ + NetStream: NetStream{ + NetConnection: &nc, + StreamID: pm.StreamId, + }, + } + receiver.OnEvent(ctx) + if plugin.Publish(nc.appName+"/"+pm.PublishingName, receiver) { + receiver.absTs = make(map[uint32]uint32) + receiver.Begin() + err = receiver.Response(NetStream_Publish_Start, Level_Status) } else { - err = nc.SendCommand(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error)) + err = receiver.Response(NetStream_Publish_BadName, Level_Error) } case "play": pm := msg.MsgData.(*PlayMessage) streamPath := nc.appName + "/" + pm.StreamName - subscriber := &engine.Subscriber{ - Type: "RTMP", - ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID), + sender := &RTMPSender{ + NetStream: NetStream{ + NetConnection: &nc, + StreamID: msg.MessageStreamID, + }, } - if subscriber.Subscribe(streamPath, config.Subscribe) { - nc.subscribers[nc.streamID] = subscriber - err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED) - err = nc.SendStreamID(RTMP_USER_STREAM_BEGIN) - err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) - err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) - go func() { - SendMedia(&nc, subscriber) - err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Stop, Level_Status)) - err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Complete, Level_Status)) - }() + sender.OnEvent(ctx) + sender.ID = fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), sender.StreamID) + if plugin.Subscribe(streamPath, sender) { + senders[msg.MessageStreamID] = sender + err = nc.SendStreamID(RTMP_USER_STREAM_IS_RECORDED, msg.MessageStreamID) + sender.Begin() + sender.Response(NetStream_Play_Reset, Level_Status) + sender.Response(NetStream_Play_Start, Level_Status) } else { - err = nc.SendCommand(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Failed, Level_Error)) + sender.Response(NetStream_Play_Failed, Level_Error) } case "closeStream": cm := msg.MsgData.(*CURDStreamMessage) - if stream, ok := nc.subscribers[cm.StreamId]; ok { - stream.Close() - delete(nc.subscribers, cm.StreamId) + if stream, ok := senders[cm.StreamId]; ok { + stream.Unsubscribe() + delete(senders, cm.StreamId) } case "releaseStream": cm := msg.MsgData.(*ReleaseStreamMessage) amfobj := make(AMFObject) - if nc.Stream != nil && nc.Stream.AppName == nc.appName && nc.Stream.StreamName == cm.StreamName { + p, ok := receivers[msg.MessageStreamID] + if ok { amfobj["level"] = "_result" - nc.Stream.UnPublish() + p.Unpublish() } else { amfobj["level"] = "_error" } @@ -119,12 +135,16 @@ func (config *RTMPConfig) ServeTCP(conn *net.TCPConn) { err = nc.SendCommand(SEND_UNPUBLISH_RESPONSE_MESSAGE, amfobj) } case RTMP_MSG_AUDIO: - nc.ReceiveAudio(msg) + if r, ok := receivers[msg.MessageStreamID]; ok { + r.ReceiveAudio(msg) + } case RTMP_MSG_VIDEO: - nc.ReceiveVideo(msg) + if r, ok := receivers[msg.MessageStreamID]; ok { + r.ReceiveVideo(msg) + } } } else { - plugin.Error(err) + plugin.Error("receive", zap.Error(err)) return } }