修改命名

This commit is contained in:
unknown
2020-05-01 11:41:16 +08:00
parent 3f1ce8466e
commit 6487eaddbf
8 changed files with 123 additions and 148 deletions

View File

@@ -17,14 +17,11 @@ type AVPacket struct {
Timestamp uint32
Type byte //8 audio,9 video
IsSequence bool //序列帧
VideoFrameType byte //4bit
IsKeyFrame bool//是否为关键帧
Payload []byte
Number int //编号audio和video独立编号
}
func (av *AVPacket) IsKeyFrame() bool {
return av.VideoFrameType == 1 || av.VideoFrameType == 4
}
func (av *AVPacket) ADTS2ASC() (tagPacket *AVPacket) {
tagPacket = NewAVPacket(FLV_TAG_TYPE_AUDIO)
tagPacket.Payload = ADTSToAudioSpecificConfig(av.Payload)

View File

@@ -14,6 +14,7 @@ const (
PLUGIN_SUBSCRIBER = 1 //订阅者插件
PLUGIN_PUBLISHER = 1 << 1 //发布者插件
PLUGIN_HOOK = 1 << 2 //钩子插件
PLUGIN_APP = 1 << 3 //应用插件
)
// Plugins 所有的插件配置
@@ -53,5 +54,5 @@ type ListenerConfig struct {
}
var config = &struct {
EnableWaitRoom bool
EnableWaitStream bool
}{true}

34
hook.go
View File

@@ -18,12 +18,12 @@ func (h AuthHook) Trigger(sign string) error {
var OnPublishHooks = make(OnPublishHook, 0)
type OnPublishHook []func(r *Room)
type OnPublishHook []func(r *Stream)
func (h OnPublishHook) AddHook(hook func(r *Room)) {
func (h OnPublishHook) AddHook(hook func(r *Stream)) {
OnPublishHooks = append(h, hook)
}
func (h OnPublishHook) Trigger(r *Room) {
func (h OnPublishHook) Trigger(r *Stream) {
for _, f := range h {
f(r)
}
@@ -31,12 +31,12 @@ func (h OnPublishHook) Trigger(r *Room) {
var OnSubscribeHooks = make(OnSubscribeHook, 0)
type OnSubscribeHook []func(s *OutputStream)
type OnSubscribeHook []func(s *Subscriber)
func (h OnSubscribeHook) AddHook(hook func(s *OutputStream)) {
func (h OnSubscribeHook) AddHook(hook func(s *Subscriber)) {
OnSubscribeHooks = append(h, hook)
}
func (h OnSubscribeHook) Trigger(s *OutputStream) {
func (h OnSubscribeHook) Trigger(s *Subscriber) {
for _, f := range h {
f(s)
}
@@ -44,12 +44,12 @@ func (h OnSubscribeHook) Trigger(s *OutputStream) {
var OnUnSubscribeHooks = make(OnUnSubscribeHook, 0)
type OnUnSubscribeHook []func(s *OutputStream)
type OnUnSubscribeHook []func(s *Subscriber)
func (h OnUnSubscribeHook) AddHook(hook func(s *OutputStream)) {
func (h OnUnSubscribeHook) AddHook(hook func(s *Subscriber)) {
OnUnSubscribeHooks = append(h, hook)
}
func (h OnUnSubscribeHook) Trigger(s *OutputStream) {
func (h OnUnSubscribeHook) Trigger(s *Subscriber) {
for _, f := range h {
f(s)
}
@@ -57,12 +57,12 @@ func (h OnUnSubscribeHook) Trigger(s *OutputStream) {
var OnDropHooks = make(OnDropHook, 0)
type OnDropHook []func(s *OutputStream)
type OnDropHook []func(s *Subscriber)
func (h OnDropHook) AddHook(hook func(s *OutputStream)) {
func (h OnDropHook) AddHook(hook func(s *Subscriber)) {
OnDropHooks = append(h, hook)
}
func (h OnDropHook) Trigger(s *OutputStream) {
func (h OnDropHook) Trigger(s *Subscriber) {
for _, f := range h {
f(s)
}
@@ -81,14 +81,14 @@ func (h OnSummaryHook) Trigger(v bool) {
}
}
var OnRoomClosedHooks = make(OnRoomClosedHook, 0)
var OnStreamClosedHooks = make(OnStreamClosedHook, 0)
type OnRoomClosedHook []func(*Room)
type OnStreamClosedHook []func(*Stream)
func (h OnRoomClosedHook) AddHook(hook func(*Room)) {
OnRoomClosedHooks = append(h, hook)
func (h OnStreamClosedHook) AddHook(hook func(*Stream)) {
OnStreamClosedHooks = append(h, hook)
}
func (h OnRoomClosedHook) Trigger(v *Room) {
func (h OnStreamClosedHook) Trigger(v *Stream) {
for _, f := range h {
f(v)
}

View File

@@ -1,51 +1,36 @@
package engine
import (
"log"
"reflect"
"time"
)
// Publisher 发布者接口
type Publisher interface {
OnClosed()
}
// InputStream 发布者实体定义
type InputStream struct {
*Room
// Publisher 发布者实体定义
type Publisher struct {
*Stream
}
// Close 关闭发布者
func (p *InputStream) Close() {
func (p *Publisher) Close() {
if p.Running() {
p.Cancel()
}
}
// Running 发布者是否正在发布
func (p *InputStream) Running() bool {
return p.Room != nil && p.Err() == nil
}
// OnClosed 发布者关闭事件,用于回收资源
func (p *InputStream) OnClosed() {
func (p *Publisher) Running() bool {
return p.Stream != nil && p.Err() == nil
}
// Publish 发布者进行发布操作
func (p *InputStream) Publish(streamPath string, publisher Publisher) bool {
p.Room = AllRoom.Get(streamPath)
func (p *Publisher) Publish(streamPath string) bool {
p.Stream = GetStream(streamPath)
//检查是否已存在发布者
if p.Publisher != nil {
return false
}
p.Publisher = publisher
//反射获取发布者类型信息
p.Type = reflect.ValueOf(publisher).Elem().Type().Name()
log.Printf("publish set :%s", p.Type)
p.Publisher = p
p.StartTime = time.Now()
//触发钩子
OnPublishHooks.Trigger(p.Room)
OnPublishHooks.Trigger(p.Stream)
return true
}

11
ring.go
View File

@@ -43,6 +43,17 @@ func (r *Ring) GoBack() {
r.Index--
r.RingItem = r.buffer[r.Index]
}
func (r *Ring) NextW() {
r.Index++
item := r.RingItem
r.RingItem = r.buffer[r.Index]
r.RingItem.Lock()
item.UnLock()
}
func (r *Ring) NextR() {
r.RingItem.RUnlock()
r.GoNext()
}
func (r Ring) Clone() *Ring {
return &r
}

View File

@@ -11,54 +11,54 @@ import (
)
var (
AllRoom = RoomCollection{}
roomCtxBg = context.Background()
streamCollection = Collection{}
)
// Collection 对sync.Map的包装
type RoomCollection struct {
type Collection struct {
sync.Map
}
// Get 根据流名称获取房间
func (c *RoomCollection) Get(name string) (result *Room) {
item, loaded := AllRoom.LoadOrStore(name, &Room{
Subscribers: make(map[string]*OutputStream),
//GetStream 根据流路径获取流,如果不存在则创建一个新的
func GetStream(streamPath string) (result *Stream) {
item, loaded := streamCollection.LoadOrStore(name, &Stream{
Subscribers: make(map[string]*Subscriber),
Control: make(chan interface{}),
AVRing: NewRing(),
WaitingMutex: new(sync.RWMutex),
RoomInfo: RoomInfo{
StreamPath: name,
StreamInfo: StreamInfo{
StreamPath: streamPath,
SubscriberInfo: make([]*SubscriberInfo, 0),
},
})
result = item.(*Room)
result = item.(*Stream)
if !loaded {
result.Context, result.Cancel = context.WithCancel(roomCtxBg)
Summary.Streams = append(Summary.Streams, &result.StreamInfo)
result.Context, result.Cancel = context.WithCancel(context.Background())
result.WaitingMutex.Lock() //等待发布者
go result.Run()
}
return
}
// Room 房间定义
type Room struct {
// Stream 流定义
type Stream struct {
context.Context
Publisher
RoomInfo //可序列化,供后台查看的数据
StreamInfo //可序列化,供后台查看的数据
Control chan interface{}
Cancel context.CancelFunc
Subscribers map[string]*OutputStream // 订阅者
VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen *Ring //最近的关键帧位置,首屏渲染
AVRing *Ring //数据环
WaitingMutex *sync.RWMutex //用于订阅和等待发布者
UseTimestamp bool //是否采用数据包中的时间戳
Subscribers map[string]*Subscriber // 订阅者
VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen *Ring //最近的关键帧位置,首屏渲染
AVRing *Ring //数据环
WaitingMutex *sync.RWMutex //用于订阅和等待发布者
UseTimestamp bool //是否采用数据包中的时间戳
}
// RoomInfo 房间可序列化信息,用于控制台显示
type RoomInfo struct {
// StreamInfo 可序列化信息,用于控制台显示
type StreamInfo struct {
StreamPath string
StartTime time.Time
SubscriberInfo []*SubscriberInfo
@@ -82,50 +82,47 @@ type RoomInfo struct {
// UnSubscribeCmd 取消订阅命令
type UnSubscribeCmd struct {
*OutputStream
*Subscriber
}
// SubscribeCmd 订阅房间命令
// SubscribeCmd 订阅命令
type SubscribeCmd struct {
*OutputStream
*Subscriber
}
// ChangeRoomCmd 切换房间命令
type ChangeRoomCmd struct {
*OutputStream
NewRoom *Room
// ChangeStreamCmd 切换命令
type ChangeStreamCmd struct {
*Subscriber
NewStream *Stream
}
func (r *Room) onClosed() {
Print(Yellow("room destoryed :"), BrightCyan(r.StreamPath))
AllRoom.Delete(r.StreamPath)
OnRoomClosedHooks.Trigger(r)
if r.Publisher != nil {
r.OnClosed()
}
func (r *Stream) onClosed() {
Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath))
streamCollection.Delete(r.StreamPath)
OnStreamClosedHooks.Trigger(r)
}
//Subscribe 订阅房间
func (r *Room) Subscribe(s *OutputStream) {
s.Room = r
//Subscribe 订阅
func (r *Stream) Subscribe(s *Subscriber) {
s.Stream = r
if r.Err() == nil {
s.SubscribeTime = time.Now()
Print(Sprintf(Yellow("subscribe :%s %s,to room %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath)))
s.Context, s.Cancel = context.WithCancel(r)
s.Control <- &SubscribeCmd{s}
}
}
//UnSubscribe 取消订阅房间
func (r *Room) UnSubscribe(s *OutputStream) {
//UnSubscribe 取消订阅
func (r *Stream) UnSubscribe(s *Subscriber) {
if r.Err() == nil {
r.Control <- &UnSubscribeCmd{s}
}
}
// Run 房间运行,转发逻辑
func (r *Room) Run() {
Print(Green("room create:"), BrightCyan(r.StreamPath))
// Run 流运行
func (r *Stream) Run() {
Print(Green("Stream create:"), BrightCyan(r.StreamPath))
defer r.onClosed()
for {
select {
@@ -145,7 +142,7 @@ func (r *Room) Run() {
}
copy(r.SubscriberInfo[hole:], r.SubscriberInfo[hole+1:])
r.SubscriberInfo = r.SubscriberInfo[:len(r.SubscriberInfo)-1]
OnUnSubscribeHooks.Trigger(v.OutputStream)
OnUnSubscribeHooks.Trigger(v.Subscriber)
Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo))))
if len(r.SubscriberInfo) == 0 && r.Publisher == nil {
r.Cancel()
@@ -154,15 +151,15 @@ func (r *Room) Run() {
case *SubscribeCmd:
//防止重复添加
if _, ok := r.Subscribers[v.ID]; !ok {
r.Subscribers[v.ID] = v.OutputStream
r.Subscribers[v.ID] = v.Subscriber
r.SubscriberInfo = append(r.SubscriberInfo, &v.SubscriberInfo)
Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo))))
OnSubscribeHooks.Trigger(v.OutputStream)
OnSubscribeHooks.Trigger(v.Subscriber)
}
case *ChangeRoomCmd:
if _, ok := v.NewRoom.Subscribers[v.ID]; !ok {
case *ChangeStreamCmd:
if _, ok := v.NewStream.Subscribers[v.ID]; !ok {
delete(r.Subscribers, v.ID)
v.NewRoom.Subscribe(v.OutputStream)
v.NewStream.Subscribe(v.Subscriber)
if len(r.SubscriberInfo) == 0 && r.Publisher == nil {
r.Cancel()
}
@@ -173,13 +170,13 @@ func (r *Room) Run() {
}
// PushAudio 来自发布者推送的音频
func (r *Room) PushAudio(timestamp uint32, payload []byte) {
func (r *Stream) PushAudio(timestamp uint32, payload []byte) {
payloadLen := len(payload)
audio := r.AVRing
audio.Type = avformat.FLV_TAG_TYPE_AUDIO
audio.Timestamp = timestamp
audio.Payload = payload
audio.VideoFrameType = 0
audio.IsKeyFrame = false
audio.IsSequence = false
if audio.GetLast().Timestamp > 0 {
r.AudioInfo.BPS = payloadLen * 1000 / int(timestamp-audio.GetLast().Timestamp)
@@ -228,11 +225,9 @@ func (r *Room) PushAudio(timestamp uint32, payload []byte) {
}
r.AudioInfo.PacketCount++
audio.Number = r.AudioInfo.PacketCount
audio.GoNext()
audio.Lock()
audio.GetLast().Unlock()
audio.NextW()
}
func (r *Room) setH264Info(video *Ring) {
func (r *Stream) setH264Info(video *Ring) {
r.VideoTag = video.AVPacket.Clone()
if r.VideoInfo.CodecID != 7 {
return
@@ -245,7 +240,7 @@ func (r *Room) setH264Info(video *Ring) {
}
// PushVideo 来自发布者推送的视频
func (r *Room) PushVideo(timestamp uint32, payload []byte) {
func (r *Stream) PushVideo(timestamp uint32, payload []byte) {
payloadLen := len(payload)
video := r.AVRing
video.Type = avformat.FLV_TAG_TYPE_VIDEO
@@ -257,9 +252,10 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) {
if payloadLen < 3 {
return
}
video.VideoFrameType = payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
videoFrameType := payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
r.VideoInfo.CodecID = payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC...
video.IsSequence = video.VideoFrameType == 1 && payload[1] == 0
video.IsSequence = videoFrameType == 1 && payload[1] == 0
video.IsKeyFrame = videoFrameType == 1 || videoFrameType == 4
if r.VideoTag == nil {
if video.IsSequence {
r.setH264Info(video)
@@ -271,7 +267,7 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) {
if video.IsSequence {
r.setH264Info(video)
}
if video.IsKeyFrame() {
if video.IsKeyFrame {
if r.FirstScreen == nil {
defer r.WaitingMutex.Unlock()
r.FirstScreen = video.Clone()
@@ -287,8 +283,6 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) {
}
r.VideoInfo.PacketCount++
video.Number = r.VideoInfo.PacketCount
video.GoNext()
video.Lock()
video.GetLast().Unlock()
video.NextW()
}
}

View File

@@ -9,11 +9,6 @@ import (
"github.com/pkg/errors"
)
// Subscriber 订阅者
// type Subscriber interface {
// Send(*avformat.SendPacket) error
// }
// SubscriberInfo 订阅者可序列化信息,用于控制台输出
type SubscriberInfo struct {
ID string
@@ -25,51 +20,50 @@ type SubscriberInfo struct {
SubscribeTime time.Time
}
// OutputStream 订阅者实体定义
type OutputStream struct {
// Subscriber 订阅者实体定义
type Subscriber struct {
context.Context
*Room
*Stream
SubscriberInfo
SendHandler func(*avformat.SendPacket) error
Cancel context.CancelFunc
Sign string
OffsetTime uint32
OnData func(*avformat.SendPacket) error
Cancel context.CancelFunc
Sign string
OffsetTime uint32
}
// IsClosed 检查订阅者是否已经关闭
func (s *OutputStream) IsClosed() bool {
func (s *Subscriber) IsClosed() bool {
return s.Context != nil && s.Err() != nil
}
// Close 关闭订阅者
func (s *OutputStream) Close() {
func (s *Subscriber) Close() {
if s.Cancel != nil {
s.Cancel()
}
}
//Play 开始订阅
func (s *OutputStream) Play(streamPath string) (err error) {
if !config.EnableWaitRoom {
if _, ok := AllRoom.Load(streamPath); !ok {
//Subscribe 开始订阅
func (s *Subscriber) Subscribe(streamPath string) (err error) {
if !config.EnableWaitStream {
if _, ok := streamCollection.Load(streamPath); !ok {
return errors.New(fmt.Sprintf("Stream not found:%s", streamPath))
}
}
AllRoom.Get(streamPath).Subscribe(s)
GetStream(streamPath).Subscribe(s)
defer s.UnSubscribe(s)
//加锁解锁的目的是等待发布者首屏数据,如果发布者尚为发布,则会等待,否则就会往下执行
s.WaitingMutex.RLock()
s.WaitingMutex.RUnlock()
sendPacket := avformat.NewSendPacket(s.VideoTag, 0)
defer sendPacket.Recycle()
s.SendHandler(sendPacket)
s.OnData(sendPacket)
packet := s.FirstScreen.Clone()
startTime := packet.Timestamp
packet.RLock()
sendPacket.AVPacket = packet.AVPacket
s.SendHandler(sendPacket)
packet.RUnlock()
packet.GoNext()
s.OnData(sendPacket)
packet.NextR()
atsent := false
dropping := false
droped := 0
@@ -84,19 +78,18 @@ func (s *OutputStream) Play(streamPath string) (err error) {
if packet.Type == avformat.FLV_TAG_TYPE_AUDIO && !atsent {
sendPacket.AVPacket = s.AudioTag
sendPacket.Timestamp = 0
s.SendHandler(sendPacket)
s.OnData(sendPacket)
atsent = true
}
sendPacket.AVPacket = packet.AVPacket
sendPacket.Timestamp = packet.Timestamp - startTime
s.SendHandler(sendPacket)
s.OnData(sendPacket)
if s.checkDrop(packet) {
dropping = true
droped = 0
}
packet.RUnlock()
packet.GoNext()
} else if packet.AVPacket.IsKeyFrame() {
packet.NextR()
} else if packet.IsKeyFrame {
//遇到关键帧则退出丢帧
dropping = false
//fmt.Println("drop package ", droped)
@@ -104,8 +97,7 @@ func (s *OutputStream) Play(streamPath string) (err error) {
packet.RUnlock()
} else {
droped++
packet.RUnlock()
packet.GoNext()
packet.NextR()
}
}
}

View File

@@ -30,7 +30,7 @@ type ServerSummary struct {
Usage float64
}
NetWork []NetWorkInfo
Rooms []*RoomInfo
Streams []*StreamInfo
lastNetWork []NetWorkInfo
ref int
control chan bool
@@ -144,10 +144,5 @@ func (s *ServerSummary) collect() {
//fmt.Printf(" HD : %v GB Free: %v GB Usage:%f%%\n", d.Total/1024/1024/1024, d.Free/1024/1024/1024, d.UsedPercent)
//fmt.Printf(" OS : %v(%v) %v \n", n.Platform, n.PlatformFamily, n.PlatformVersion)
//fmt.Printf(" Hostname : %v \n", n.Hostname)
s.Rooms = nil
AllRoom.Range(func(key interface{}, v interface{}) bool {
s.Rooms = append(s.Rooms, &v.(*Room).RoomInfo)
return true
})
return
}