From c2ff0bbcaef3b0bcc83f66e4aeb30b572bc4e394 Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Sun, 14 Feb 2021 22:56:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=A7=E6=94=B9=E7=89=88?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- audio_track.go | 40 +----- base_track.go | 58 +------- go.mod | 2 +- go.sum | 4 +- audio_track.go2 => go2/audio_track.go2 | 0 base_track.go2 => go2/base_track.go2 | 0 hook.go2 => go2/hook.go2 | 0 video_track.go2 => go2/video_track.go2 | 0 hook.go | 13 +- main.go | 16 +-- publisher.go | 20 ++- stream.go | 61 ++++----- subscriber.go | 176 ++++++++++++++++++++++--- video_track.go | 82 ++++-------- 14 files changed, 250 insertions(+), 222 deletions(-) rename audio_track.go2 => go2/audio_track.go2 (100%) rename base_track.go2 => go2/base_track.go2 (100%) rename hook.go2 => go2/hook.go2 (100%) rename video_track.go2 => go2/video_track.go2 (100%) diff --git a/audio_track.go b/audio_track.go index 8ecf01b..057c23c 100644 --- a/audio_track.go +++ b/audio_track.go @@ -1,16 +1,12 @@ package engine -import ( - "context" -) - type AudioPack struct { Timestamp uint32 Payload []byte SequenceNumber uint16 } type AudioTrack struct { - Track_Audio + Track_Audio SoundFormat byte //4bit SoundRate int //2bit SoundSize byte //1bit @@ -30,33 +26,9 @@ func (at *AudioTrack) Push(timestamp uint32, payload []byte) { at.Track_Audio.GetBPS(payloadLen) audio.NextW() } -func (at *AudioTrack) Play(ctx context.Context, callback func(AudioPack)) { - ring := at.Buffer.SubRing(at.Buffer.Index) - ring.Current.Wait() - droped := 0 - var action, send func() - drop := func() { - if at.Buffer.Index-ring.Index < 10 { - action = send - } else { - droped++ - } - } - send = func() { - callback(ring.Current.AudioPack) - //s.BufferLength = pIndex - ring.Index - //s.Delay = s.AVRing.Timestamp - packet.Timestamp - if at.Buffer.Index-ring.Index > 128 { - action = drop - } - } - for action = send; ; ring.NextR() { - select { - case <-ctx.Done(): - return - default: - action() - } - } -} +func NewAudioTrack() *AudioTrack { + var result AudioTrack + result.Buffer = NewRing_Audio() + return &result +} \ No newline at end of file diff --git a/base_track.go b/base_track.go index cf89dc2..de74173 100644 --- a/base_track.go +++ b/base_track.go @@ -1,11 +1,5 @@ package engine -import ( - "context" - - "github.com/Monibuca/utils/v3/codec" -) - type Track interface { Push(uint32, []byte) } @@ -39,54 +33,4 @@ func (t *Track_Video) GetBPS(payloadLen int) { t.BPS = payloadLen * 1000 / int(t.Buffer.Current.Timestamp-lastTimestamp) } t.lastIndex = t.Buffer.Index -} - -type TrackCP struct { - Audio *AudioTrack - Video *VideoTrack -} - -func (tcp *TrackCP) Play(ctx context.Context, cba func(AudioPack), cbv func(VideoPack)) { - select { - case <-tcp.Video.WaitFirst: - case <-ctx.Done(): - return - } - vr := tcp.Video.Buffer.SubRing(tcp.Video.FirstScreen) - ar := tcp.Audio.Buffer.SubRing(tcp.Audio.Buffer.Index) - vr.Current.Wait() - ar.Current.Wait() - dropping := false - send_audio := func() { - cba(ar.Current.AudioPack) - if tcp.Audio.Buffer.Index-ar.Index > 128 { - dropping = true - } - } - send_video := func() { - cbv(vr.Current.VideoPack) - if tcp.Video.Buffer.Index-vr.Index > 128 { - dropping = true - } - } - for { - select { - case <-ctx.Done(): - return - default: - if ar.Current.Timestamp > vr.Current.Timestamp { - if !dropping { - send_video() - } else if vr.Current.NalType == codec.NALU_IDR_Picture { - dropping = false - } - vr.NextR() - } else { - if !dropping { - send_audio() - } - ar.NextR() - } - } - } -} +} \ No newline at end of file diff --git a/go.mod b/go.mod index b5b1061..680a750 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.13 require ( github.com/BurntSushi/toml v0.3.1 - github.com/Monibuca/utils/v3 v3.0.0-alpha3 + github.com/Monibuca/utils/v3 v3.0.0-alpha4 github.com/logrusorgru/aurora v2.0.3+incompatible github.com/pkg/errors v0.9.1 ) diff --git a/go.sum b/go.sum index 466b4fa..fd41663 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/Monibuca/utils/v3 v3.0.0-alpha3 h1:n4Sq7mS1Iz8oBj2BcV4sXgKbZgix0fFLvjAfXYoiXl0= -github.com/Monibuca/utils/v3 v3.0.0-alpha3/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= +github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04= +github.com/Monibuca/utils/v3 v3.0.0-alpha4/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= diff --git a/audio_track.go2 b/go2/audio_track.go2 similarity index 100% rename from audio_track.go2 rename to go2/audio_track.go2 diff --git a/base_track.go2 b/go2/base_track.go2 similarity index 100% rename from base_track.go2 rename to go2/base_track.go2 diff --git a/hook.go2 b/go2/hook.go2 similarity index 100% rename from hook.go2 rename to go2/hook.go2 diff --git a/video_track.go2 b/go2/video_track.go2 similarity index 100% rename from video_track.go2 rename to go2/video_track.go2 diff --git a/hook.go b/hook.go index 1ed1cd0..07fa60e 100644 --- a/hook.go +++ b/hook.go @@ -11,12 +11,17 @@ type Hook struct { Name string Payload interface{} } +type TransCodeReq struct { + *Subscriber + RequestCodec string +} const ( - HOOK_SUBSCRIBE = "Subscribe" - HOOK_UNSUBSCRIBE = "UnSubscibe" - HOOK_STREAMCLOSE = "StreamClose" - HOOK_PUBLISH = "Publish" + HOOK_SUBSCRIBE = "Subscribe" + HOOK_UNSUBSCRIBE = "UnSubscibe" + HOOK_STREAMCLOSE = "StreamClose" + HOOK_PUBLISH = "Publish" + HOOK_REQUEST_TRANSAUDIO = "RequestTransAudio" ) var Hooks = NewRing_Hook() diff --git a/main.go b/main.go index e188da1..8a8f706 100644 --- a/main.go +++ b/main.go @@ -21,15 +21,15 @@ const Version = "3.0.1" var ( config = &struct { - EnableWaitStream bool - EnableAudio bool - EnableVideo bool - PublishTimeout time.Duration - }{true, true, true, time.Minute} + EnableAudio bool + EnableVideo bool + PublishTimeout time.Duration + }{true, true, time.Minute} // ConfigRaw 配置信息的原始数据 - ConfigRaw []byte - StartTime time.Time //启动时间 - Plugins = make(map[string]*PluginConfig) // Plugins 所有的插件配置 + ConfigRaw []byte + StartTime time.Time //启动时间 + Plugins = make(map[string]*PluginConfig) // Plugins 所有的插件配置 + HasTranscoder bool ) //PluginConfig 插件配置定义 diff --git a/publisher.go b/publisher.go index 40559f4..8c93f22 100644 --- a/publisher.go +++ b/publisher.go @@ -8,10 +8,12 @@ import ( // Publisher 发布者实体定义 type Publisher struct { context.Context - cancel context.CancelFunc - AutoUnPublish bool // 当无人订阅时自动停止发布 - *Stream `json:"-"` - Type string //类型,用来区分不同的发布者 + cancel context.CancelFunc + AutoUnPublish bool // 当无人订阅时自动停止发布 + *Stream `json:"-"` + Type string //类型,用来区分不同的发布者 + OriginVideoTrack *VideoTrack //原始视频轨 + OriginAudioTrack *AudioTrack //原始音频轨 } // Close 关闭发布者 @@ -23,11 +25,17 @@ func (p *Publisher) Close() { // Dispose 释放RingBuffer的锁,防止订阅者一直阻塞读取 func (p *Publisher) Dispose() { + p.OriginVideoTrack.Buffer.Current.Done() + p.OriginAudioTrack.Buffer.Current.Done() for _, vt := range p.Stream.VideoTracks { - vt.Buffer.Current.Done() + if vt != p.OriginVideoTrack { + vt.Buffer.Current.Done() + } } for _, at := range p.Stream.AudioTracks { - at.Buffer.Current.Done() + if at != p.OriginAudioTrack { + at.Buffer.Current.Done() + } } } diff --git a/stream.go b/stream.go index 7282e5d..0478f72 100644 --- a/stream.go +++ b/stream.go @@ -24,22 +24,12 @@ func FindStream(streamPath string) *Stream { func GetStream(streamPath string) (result *Stream) { item, loaded := Streams.LoadOrStore(streamPath, &Stream{ StreamPath: streamPath, - HasVideo: true, - HasAudio: true, - EnableAudio: &config.EnableAudio, - EnableVideo: &config.EnableVideo, + AudioTracks: make(map[string]*AudioTrack), + VideoTracks: make(map[string]*VideoTrack), }) result = item.(*Stream) if !loaded { result.Context, result.cancel = context.WithCancel(context.Background()) - if config.EnableVideo { - result.EnableVideo = &result.HasVideo - } - if config.EnableAudio { - result.EnableAudio = &result.HasAudio - } - result.AddVideoTrack() - result.AddAudioTrack() utils.Print(Green("Stream create:"), BrightCyan(streamPath)) } return @@ -53,28 +43,33 @@ type Stream struct { StartTime time.Time //流的创建时间 *Publisher Subscribers []*Subscriber // 订阅者 - VideoTracks []*VideoTrack - AudioTracks []*AudioTrack - HasAudio bool - HasVideo bool - EnableVideo *bool - EnableAudio *bool + VideoTracks map[string]*VideoTrack + AudioTracks map[string]*AudioTrack subscribeMutex sync.Mutex + audioRW sync.RWMutex + videoRW sync.RWMutex } -func (r *Stream) AddVideoTrack() (vt *VideoTrack) { - vt = new(VideoTrack) - vt.WaitFirst = make(chan struct{}) - vt.Buffer = NewRing_Video() - r.VideoTracks = append(r.VideoTracks, vt) - return +func (r *Stream) AddVideoTrack(codec string, vt *VideoTrack) *VideoTrack { + if vt == nil { + vt = NewVideoTrack() + } + r.videoRW.Lock() + r.VideoTracks[codec] = vt + r.videoRW.Unlock() + return vt } -func (r *Stream) AddAudioTrack() (at *AudioTrack) { - at = new(AudioTrack) - at.Buffer = NewRing_Audio() - r.AudioTracks = append(r.AudioTracks, at) - return + +func (r *Stream) AddAudioTrack(codec string, at *AudioTrack) *AudioTrack { + if at == nil { + at = NewAudioTrack() + } + r.audioRW.Lock() + r.AudioTracks[codec] = at + r.audioRW.Unlock() + return at } + func (r *Stream) Close() { r.cancel() utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) @@ -86,7 +81,7 @@ func (r *Stream) Close() { func (r *Stream) Subscribe(s *Subscriber) { if s.Stream = r; r.Err() == nil { s.SubscribeTime = time.Now() - utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(r.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) + utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) s.Context, s.cancel = context.WithCancel(r) r.subscribeMutex.Lock() r.Subscribers = append(r.Subscribers, s) @@ -117,9 +112,3 @@ func DeleteSliceItem_Subscriber(slice []*Subscriber, item *Subscriber) []*Subscr } return slice } -func (r *Stream) PushVideo(ts uint32, payload []byte) { - r.VideoTracks[0].Push(ts, payload) -} -func (r *Stream) PushAudio(ts uint32, payload []byte) { - r.AudioTracks[0].Push(ts, payload) -} diff --git a/subscriber.go b/subscriber.go index e56c9fd..b2cfadd 100644 --- a/subscriber.go +++ b/subscriber.go @@ -2,14 +2,16 @@ package engine import ( "context" - "encoding/json" "time" + "github.com/Monibuca/utils/v3/codec" "github.com/pkg/errors" ) -// SubscriberInfo 订阅者可序列化信息,用于控制台输出 -type SubscriberInfo struct { +// Subscriber 订阅者实体定义 +type Subscriber struct { + context.Context + *Stream `json:"-"` ID string TotalDrop int //总丢帧 TotalPacket int @@ -17,19 +19,12 @@ type SubscriberInfo struct { BufferLength int Delay uint32 SubscribeTime time.Time -} - -// Subscriber 订阅者实体定义 -type Subscriber struct { - context.Context - *Stream `json:"-"` - SubscriberInfo - cancel context.CancelFunc - Sign string - OffsetTime uint32 - startTime uint32 - vtIndex int //第几个视频轨 - atIndex int //第几个音频轨 + cancel context.CancelFunc + Sign string + OffsetTime uint32 + startTime uint32 + OnAudio func(pack AudioPack) `json:"-"` + OnVideo func(pack VideoPack) `json:"-"` } // IsClosed 检查订阅者是否已经关闭 @@ -44,14 +39,41 @@ func (s *Subscriber) Close() { s.cancel() } } - -func (s *Subscriber) MarshalJSON() ([]byte, error) { - return json.Marshal(s.SubscriberInfo) +func (r *Subscriber) GetVideoTrack(codec string) *VideoTrack { + if !config.EnableVideo { + return nil + } + r.videoRW.RLock() + defer r.videoRW.RUnlock() + return r.VideoTracks[codec] +} +func (s *Subscriber) GetAudioTrack(codecs ...string) (at *AudioTrack) { + if !config.EnableAudio { + return nil + } + if HasTranscoder { + s.audioRW.Lock() + defer s.audioRW.Unlock() + } else { + s.audioRW.RLock() + defer s.audioRW.RUnlock() + } + for _, codec := range codecs { + if at, ok := s.AudioTracks[codec]; ok { + return at + } + } + if HasTranscoder { + at = s.AddAudioTrack(codecs[0], nil) + at.SoundFormat = codec.Codec2SoundFormat[codecs[0]] + TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}}) + } + return } -//Subscribe 开始订阅 +//Subscribe 开始订阅 将Subscriber与Stream关联 func (s *Subscriber) Subscribe(streamPath string) error { - if !config.EnableWaitStream && FindStream(streamPath) == nil { + if FindStream(streamPath) == nil { return errors.Errorf("Stream not found:%s", streamPath) } GetStream(streamPath).Subscribe(s) @@ -60,3 +82,115 @@ func (s *Subscriber) Subscribe(streamPath string) error { } return nil } + +//Play 开始播放 +func (s *Subscriber) Play(ctx context.Context, at *AudioTrack, vt *VideoTrack) { + defer s.Close() + if vt == nil && at == nil { + return + } + if vt == nil { + s.PlayAudio(ctx, at) + return + } else if at == nil { + s.PlayVideo(ctx, vt) + return + } + select { + case <-vt.WaitFirst: //等待获取到第一个关键帧 + case <-s.Context.Done(): + return + case <-ctx.Done(): //可能等不到关键帧就退出了 + return + } + vr := vt.Buffer.SubRing(vt.FirstScreen) //从关键帧开始读取,首屏秒开 + vr.Current.Wait() //等到RingBuffer可读 + ar := at.Buffer.SubRing(at.Buffer.Index) + ar.Current.Wait() + dropping := false //是否处于丢帧中 + send_audio := func() { + s.OnAudio(ar.Current.AudioPack) + if at.Buffer.Index-ar.Index > 128 { + dropping = true + } + } + send_video := func() { + s.OnVideo(vr.Current.VideoPack) + if vt.Buffer.Index-vr.Index > 128 { + dropping = true + } + } + for ctx.Err() == nil && s.Context.Err() == nil { + if ar.Current.Timestamp > vr.Current.Timestamp || ar.Current.Timestamp == 0 { + if !dropping { + send_video() + } else if vr.Current.NalType == codec.NALU_IDR_Picture { + dropping = false + } + vr.NextR() + } else { + if !dropping { + send_audio() + } + ar.NextR() + } + } +} +func (s *Subscriber) PlayAudio(ctx context.Context, at *AudioTrack) { + ring := at.Buffer.SubRing(at.Buffer.Index) + ring.Current.Wait() + droped := 0 + var action, send func() + drop := func() { + if at.Buffer.Index-ring.Index < 10 { + action = send + } else { + droped++ + } + } + send = func() { + s.OnAudio(ring.Current.AudioPack) + + //s.BufferLength = pIndex - ring.Index + //s.Delay = s.AVRing.Timestamp - packet.Timestamp + if at.Buffer.Index-ring.Index > 128 { + action = drop + } + } + for action = send; ctx.Err() == nil && s.Context.Err() == nil; ring.NextR() { + action() + } +} + +func (s *Subscriber) PlayVideo(ctx context.Context, vt *VideoTrack) { + select { + case <-vt.WaitFirst: + case <-s.Context.Done(): + return + case <-ctx.Done(): //可能等不到关键帧就退出了 + return + } + ring := vt.Buffer.SubRing(vt.FirstScreen) + ring.Current.Wait() + droped := 0 + var action, send func() + drop := func() { + if ring.Current.NalType == codec.NALU_IDR_Picture { + action = send + } else { + droped++ + } + } + send = func() { + s.OnVideo(ring.Current.VideoPack) + pIndex := vt.Buffer.Index + //s.BufferLength = pIndex - ring.Index + //s.Delay = s.AVRing.Timestamp - packet.Timestamp + if pIndex-ring.Index > 128 { + action = drop + } + } + for action = send; ctx.Err() == nil && s.Context.Err() == nil; ring.NextR() { + action() + } +} diff --git a/video_track.go b/video_track.go index e96e602..ff5b23a 100644 --- a/video_track.go +++ b/video_track.go @@ -1,7 +1,6 @@ package engine import ( - "context" "encoding/binary" "github.com/Monibuca/utils/v3" @@ -28,12 +27,22 @@ type VideoPack struct { type VideoTrack struct { FirstScreen byte //最近的关键帧位置,首屏渲染 Track_Video - SPS []byte - PPS []byte + SPS []byte `json:"-"` + PPS []byte `json:"-"` SPSInfo codec.SPSInfo - GOP byte //关键帧间隔 - RtmpTag []byte //rtmp需要先发送一个序列帧,包含SPS和PPS - WaitFirst chan struct{} + GOP byte //关键帧间隔 + RtmpTag []byte `json:"-"` //rtmp需要先发送一个序列帧,包含SPS和PPS + WaitFirst chan struct{} `json:"-"` + revIDR func() +} + +func NewVideoTrack() *VideoTrack { + result := &VideoTrack{ + WaitFirst: make(chan struct{}), + } + result.Buffer = NewRing_Video() + result.revIDR = result.firstRevIDR + return result } // Push 来自发布者推送的视频 @@ -79,18 +88,13 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) { vt.SPSInfo, _ = codec.ParseSPS(payload) case codec.NALU_PPS: vt.PPS = payload - + if vt.RtmpTag == nil { + vt.SetRtmpTag() + } case codec.NALU_Access_Unit_Delimiter: case codec.NALU_IDR_Picture: - if vt.RtmpTag == nil { - vt.FirstScreen = vbr.Index - vt.setRtmpTag() - close(vt.WaitFirst) - } else { - vt.GOP = vbr.Index - vt.FirstScreen - vt.FirstScreen = vbr.Index - } + vt.revIDR() fallthrough case codec.NALU_Non_IDR_Picture: video.Payload = payload @@ -102,8 +106,16 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) { utils.Printf("nalType not support yet:%d", video.NalType) } } - -func (vt *VideoTrack) setRtmpTag() { +func (vt *VideoTrack) firstRevIDR() { + vt.FirstScreen = vt.Buffer.Index + close(vt.WaitFirst) + vt.revIDR = vt.afterRevIDR +} +func (vt *VideoTrack) afterRevIDR() { + vt.GOP = vt.Buffer.Index - vt.FirstScreen + vt.FirstScreen = vt.Buffer.Index +} +func (vt *VideoTrack) SetRtmpTag() { lenSPS, lenPPS := len(vt.SPS), len(vt.PPS) vt.RtmpTag = append([]byte{}, codec.RTMP_AVC_HEAD...) copy(vt.RtmpTag[6:], vt.SPS[1:4]) @@ -111,39 +123,3 @@ func (vt *VideoTrack) setRtmpTag() { vt.RtmpTag = append(vt.RtmpTag, vt.SPS...) vt.RtmpTag = append(append(vt.RtmpTag, 0x01, byte(lenPPS>>8), byte(lenPPS)), vt.PPS...) } - -func (vt *VideoTrack) Play(ctx context.Context, callback func(VideoPack)) { - select { - case <-vt.WaitFirst: - case <-ctx.Done(): - return - } - ring := vt.Buffer.SubRing(vt.FirstScreen) - ring.Current.Wait() - droped := 0 - var action, send func() - drop := func() { - if ring.Current.NalType == codec.NALU_IDR_Picture { - action = send - } else { - droped++ - } - } - send = func() { - callback(ring.Current.VideoPack) - pIndex := vt.Buffer.Index - //s.BufferLength = pIndex - ring.Index - //s.Delay = s.AVRing.Timestamp - packet.Timestamp - if pIndex-ring.Index > 128 { - action = drop - } - } - for action = send; ; ring.NextR() { - select { - case <-ctx.Done(): - return - default: - action() - } - } -}