From 70daa477fa14c1bb7be985f6b376e303d88f6979 Mon Sep 17 00:00:00 2001 From: dexter <178529795@qq.com> Date: Wed, 1 Dec 2021 18:57:36 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9B=B4=E6=96=B0ReadMe?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 151 +++++++++++++++++++++++------------------------------- 1 file changed, 65 insertions(+), 86 deletions(-) diff --git a/README.md b/README.md index 50dd567..6990ab9 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,22 @@ 该项目为m7s的引擎部分,该部分逻辑是流媒体服务器的核心转发逻辑。仅包含最基础的功能,不含任何网络协议部分,但包含了一个插件的引入机制,其他功能均由插件实现 +# 引擎配置 +```toml +[Engine] +EnableAudio = true +EnableVideo = true +# 发布流默认过期时间单位秒 +PublishTimeout = 60 +# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭) +AutoCloseDelay = 10 +# 启用RTP包乱序重排 +RTPReorder = false +``` + # 引擎的基本功能 - 引擎初始化会加载配置文件,并逐个调用插件的Run函数 -- 具有发布功能的插件,会通过GetStream函数创建一个流,即Stream对象,这个Stream对象随后可以被订阅 +- 具有发布功能的插件,新建一个Stream对象,这个Stream对象随后可以被订阅 - Stream对象中含有两个列表,一个是VideoTracks一个是AudioTracks用来存放视频数据和音频数据 - 每一个VideoTrack或者AudioTrack中包含一个RingBuffer,用来存储发布者提供的数据,同时提供订阅者访问。 - 具有订阅功能的插件,会通过GetStream函数获取到一个流,然后选择VideoTracks、AudioTracks里面的RingBuffer进行连续的读取 @@ -13,104 +26,70 @@ 以rtmp协议为例子 ```go -if pub := new(engine.Publisher); pub.Publish(streamPath) { - pub.Type = "RTMP" - stream = pub.Stream - err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) - err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status)) +stream = &engine.Stream{Type: "RTMP", StreamPath: streamPath} +if stream.Publish() { + absTs := make(map[uint32]uint32) + vt := stream.NewVideoTrack(0) + at := stream.NewAudioTrack(0) + rec_audio = func(msg *Chunk) { + if msg.ChunkType == 0 { + absTs[msg.ChunkStreamID] = 0 + } + if msg.Timestamp == 0xffffff { + absTs[msg.ChunkStreamID] += msg.ExtendTimestamp + } else { + absTs[msg.ChunkStreamID] += msg.Timestamp + } + at.PushByteStream(absTs[msg.ChunkStreamID], msg.Body) + } + rec_video = func(msg *Chunk) { + if msg.ChunkType == 0 { + absTs[msg.ChunkStreamID] = 0 + } + if msg.Timestamp == 0xffffff { + absTs[msg.ChunkStreamID] += msg.ExtendTimestamp + } else { + absTs[msg.ChunkStreamID] += msg.Timestamp + } + vt.PushByteStream(absTs[msg.ChunkStreamID], msg.Body) + } + err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) + err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status)) } else { - err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error)) + err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error)) } ``` 默认会创建一个VideoTrack和一个AudioTrack 当我们接收到数据的时候就可以朝里面填充物数据了 -```go -rec_video = func(msg *Chunk) { - // 等待AVC序列帧 - if msg.Body[1] != 0 { - return - } - vt := stream.VideoTracks[0] - var ts_video uint32 - var info codec.AVCDecoderConfigurationRecord - //0:codec,1:IsAVCSequence,2~4:compositionTime - if _, err := info.Unmarshal(msg.Body[5:]); err == nil { - vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit) - vt.SPS = info.SequenceParameterSetNALUnit - vt.PPS = info.PictureParameterSetNALUnit - } - vt.RtmpTag = msg.Body - nalulenSize := int(info.LengthSizeMinusOne&3 + 1) - rec_video = func(msg *Chunk) { - nalus := msg.Body[5:] - if msg.Timestamp == 0xffffff { - ts_video += msg.ExtendTimestamp - } else { - ts_video += msg.Timestamp // 绝对时间戳 - } - for len(nalus) > nalulenSize { - nalulen := 0 - for i := 0; i < nalulenSize; i++ { - nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1)) - } - vt.Push(ts_video, nalus[nalulenSize:nalulen+nalulenSize]) - nalus = nalus[nalulen+nalulenSize:] - } - } - close(vt.WaitFirst) -} -``` + 在填充数据之前,需要获取到SPS和PPS,然后设置好,因为订阅者需要先发送这个数据 然后通过Track到Push函数将数据填充到RingBuffer里面去 # 订阅插件如何订阅流 ```go -subscriber := engine.Subscriber{ - Type: "RTMP", - ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID), -} -if err = subscriber.Subscribe(streamPath); err == nil { - streams[nc.streamID] = &subscriber - err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize)) - err = nc.SendMessage(SEND_STREAM_IS_RECORDED_MESSAGE, nil) - err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) - err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status)) - err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status)) - vt, at := subscriber.GetVideoTrack("h264"), subscriber.OriginAudioTrack - if vt != nil { - var lastVideoTime uint32 - err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag}) - subscriber.OnVideo = func(pack engine.VideoPack) { - if lastVideoTime == 0 { - lastVideoTime = pack.Timestamp - } - t := pack.Timestamp - lastVideoTime - lastVideoTime = pack.Timestamp - payload := codec.Nalu2RTMPTag(pack.Payload) - defer utils.RecycleSlice(payload) - err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) - } +sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()} +if err := sub.Subscribe(stringPath); err == nil { + vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack() + var buffer bytes.Buffer + if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil { + return + } + if vt != nil { + codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload) + sub.OnVideo = func(ts uint32, pack *VideoPack) { + codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload) } - if at != nil { - var lastAudioTime uint32 - var aac byte - if at.SoundFormat == 10 { - aac = at.RtmpTag[0] - err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag}) - } - subscriber.OnAudio = func(pack engine.AudioPack) { - if lastAudioTime == 0 { - lastAudioTime = pack.Timestamp - } - t := pack.Timestamp - lastAudioTime - lastAudioTime = pack.Timestamp - payload := codec.Audio2RTMPTag(aac, pack.Payload) - defer utils.RecycleSlice(payload) - err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload}) - } + } + if at != nil { + if at.CodecID == 10 { + codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData) } - go subscriber.Play(at, vt) + sub.OnAudio = func(ts uint32, pack *AudioPack) { + codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload) + } + } + sub.Play(at, vt) } ``` - 在发送数据前,需要先发送音视频的序列帧 \ No newline at end of file