开发提交

This commit is contained in:
langhuihui
2021-01-11 23:08:15 +08:00
parent 3db1a0c435
commit ca016d834b
21 changed files with 374 additions and 863 deletions

369
stream.go
View File

@@ -1,369 +0,0 @@
package engine
import (
"bytes"
"context"
"log"
"sync"
"time"
. "github.com/Monibuca/engine/v2/avformat"
. "github.com/logrusorgru/aurora"
)
var streamCollection Collection
// Collection 对sync.Map的包装
type Collection struct {
sync.Map
}
//FindStream 根据流路径查找流
func FindStream(streamPath string) *Stream {
if s, ok := streamCollection.Load(streamPath); ok {
return s.(*Stream)
}
return nil
}
//GetStream 根据流路径获取流,如果不存在则创建一个新的
func GetStream(streamPath string) (result *Stream) {
item, loaded := streamCollection.LoadOrStore(streamPath, &Stream{
Subscribers: make(map[string]*Subscriber),
Control: make(chan interface{}),
AVRing: NewRing(config.RingSize),
StreamInfo: StreamInfo{
StreamPath: streamPath,
SubscriberInfo: make([]*SubscriberInfo, 0),
HasVideo: true,
HasAudio: true,
EnableAudio: &config.EnableAudio,
EnableVideo: &config.EnableVideo,
},
WaitPub: make(chan struct{}),
})
result = item.(*Stream)
if !loaded {
Summary.Streams = append(Summary.Streams, &result.StreamInfo)
result.Context, result.Cancel = context.WithCancel(context.Background())
if config.EnableVideo {
result.EnableVideo = &result.HasVideo
}
if config.EnableAudio {
result.EnableAudio = &result.HasAudio
}
go result.Run()
}
return
}
// Stream 流定义
type Stream struct {
context.Context
*Publisher
StreamInfo //可序列化,供后台查看的数据
Control chan interface{}
Cancel context.CancelFunc
Subscribers map[string]*Subscriber // 订阅者
VideoTag *AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen *Ring //最近的关键帧位置,首屏渲染
AVRing *Ring //数据环
WaitPub chan struct{} //用于订阅和等待发布者
UseTimestamp bool //是否采用数据包中的时间戳
SPS []byte
PPS []byte
}
// StreamInfo 流可序列化信息,用于控制台显示
type StreamInfo struct {
StreamPath string
StartTime time.Time
SubscriberInfo []*SubscriberInfo
Type string
VideoInfo struct {
PacketCount int
CodecID byte
SPSInfo SPSInfo
BPS int
lastIndex int
GOP int //关键帧间隔
}
AudioInfo struct {
PacketCount int
SoundFormat byte //4bit
SoundRate int //2bit
SoundSize byte //1bit
SoundType byte //1bit
lastIndex int
BPS int
}
HasAudio bool
HasVideo bool
EnableVideo *bool
EnableAudio *bool
}
// UnSubscribeCmd 取消订阅命令
type UnSubscribeCmd struct {
*Subscriber
}
// SubscribeCmd 订阅流命令
type SubscribeCmd struct {
*Subscriber
}
// ChangeStreamCmd 切换流命令
type ChangeStreamCmd struct {
*Subscriber
NewStream *Stream
}
func (r *Stream) onClosed() {
Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
streamCollection.Delete(r.StreamPath)
for i, val := range Summary.Streams {
if val == &r.StreamInfo {
Summary.Streams = append(Summary.Streams[:i], Summary.Streams[i+1:]...)
break
}
}
OnStreamClosedHooks.Trigger(r)
}
//Subscribe 订阅流
func (r *Stream) Subscribe(s *Subscriber) {
s.Stream = r
if r.Err() == nil {
s.SubscribeTime = time.Now()
Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.Context, s.Cancel = context.WithCancel(r)
s.Control <- &SubscribeCmd{s}
}
}
//UnSubscribe 取消订阅流
func (r *Stream) UnSubscribe(s *Subscriber) {
if r.Err() == nil {
r.Control <- &UnSubscribeCmd{s}
}
}
// Run 流运行
func (r *Stream) Run() {
Print(Green("Stream create:"), BrightCyan(r.StreamPath))
defer r.onClosed()
for {
select {
case <-r.Done():
return
case s := <-r.Control:
switch v := s.(type) {
case *UnSubscribeCmd:
if _, ok := r.Subscribers[v.ID]; ok {
delete(r.Subscribers, v.ID)
for i, val := range r.SubscriberInfo {
if val == &v.SubscriberInfo {
r.SubscriberInfo = append(r.SubscriberInfo[:i], r.SubscriberInfo[i+1:]...)
break
}
}
OnUnSubscribeHooks.Trigger(v.Subscriber)
Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo))))
if len(r.SubscriberInfo) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) {
r.Cancel()
}
}
case *SubscribeCmd:
//防止重复添加
if _, ok := r.Subscribers[v.ID]; !ok {
r.Subscribers[v.ID] = v.Subscriber
r.SubscriberInfo = append(r.SubscriberInfo, &v.SubscriberInfo)
Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo))))
OnSubscribeHooks.Trigger(v.Subscriber)
}
case *ChangeStreamCmd:
if _, ok := v.NewStream.Subscribers[v.ID]; !ok {
delete(r.Subscribers, v.ID)
v.NewStream.Subscribe(v.Subscriber)
if len(r.SubscriberInfo) == 0 && r.Publisher == nil {
r.Cancel()
}
}
}
}
}
}
// GetBuffer 获取用于写入的缓冲区
func (r *Stream) GetBuffer() *bytes.Buffer {
return r.AVRing.GetBuffer()
}
func (r *Stream) WriteASC(asc []byte) {
if r.AudioTag == nil {
r.AudioTag = NewAVPacket(FLV_TAG_TYPE_AUDIO)
r.AudioTag.IsSequence = true
r.AudioTag.Payload = append(append(r.AudioTag.Payload, 0xAF, 0), asc...)
} else {
r.AudioTag.Payload = append(r.AudioTag.Payload[:2], asc...)
}
config1 := asc[0]
config2 := asc[1]
r.AudioInfo.SoundFormat = 10
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
r.AudioInfo.SoundRate = SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]
r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道
//frameLengthFlag = (config2 >> 2) & 0x01
//dependsOnCoreCoder = (config2 >> 1) & 0x01
//extensionFlag = config2 & 0x01
}
// PushAudio 来自发布者推送的音频
func (r *Stream) PushAudio(timestamp uint32, payload []byte) {
audio := r.AVRing
payloadLen := len(payload)
audio.Type = FLV_TAG_TYPE_AUDIO
audio.Timestamp = timestamp
audio.Payload = payload
audio.IsKeyFrame = false
audio.IsSequence = false
if payloadLen < 4 {
return
}
if payload[0] == 0xFF && (payload[1]&0xF0) == 0xF0 {
//将ADTS转换成ASC
r.AudioInfo.SoundFormat = 10
r.AudioInfo.SoundRate = SamplingFrequencies[(payload[2]&0x3c)>>2]
r.AudioInfo.SoundType = ((payload[2] & 0x1) << 2) | ((payload[3] & 0xc0) >> 6)
r.AudioTag = audio.ADTS2ASC()
} else if r.AudioTag == nil && r.AudioInfo.SoundRate == 0 {
audio.IsSequence = true
// if payloadLen < 5 {
// return
// }
r.AudioTag = audio.AVPacket.Clone()
tmp := payload[0] // 第一个字节保存着音频的相关信息
if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息
//0 = AAC sequence header1 = AAC raw。
if aacPacketType := payload[1]; aacPacketType == 0 {
config1 := payload[2]
config2 := payload[3]
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
// 2 AAC LC ISO/IEC 14496-3 subpart 4
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
r.AudioInfo.SoundRate = SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)]
r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道
//frameLengthFlag = (config2 >> 2) & 0x01
//dependsOnCoreCoder = (config2 >> 1) & 0x01
//extensionFlag = config2 & 0x01
}
return
} else {
r.AudioInfo.SoundRate = SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz
r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道1立体声
}
}
if !r.UseTimestamp {
audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
}
lastTimestamp := audio.GetAt(r.AudioInfo.lastIndex).Timestamp
if lastTimestamp > 0 && lastTimestamp != audio.Timestamp {
r.AudioInfo.BPS = payloadLen * 1000 / int(audio.Timestamp-lastTimestamp)
}
r.AudioInfo.PacketCount++
audio.Number = r.AudioInfo.PacketCount
r.AudioInfo.lastIndex = audio.Index
audio.NextW()
if r.AudioInfo.PacketCount == 1 && (!*r.EnableVideo) {
close(r.WaitPub)
}
}
func (r *Stream) setH264Info(video *Ring) {
r.VideoTag = video.AVPacket.Clone()
if r.VideoInfo.CodecID != 7 {
return
}
var info AVCDecoderConfigurationRecord
//0:codec,1:IsAVCSequence,2~4:compositionTime
if _, err := info.Unmarshal(video.Payload[5:]); err == nil {
r.VideoInfo.SPSInfo, err = ParseSPS(info.SequenceParameterSetNALUnit)
r.SPS = info.SequenceParameterSetNALUnit
r.PPS = info.PictureParameterSetNALUnit
}
}
func (r *Stream) WriteSPS(sps []byte) {
lenSPS := len(sps)
r.SPS = sps
if r.VideoTag == nil {
r.VideoTag = NewAVPacket(FLV_TAG_TYPE_VIDEO)
r.VideoTag.IsSequence = true
r.VideoTag.IsKeyFrame = true
r.VideoTag.Payload = append(r.VideoTag.Payload, RTMP_AVC_HEAD...)
}
r.VideoInfo.SPSInfo, _ = ParseSPS(sps)
copy(r.VideoTag.Payload[6:], sps[1:4])
r.VideoTag.Payload = append(append(r.VideoTag.Payload[:10], 0xE1, byte(lenSPS>>8), byte(lenSPS)), sps...)
}
func (r *Stream) WritePPS(pps []byte) {
lenPPS := len(pps)
r.PPS = pps
r.VideoTag.Payload = append(append(r.VideoTag.Payload, 0x01, byte(lenPPS>>8), byte(lenPPS)), pps...)
}
// PushVideo 来自发布者推送的视频
func (r *Stream) PushVideo(timestamp uint32, payload []byte) {
payloadLen := len(payload)
if payloadLen < 3 {
return
}
video := r.AVRing
video.Type = FLV_TAG_TYPE_VIDEO
video.Timestamp = timestamp
video.Payload = payload
videoFrameType := payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
r.VideoInfo.CodecID = payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC...
video.IsSequence = videoFrameType == 1 && payload[1] == 0
video.IsKeyFrame = videoFrameType == 1 || videoFrameType == 4
r.VideoInfo.PacketCount++
video.Number = r.VideoInfo.PacketCount
if r.VideoTag == nil {
if video.IsSequence {
r.setH264Info(video)
} else {
log.Println("no AVCSequence")
}
} else {
//更换AVCSequence
if video.IsSequence {
r.setH264Info(video)
}
if video.IsKeyFrame {
if r.FirstScreen == nil {
defer close(r.WaitPub)
r.FirstScreen = video.Clone()
} else {
oldNumber := r.FirstScreen.Number
r.FirstScreen.GoTo(video.Index)
r.VideoInfo.GOP = r.FirstScreen.Number - oldNumber
}
}
if !r.UseTimestamp {
video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
}
lastTimestamp := video.GetAt(r.VideoInfo.lastIndex).Timestamp
if lastTimestamp > 0 && lastTimestamp != video.Timestamp {
r.VideoInfo.BPS = payloadLen * 1000 / int(video.Timestamp-lastTimestamp)
}
r.VideoInfo.lastIndex = video.Index
video.NextW()
}
}