更新ReadMe

This commit is contained in:
dexter
2021-12-01 18:57:36 +08:00
parent b05653364b
commit 70daa477fa

151
README.md
View File

@@ -2,9 +2,22 @@
该项目为m7s的引擎部分该部分逻辑是流媒体服务器的核心转发逻辑。仅包含最基础的功能不含任何网络协议部分但包含了一个插件的引入机制其他功能均由插件实现
# 引擎配置
```toml
[Engine]
EnableAudio = true
EnableVideo = true
# 发布流默认过期时间单位秒
PublishTimeout = 60
# 自动关闭触发后延迟的秒数(期间内如果有新的订阅则取消触发关闭)
AutoCloseDelay = 10
# 启用RTP包乱序重排
RTPReorder = false
```
# 引擎的基本功能
- 引擎初始化会加载配置文件并逐个调用插件的Run函数
- 具有发布功能的插件,会通过GetStream函数创建一个流Stream对象这个Stream对象随后可以被订阅
- 具有发布功能的插件,新建一个Stream对象这个Stream对象随后可以被订阅
- Stream对象中含有两个列表一个是VideoTracks一个是AudioTracks用来存放视频数据和音频数据
- 每一个VideoTrack或者AudioTrack中包含一个RingBuffer用来存储发布者提供的数据同时提供订阅者访问。
- 具有订阅功能的插件会通过GetStream函数获取到一个流然后选择VideoTracks、AudioTracks里面的RingBuffer进行连续的读取
@@ -13,104 +26,70 @@
以rtmp协议为例子
```go
if pub := new(engine.Publisher); pub.Publish(streamPath) {
pub.Type = "RTMP"
stream = pub.Stream
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
stream = &engine.Stream{Type: "RTMP", StreamPath: streamPath}
if stream.Publish() {
absTs := make(map[uint32]uint32)
vt := stream.NewVideoTrack(0)
at := stream.NewAudioTrack(0)
rec_audio = func(msg *Chunk) {
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
at.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
}
rec_video = func(msg *Chunk) {
if msg.ChunkType == 0 {
absTs[msg.ChunkStreamID] = 0
}
if msg.Timestamp == 0xffffff {
absTs[msg.ChunkStreamID] += msg.ExtendTimestamp
} else {
absTs[msg.ChunkStreamID] += msg.Timestamp
}
vt.PushByteStream(absTs[msg.ChunkStreamID], msg.Body)
}
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
} else {
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
err = nc.SendMessage(SEND_PUBLISH_RESPONSE_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_BadName, Level_Error))
}
```
默认会创建一个VideoTrack和一个AudioTrack
当我们接收到数据的时候就可以朝里面填充物数据了
```go
rec_video = func(msg *Chunk) {
// 等待AVC序列帧
if msg.Body[1] != 0 {
return
}
vt := stream.VideoTracks[0]
var ts_video uint32
var info codec.AVCDecoderConfigurationRecord
//0:codec,1:IsAVCSequence,2~4:compositionTime
if _, err := info.Unmarshal(msg.Body[5:]); err == nil {
vt.SPSInfo, err = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.SPS = info.SequenceParameterSetNALUnit
vt.PPS = info.PictureParameterSetNALUnit
}
vt.RtmpTag = msg.Body
nalulenSize := int(info.LengthSizeMinusOne&3 + 1)
rec_video = func(msg *Chunk) {
nalus := msg.Body[5:]
if msg.Timestamp == 0xffffff {
ts_video += msg.ExtendTimestamp
} else {
ts_video += msg.Timestamp // 绝对时间戳
}
for len(nalus) > nalulenSize {
nalulen := 0
for i := 0; i < nalulenSize; i++ {
nalulen += int(nalus[i]) << (8 * (nalulenSize - i - 1))
}
vt.Push(ts_video, nalus[nalulenSize:nalulen+nalulenSize])
nalus = nalus[nalulen+nalulenSize:]
}
}
close(vt.WaitFirst)
}
```
在填充数据之前需要获取到SPS和PPS然后设置好因为订阅者需要先发送这个数据
然后通过Track到Push函数将数据填充到RingBuffer里面去
# 订阅插件如何订阅流
```go
subscriber := engine.Subscriber{
Type: "RTMP",
ID: fmt.Sprintf("%s|%d", conn.RemoteAddr().String(), nc.streamID),
}
if err = subscriber.Subscribe(streamPath); err == nil {
streams[nc.streamID] = &subscriber
err = nc.SendMessage(SEND_CHUNK_SIZE_MESSAGE, uint32(nc.writeChunkSize))
err = nc.SendMessage(SEND_STREAM_IS_RECORDED_MESSAGE, nil)
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Reset, Level_Status))
err = nc.SendMessage(SEND_PLAY_RESPONSE_MESSAGE, newPlayResponseMessageData(nc.streamID, NetStream_Play_Start, Level_Status))
vt, at := subscriber.GetVideoTrack("h264"), subscriber.OriginAudioTrack
if vt != nil {
var lastVideoTime uint32
err = nc.SendMessage(SEND_FULL_VDIEO_MESSAGE, &AVPack{Payload: vt.RtmpTag})
subscriber.OnVideo = func(pack engine.VideoPack) {
if lastVideoTime == 0 {
lastVideoTime = pack.Timestamp
}
t := pack.Timestamp - lastVideoTime
lastVideoTime = pack.Timestamp
payload := codec.Nalu2RTMPTag(pack.Payload)
defer utils.RecycleSlice(payload)
err = nc.SendMessage(SEND_VIDEO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
}
sub := Subscriber{ID: r.RemoteAddr, Type: "FLV", Ctx2: r.Context()}
if err := sub.Subscribe(stringPath); err == nil {
vt, at := sub.WaitVideoTrack(), sub.WaitAudioTrack()
var buffer bytes.Buffer
if _, err := amf.WriteString(&buffer, "onMetaData"); err != nil {
return
}
if vt != nil {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, 0, vt.ExtraData.Payload)
sub.OnVideo = func(ts uint32, pack *VideoPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_VIDEO, ts, pack.Payload)
}
if at != nil {
var lastAudioTime uint32
var aac byte
if at.SoundFormat == 10 {
aac = at.RtmpTag[0]
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, &AVPack{Payload: at.RtmpTag})
}
subscriber.OnAudio = func(pack engine.AudioPack) {
if lastAudioTime == 0 {
lastAudioTime = pack.Timestamp
}
t := pack.Timestamp - lastAudioTime
lastAudioTime = pack.Timestamp
payload := codec.Audio2RTMPTag(aac, pack.Payload)
defer utils.RecycleSlice(payload)
err = nc.SendMessage(SEND_AUDIO_MESSAGE, &AVPack{Timestamp: t, Payload: payload})
}
}
if at != nil {
if at.CodecID == 10 {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, 0, at.ExtraData)
}
go subscriber.Play(at, vt)
sub.OnAudio = func(ts uint32, pack *AudioPack) {
codec.WriteFLVTag(w, codec.FLV_TAG_TYPE_AUDIO, ts, pack.Payload)
}
}
sub.Play(at, vt)
}
```
- 在发送数据前,需要先发送音视频的序列帧