大改版

This commit is contained in:
langhuihui
2021-02-14 22:56:17 +08:00
parent 70706fc8da
commit c2ff0bbcae
14 changed files with 250 additions and 222 deletions

View File

@@ -1,9 +1,5 @@
package engine package engine
import (
"context"
)
type AudioPack struct { type AudioPack struct {
Timestamp uint32 Timestamp uint32
Payload []byte Payload []byte
@@ -30,33 +26,9 @@ func (at *AudioTrack) Push(timestamp uint32, payload []byte) {
at.Track_Audio.GetBPS(payloadLen) at.Track_Audio.GetBPS(payloadLen)
audio.NextW() audio.NextW()
} }
func (at *AudioTrack) Play(ctx context.Context, callback func(AudioPack)) {
ring := at.Buffer.SubRing(at.Buffer.Index)
ring.Current.Wait()
droped := 0
var action, send func()
drop := func() {
if at.Buffer.Index-ring.Index < 10 {
action = send
} else {
droped++
}
}
send = func() {
callback(ring.Current.AudioPack)
//s.BufferLength = pIndex - ring.Index func NewAudioTrack() *AudioTrack {
//s.Delay = s.AVRing.Timestamp - packet.Timestamp var result AudioTrack
if at.Buffer.Index-ring.Index > 128 { result.Buffer = NewRing_Audio()
action = drop return &result
}
}
for action = send; ; ring.NextR() {
select {
case <-ctx.Done():
return
default:
action()
}
}
} }

View File

@@ -1,11 +1,5 @@
package engine package engine
import (
"context"
"github.com/Monibuca/utils/v3/codec"
)
type Track interface { type Track interface {
Push(uint32, []byte) Push(uint32, []byte)
} }
@@ -40,53 +34,3 @@ func (t *Track_Video) GetBPS(payloadLen int) {
} }
t.lastIndex = t.Buffer.Index t.lastIndex = t.Buffer.Index
} }
type TrackCP struct {
Audio *AudioTrack
Video *VideoTrack
}
func (tcp *TrackCP) Play(ctx context.Context, cba func(AudioPack), cbv func(VideoPack)) {
select {
case <-tcp.Video.WaitFirst:
case <-ctx.Done():
return
}
vr := tcp.Video.Buffer.SubRing(tcp.Video.FirstScreen)
ar := tcp.Audio.Buffer.SubRing(tcp.Audio.Buffer.Index)
vr.Current.Wait()
ar.Current.Wait()
dropping := false
send_audio := func() {
cba(ar.Current.AudioPack)
if tcp.Audio.Buffer.Index-ar.Index > 128 {
dropping = true
}
}
send_video := func() {
cbv(vr.Current.VideoPack)
if tcp.Video.Buffer.Index-vr.Index > 128 {
dropping = true
}
}
for {
select {
case <-ctx.Done():
return
default:
if ar.Current.Timestamp > vr.Current.Timestamp {
if !dropping {
send_video()
} else if vr.Current.NalType == codec.NALU_IDR_Picture {
dropping = false
}
vr.NextR()
} else {
if !dropping {
send_audio()
}
ar.NextR()
}
}
}
}

2
go.mod
View File

@@ -4,7 +4,7 @@ go 1.13
require ( require (
github.com/BurntSushi/toml v0.3.1 github.com/BurntSushi/toml v0.3.1
github.com/Monibuca/utils/v3 v3.0.0-alpha3 github.com/Monibuca/utils/v3 v3.0.0-alpha4
github.com/logrusorgru/aurora v2.0.3+incompatible github.com/logrusorgru/aurora v2.0.3+incompatible
github.com/pkg/errors v0.9.1 github.com/pkg/errors v0.9.1
) )

4
go.sum
View File

@@ -1,7 +1,7 @@
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Monibuca/utils/v3 v3.0.0-alpha3 h1:n4Sq7mS1Iz8oBj2BcV4sXgKbZgix0fFLvjAfXYoiXl0= github.com/Monibuca/utils/v3 v3.0.0-alpha4 h1:pecYA89kWmtGOeY6R99d4T1epPJ1wc+jFrrJY13VD04=
github.com/Monibuca/utils/v3 v3.0.0-alpha3/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c= github.com/Monibuca/utils/v3 v3.0.0-alpha4/go.mod h1:3xYmhQbgAZBHLyIMteUCd1va+1z/xnd72B585mCaT3c=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0=
github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo=
github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg=

13
hook.go
View File

@@ -11,12 +11,17 @@ type Hook struct {
Name string Name string
Payload interface{} Payload interface{}
} }
type TransCodeReq struct {
*Subscriber
RequestCodec string
}
const ( const (
HOOK_SUBSCRIBE = "Subscribe" HOOK_SUBSCRIBE = "Subscribe"
HOOK_UNSUBSCRIBE = "UnSubscibe" HOOK_UNSUBSCRIBE = "UnSubscibe"
HOOK_STREAMCLOSE = "StreamClose" HOOK_STREAMCLOSE = "StreamClose"
HOOK_PUBLISH = "Publish" HOOK_PUBLISH = "Publish"
HOOK_REQUEST_TRANSAUDIO = "RequestTransAudio"
) )
var Hooks = NewRing_Hook() var Hooks = NewRing_Hook()

16
main.go
View File

@@ -21,15 +21,15 @@ const Version = "3.0.1"
var ( var (
config = &struct { config = &struct {
EnableWaitStream bool EnableAudio bool
EnableAudio bool EnableVideo bool
EnableVideo bool PublishTimeout time.Duration
PublishTimeout time.Duration }{true, true, time.Minute}
}{true, true, true, time.Minute}
// ConfigRaw 配置信息的原始数据 // ConfigRaw 配置信息的原始数据
ConfigRaw []byte ConfigRaw []byte
StartTime time.Time //启动时间 StartTime time.Time //启动时间
Plugins = make(map[string]*PluginConfig) // Plugins 所有的插件配置 Plugins = make(map[string]*PluginConfig) // Plugins 所有的插件配置
HasTranscoder bool
) )
//PluginConfig 插件配置定义 //PluginConfig 插件配置定义

View File

@@ -8,10 +8,12 @@ import (
// Publisher 发布者实体定义 // Publisher 发布者实体定义
type Publisher struct { type Publisher struct {
context.Context context.Context
cancel context.CancelFunc cancel context.CancelFunc
AutoUnPublish bool // 当无人订阅时自动停止发布 AutoUnPublish bool // 当无人订阅时自动停止发布
*Stream `json:"-"` *Stream `json:"-"`
Type string //类型,用来区分不同的发布者 Type string //类型,用来区分不同的发布者
OriginVideoTrack *VideoTrack //原始视频轨
OriginAudioTrack *AudioTrack //原始音频轨
} }
// Close 关闭发布者 // Close 关闭发布者
@@ -23,11 +25,17 @@ func (p *Publisher) Close() {
// Dispose 释放RingBuffer的锁防止订阅者一直阻塞读取 // Dispose 释放RingBuffer的锁防止订阅者一直阻塞读取
func (p *Publisher) Dispose() { func (p *Publisher) Dispose() {
p.OriginVideoTrack.Buffer.Current.Done()
p.OriginAudioTrack.Buffer.Current.Done()
for _, vt := range p.Stream.VideoTracks { for _, vt := range p.Stream.VideoTracks {
vt.Buffer.Current.Done() if vt != p.OriginVideoTrack {
vt.Buffer.Current.Done()
}
} }
for _, at := range p.Stream.AudioTracks { for _, at := range p.Stream.AudioTracks {
at.Buffer.Current.Done() if at != p.OriginAudioTrack {
at.Buffer.Current.Done()
}
} }
} }

View File

@@ -24,22 +24,12 @@ func FindStream(streamPath string) *Stream {
func GetStream(streamPath string) (result *Stream) { func GetStream(streamPath string) (result *Stream) {
item, loaded := Streams.LoadOrStore(streamPath, &Stream{ item, loaded := Streams.LoadOrStore(streamPath, &Stream{
StreamPath: streamPath, StreamPath: streamPath,
HasVideo: true, AudioTracks: make(map[string]*AudioTrack),
HasAudio: true, VideoTracks: make(map[string]*VideoTrack),
EnableAudio: &config.EnableAudio,
EnableVideo: &config.EnableVideo,
}) })
result = item.(*Stream) result = item.(*Stream)
if !loaded { if !loaded {
result.Context, result.cancel = context.WithCancel(context.Background()) result.Context, result.cancel = context.WithCancel(context.Background())
if config.EnableVideo {
result.EnableVideo = &result.HasVideo
}
if config.EnableAudio {
result.EnableAudio = &result.HasAudio
}
result.AddVideoTrack()
result.AddAudioTrack()
utils.Print(Green("Stream create:"), BrightCyan(streamPath)) utils.Print(Green("Stream create:"), BrightCyan(streamPath))
} }
return return
@@ -53,28 +43,33 @@ type Stream struct {
StartTime time.Time //流的创建时间 StartTime time.Time //流的创建时间
*Publisher *Publisher
Subscribers []*Subscriber // 订阅者 Subscribers []*Subscriber // 订阅者
VideoTracks []*VideoTrack VideoTracks map[string]*VideoTrack
AudioTracks []*AudioTrack AudioTracks map[string]*AudioTrack
HasAudio bool
HasVideo bool
EnableVideo *bool
EnableAudio *bool
subscribeMutex sync.Mutex subscribeMutex sync.Mutex
audioRW sync.RWMutex
videoRW sync.RWMutex
} }
func (r *Stream) AddVideoTrack() (vt *VideoTrack) { func (r *Stream) AddVideoTrack(codec string, vt *VideoTrack) *VideoTrack {
vt = new(VideoTrack) if vt == nil {
vt.WaitFirst = make(chan struct{}) vt = NewVideoTrack()
vt.Buffer = NewRing_Video() }
r.VideoTracks = append(r.VideoTracks, vt) r.videoRW.Lock()
return r.VideoTracks[codec] = vt
r.videoRW.Unlock()
return vt
} }
func (r *Stream) AddAudioTrack() (at *AudioTrack) {
at = new(AudioTrack) func (r *Stream) AddAudioTrack(codec string, at *AudioTrack) *AudioTrack {
at.Buffer = NewRing_Audio() if at == nil {
r.AudioTracks = append(r.AudioTracks, at) at = NewAudioTrack()
return }
r.audioRW.Lock()
r.AudioTracks[codec] = at
r.audioRW.Unlock()
return at
} }
func (r *Stream) Close() { func (r *Stream) Close() {
r.cancel() r.cancel()
utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) utils.Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
@@ -86,7 +81,7 @@ func (r *Stream) Close() {
func (r *Stream) Subscribe(s *Subscriber) { func (r *Stream) Subscribe(s *Subscriber) {
if s.Stream = r; r.Err() == nil { if s.Stream = r; r.Err() == nil {
s.SubscribeTime = time.Now() s.SubscribeTime = time.Now()
utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(r.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) utils.Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.Context, s.cancel = context.WithCancel(r) s.Context, s.cancel = context.WithCancel(r)
r.subscribeMutex.Lock() r.subscribeMutex.Lock()
r.Subscribers = append(r.Subscribers, s) r.Subscribers = append(r.Subscribers, s)
@@ -117,9 +112,3 @@ func DeleteSliceItem_Subscriber(slice []*Subscriber, item *Subscriber) []*Subscr
} }
return slice return slice
} }
func (r *Stream) PushVideo(ts uint32, payload []byte) {
r.VideoTracks[0].Push(ts, payload)
}
func (r *Stream) PushAudio(ts uint32, payload []byte) {
r.AudioTracks[0].Push(ts, payload)
}

View File

@@ -2,14 +2,16 @@ package engine
import ( import (
"context" "context"
"encoding/json"
"time" "time"
"github.com/Monibuca/utils/v3/codec"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
// SubscriberInfo 订阅者可序列化信息,用于控制台输出 // Subscriber 订阅者实体定义
type SubscriberInfo struct { type Subscriber struct {
context.Context
*Stream `json:"-"`
ID string ID string
TotalDrop int //总丢帧 TotalDrop int //总丢帧
TotalPacket int TotalPacket int
@@ -17,19 +19,12 @@ type SubscriberInfo struct {
BufferLength int BufferLength int
Delay uint32 Delay uint32
SubscribeTime time.Time SubscribeTime time.Time
} cancel context.CancelFunc
Sign string
// Subscriber 订阅者实体定义 OffsetTime uint32
type Subscriber struct { startTime uint32
context.Context OnAudio func(pack AudioPack) `json:"-"`
*Stream `json:"-"` OnVideo func(pack VideoPack) `json:"-"`
SubscriberInfo
cancel context.CancelFunc
Sign string
OffsetTime uint32
startTime uint32
vtIndex int //第几个视频轨
atIndex int //第几个音频轨
} }
// IsClosed 检查订阅者是否已经关闭 // IsClosed 检查订阅者是否已经关闭
@@ -44,14 +39,41 @@ func (s *Subscriber) Close() {
s.cancel() s.cancel()
} }
} }
func (r *Subscriber) GetVideoTrack(codec string) *VideoTrack {
func (s *Subscriber) MarshalJSON() ([]byte, error) { if !config.EnableVideo {
return json.Marshal(s.SubscriberInfo) return nil
}
r.videoRW.RLock()
defer r.videoRW.RUnlock()
return r.VideoTracks[codec]
}
func (s *Subscriber) GetAudioTrack(codecs ...string) (at *AudioTrack) {
if !config.EnableAudio {
return nil
}
if HasTranscoder {
s.audioRW.Lock()
defer s.audioRW.Unlock()
} else {
s.audioRW.RLock()
defer s.audioRW.RUnlock()
}
for _, codec := range codecs {
if at, ok := s.AudioTracks[codec]; ok {
return at
}
}
if HasTranscoder {
at = s.AddAudioTrack(codecs[0], nil)
at.SoundFormat = codec.Codec2SoundFormat[codecs[0]]
TriggerHook(Hook{HOOK_REQUEST_TRANSAUDIO, &TransCodeReq{s, codecs[0]}})
}
return
} }
//Subscribe 开始订阅 //Subscribe 开始订阅 将Subscriber与Stream关联
func (s *Subscriber) Subscribe(streamPath string) error { func (s *Subscriber) Subscribe(streamPath string) error {
if !config.EnableWaitStream && FindStream(streamPath) == nil { if FindStream(streamPath) == nil {
return errors.Errorf("Stream not found:%s", streamPath) return errors.Errorf("Stream not found:%s", streamPath)
} }
GetStream(streamPath).Subscribe(s) GetStream(streamPath).Subscribe(s)
@@ -60,3 +82,115 @@ func (s *Subscriber) Subscribe(streamPath string) error {
} }
return nil return nil
} }
//Play 开始播放
func (s *Subscriber) Play(ctx context.Context, at *AudioTrack, vt *VideoTrack) {
defer s.Close()
if vt == nil && at == nil {
return
}
if vt == nil {
s.PlayAudio(ctx, at)
return
} else if at == nil {
s.PlayVideo(ctx, vt)
return
}
select {
case <-vt.WaitFirst: //等待获取到第一个关键帧
case <-s.Context.Done():
return
case <-ctx.Done(): //可能等不到关键帧就退出了
return
}
vr := vt.Buffer.SubRing(vt.FirstScreen) //从关键帧开始读取,首屏秒开
vr.Current.Wait() //等到RingBuffer可读
ar := at.Buffer.SubRing(at.Buffer.Index)
ar.Current.Wait()
dropping := false //是否处于丢帧中
send_audio := func() {
s.OnAudio(ar.Current.AudioPack)
if at.Buffer.Index-ar.Index > 128 {
dropping = true
}
}
send_video := func() {
s.OnVideo(vr.Current.VideoPack)
if vt.Buffer.Index-vr.Index > 128 {
dropping = true
}
}
for ctx.Err() == nil && s.Context.Err() == nil {
if ar.Current.Timestamp > vr.Current.Timestamp || ar.Current.Timestamp == 0 {
if !dropping {
send_video()
} else if vr.Current.NalType == codec.NALU_IDR_Picture {
dropping = false
}
vr.NextR()
} else {
if !dropping {
send_audio()
}
ar.NextR()
}
}
}
func (s *Subscriber) PlayAudio(ctx context.Context, at *AudioTrack) {
ring := at.Buffer.SubRing(at.Buffer.Index)
ring.Current.Wait()
droped := 0
var action, send func()
drop := func() {
if at.Buffer.Index-ring.Index < 10 {
action = send
} else {
droped++
}
}
send = func() {
s.OnAudio(ring.Current.AudioPack)
//s.BufferLength = pIndex - ring.Index
//s.Delay = s.AVRing.Timestamp - packet.Timestamp
if at.Buffer.Index-ring.Index > 128 {
action = drop
}
}
for action = send; ctx.Err() == nil && s.Context.Err() == nil; ring.NextR() {
action()
}
}
func (s *Subscriber) PlayVideo(ctx context.Context, vt *VideoTrack) {
select {
case <-vt.WaitFirst:
case <-s.Context.Done():
return
case <-ctx.Done(): //可能等不到关键帧就退出了
return
}
ring := vt.Buffer.SubRing(vt.FirstScreen)
ring.Current.Wait()
droped := 0
var action, send func()
drop := func() {
if ring.Current.NalType == codec.NALU_IDR_Picture {
action = send
} else {
droped++
}
}
send = func() {
s.OnVideo(ring.Current.VideoPack)
pIndex := vt.Buffer.Index
//s.BufferLength = pIndex - ring.Index
//s.Delay = s.AVRing.Timestamp - packet.Timestamp
if pIndex-ring.Index > 128 {
action = drop
}
}
for action = send; ctx.Err() == nil && s.Context.Err() == nil; ring.NextR() {
action()
}
}

View File

@@ -1,7 +1,6 @@
package engine package engine
import ( import (
"context"
"encoding/binary" "encoding/binary"
"github.com/Monibuca/utils/v3" "github.com/Monibuca/utils/v3"
@@ -28,12 +27,22 @@ type VideoPack struct {
type VideoTrack struct { type VideoTrack struct {
FirstScreen byte //最近的关键帧位置,首屏渲染 FirstScreen byte //最近的关键帧位置,首屏渲染
Track_Video Track_Video
SPS []byte SPS []byte `json:"-"`
PPS []byte PPS []byte `json:"-"`
SPSInfo codec.SPSInfo SPSInfo codec.SPSInfo
GOP byte //关键帧间隔 GOP byte //关键帧间隔
RtmpTag []byte //rtmp需要先发送一个序列帧包含SPS和PPS RtmpTag []byte `json:"-"` //rtmp需要先发送一个序列帧包含SPS和PPS
WaitFirst chan struct{} WaitFirst chan struct{} `json:"-"`
revIDR func()
}
func NewVideoTrack() *VideoTrack {
result := &VideoTrack{
WaitFirst: make(chan struct{}),
}
result.Buffer = NewRing_Video()
result.revIDR = result.firstRevIDR
return result
} }
// Push 来自发布者推送的视频 // Push 来自发布者推送的视频
@@ -79,18 +88,13 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) {
vt.SPSInfo, _ = codec.ParseSPS(payload) vt.SPSInfo, _ = codec.ParseSPS(payload)
case codec.NALU_PPS: case codec.NALU_PPS:
vt.PPS = payload vt.PPS = payload
if vt.RtmpTag == nil {
vt.SetRtmpTag()
}
case codec.NALU_Access_Unit_Delimiter: case codec.NALU_Access_Unit_Delimiter:
case codec.NALU_IDR_Picture: case codec.NALU_IDR_Picture:
if vt.RtmpTag == nil { vt.revIDR()
vt.FirstScreen = vbr.Index
vt.setRtmpTag()
close(vt.WaitFirst)
} else {
vt.GOP = vbr.Index - vt.FirstScreen
vt.FirstScreen = vbr.Index
}
fallthrough fallthrough
case codec.NALU_Non_IDR_Picture: case codec.NALU_Non_IDR_Picture:
video.Payload = payload video.Payload = payload
@@ -102,8 +106,16 @@ func (vt *VideoTrack) Push(timestamp uint32, payload []byte) {
utils.Printf("nalType not support yet:%d", video.NalType) utils.Printf("nalType not support yet:%d", video.NalType)
} }
} }
func (vt *VideoTrack) firstRevIDR() {
func (vt *VideoTrack) setRtmpTag() { vt.FirstScreen = vt.Buffer.Index
close(vt.WaitFirst)
vt.revIDR = vt.afterRevIDR
}
func (vt *VideoTrack) afterRevIDR() {
vt.GOP = vt.Buffer.Index - vt.FirstScreen
vt.FirstScreen = vt.Buffer.Index
}
func (vt *VideoTrack) SetRtmpTag() {
lenSPS, lenPPS := len(vt.SPS), len(vt.PPS) lenSPS, lenPPS := len(vt.SPS), len(vt.PPS)
vt.RtmpTag = append([]byte{}, codec.RTMP_AVC_HEAD...) vt.RtmpTag = append([]byte{}, codec.RTMP_AVC_HEAD...)
copy(vt.RtmpTag[6:], vt.SPS[1:4]) copy(vt.RtmpTag[6:], vt.SPS[1:4])
@@ -111,39 +123,3 @@ func (vt *VideoTrack) setRtmpTag() {
vt.RtmpTag = append(vt.RtmpTag, vt.SPS...) vt.RtmpTag = append(vt.RtmpTag, vt.SPS...)
vt.RtmpTag = append(append(vt.RtmpTag, 0x01, byte(lenPPS>>8), byte(lenPPS)), vt.PPS...) vt.RtmpTag = append(append(vt.RtmpTag, 0x01, byte(lenPPS>>8), byte(lenPPS)), vt.PPS...)
} }
func (vt *VideoTrack) Play(ctx context.Context, callback func(VideoPack)) {
select {
case <-vt.WaitFirst:
case <-ctx.Done():
return
}
ring := vt.Buffer.SubRing(vt.FirstScreen)
ring.Current.Wait()
droped := 0
var action, send func()
drop := func() {
if ring.Current.NalType == codec.NALU_IDR_Picture {
action = send
} else {
droped++
}
}
send = func() {
callback(ring.Current.VideoPack)
pIndex := vt.Buffer.Index
//s.BufferLength = pIndex - ring.Index
//s.Delay = s.AVRing.Timestamp - packet.Timestamp
if pIndex-ring.Index > 128 {
action = drop
}
}
for action = send; ; ring.NextR() {
select {
case <-ctx.Done():
return
default:
action()
}
}
}