diff --git a/avformat/avpacket.go b/avformat/avpacket.go index d075cb4..849fee1 100644 --- a/avformat/avpacket.go +++ b/avformat/avpacket.go @@ -17,14 +17,11 @@ type AVPacket struct { Timestamp uint32 Type byte //8 audio,9 video IsSequence bool //序列帧 - VideoFrameType byte //4bit + IsKeyFrame bool//是否为关键帧 Payload []byte Number int //编号,audio和video独立编号 } -func (av *AVPacket) IsKeyFrame() bool { - return av.VideoFrameType == 1 || av.VideoFrameType == 4 -} func (av *AVPacket) ADTS2ASC() (tagPacket *AVPacket) { tagPacket = NewAVPacket(FLV_TAG_TYPE_AUDIO) tagPacket.Payload = ADTSToAudioSpecificConfig(av.Payload) diff --git a/config.go b/config.go index ed08059..6fbc39e 100644 --- a/config.go +++ b/config.go @@ -14,6 +14,7 @@ const ( PLUGIN_SUBSCRIBER = 1 //订阅者插件 PLUGIN_PUBLISHER = 1 << 1 //发布者插件 PLUGIN_HOOK = 1 << 2 //钩子插件 + PLUGIN_APP = 1 << 3 //应用插件 ) // Plugins 所有的插件配置 @@ -53,5 +54,5 @@ type ListenerConfig struct { } var config = &struct { - EnableWaitRoom bool + EnableWaitStream bool }{true} diff --git a/hook.go b/hook.go index ab15b62..611267b 100644 --- a/hook.go +++ b/hook.go @@ -18,12 +18,12 @@ func (h AuthHook) Trigger(sign string) error { var OnPublishHooks = make(OnPublishHook, 0) -type OnPublishHook []func(r *Room) +type OnPublishHook []func(r *Stream) -func (h OnPublishHook) AddHook(hook func(r *Room)) { +func (h OnPublishHook) AddHook(hook func(r *Stream)) { OnPublishHooks = append(h, hook) } -func (h OnPublishHook) Trigger(r *Room) { +func (h OnPublishHook) Trigger(r *Stream) { for _, f := range h { f(r) } @@ -31,12 +31,12 @@ func (h OnPublishHook) Trigger(r *Room) { var OnSubscribeHooks = make(OnSubscribeHook, 0) -type OnSubscribeHook []func(s *OutputStream) +type OnSubscribeHook []func(s *Subscriber) -func (h OnSubscribeHook) AddHook(hook func(s *OutputStream)) { +func (h OnSubscribeHook) AddHook(hook func(s *Subscriber)) { OnSubscribeHooks = append(h, hook) } -func (h OnSubscribeHook) Trigger(s *OutputStream) { +func (h OnSubscribeHook) Trigger(s *Subscriber) { for _, f := range h { f(s) } @@ -44,12 +44,12 @@ func (h OnSubscribeHook) Trigger(s *OutputStream) { var OnUnSubscribeHooks = make(OnUnSubscribeHook, 0) -type OnUnSubscribeHook []func(s *OutputStream) +type OnUnSubscribeHook []func(s *Subscriber) -func (h OnUnSubscribeHook) AddHook(hook func(s *OutputStream)) { +func (h OnUnSubscribeHook) AddHook(hook func(s *Subscriber)) { OnUnSubscribeHooks = append(h, hook) } -func (h OnUnSubscribeHook) Trigger(s *OutputStream) { +func (h OnUnSubscribeHook) Trigger(s *Subscriber) { for _, f := range h { f(s) } @@ -57,12 +57,12 @@ func (h OnUnSubscribeHook) Trigger(s *OutputStream) { var OnDropHooks = make(OnDropHook, 0) -type OnDropHook []func(s *OutputStream) +type OnDropHook []func(s *Subscriber) -func (h OnDropHook) AddHook(hook func(s *OutputStream)) { +func (h OnDropHook) AddHook(hook func(s *Subscriber)) { OnDropHooks = append(h, hook) } -func (h OnDropHook) Trigger(s *OutputStream) { +func (h OnDropHook) Trigger(s *Subscriber) { for _, f := range h { f(s) } @@ -81,14 +81,14 @@ func (h OnSummaryHook) Trigger(v bool) { } } -var OnRoomClosedHooks = make(OnRoomClosedHook, 0) +var OnStreamClosedHooks = make(OnStreamClosedHook, 0) -type OnRoomClosedHook []func(*Room) +type OnStreamClosedHook []func(*Stream) -func (h OnRoomClosedHook) AddHook(hook func(*Room)) { - OnRoomClosedHooks = append(h, hook) +func (h OnStreamClosedHook) AddHook(hook func(*Stream)) { + OnStreamClosedHooks = append(h, hook) } -func (h OnRoomClosedHook) Trigger(v *Room) { +func (h OnStreamClosedHook) Trigger(v *Stream) { for _, f := range h { f(v) } diff --git a/publisher.go b/publisher.go index 5d07b04..74ccaf5 100644 --- a/publisher.go +++ b/publisher.go @@ -1,51 +1,36 @@ package engine import ( - "log" - "reflect" "time" ) -// Publisher 发布者接口 -type Publisher interface { - OnClosed() -} - -// InputStream 发布者实体定义 -type InputStream struct { - *Room +// Publisher 发布者实体定义 +type Publisher struct { + *Stream } // Close 关闭发布者 -func (p *InputStream) Close() { +func (p *Publisher) Close() { if p.Running() { p.Cancel() } } // Running 发布者是否正在发布 -func (p *InputStream) Running() bool { - return p.Room != nil && p.Err() == nil -} - -// OnClosed 发布者关闭事件,用于回收资源 -func (p *InputStream) OnClosed() { +func (p *Publisher) Running() bool { + return p.Stream != nil && p.Err() == nil } // Publish 发布者进行发布操作 -func (p *InputStream) Publish(streamPath string, publisher Publisher) bool { - p.Room = AllRoom.Get(streamPath) +func (p *Publisher) Publish(streamPath string) bool { + p.Stream = GetStream(streamPath) //检查是否已存在发布者 if p.Publisher != nil { return false } - p.Publisher = publisher - //反射获取发布者类型信息 - p.Type = reflect.ValueOf(publisher).Elem().Type().Name() - log.Printf("publish set :%s", p.Type) + p.Publisher = p p.StartTime = time.Now() //触发钩子 - OnPublishHooks.Trigger(p.Room) - + OnPublishHooks.Trigger(p.Stream) return true } diff --git a/ring.go b/ring.go index e34ee1e..cd15b75 100644 --- a/ring.go +++ b/ring.go @@ -43,6 +43,17 @@ func (r *Ring) GoBack() { r.Index-- r.RingItem = r.buffer[r.Index] } +func (r *Ring) NextW() { + r.Index++ + item := r.RingItem + r.RingItem = r.buffer[r.Index] + r.RingItem.Lock() + item.UnLock() +} +func (r *Ring) NextR() { + r.RingItem.RUnlock() + r.GoNext() +} func (r Ring) Clone() *Ring { return &r } diff --git a/room.go b/stream.go similarity index 68% rename from room.go rename to stream.go index 4c5e080..3c06004 100644 --- a/room.go +++ b/stream.go @@ -11,54 +11,54 @@ import ( ) var ( - AllRoom = RoomCollection{} - roomCtxBg = context.Background() + streamCollection = Collection{} ) // Collection 对sync.Map的包装 -type RoomCollection struct { +type Collection struct { sync.Map } -// Get 根据流名称获取房间 -func (c *RoomCollection) Get(name string) (result *Room) { - item, loaded := AllRoom.LoadOrStore(name, &Room{ - Subscribers: make(map[string]*OutputStream), +//GetStream 根据流路径获取流,如果不存在则创建一个新的 +func GetStream(streamPath string) (result *Stream) { + item, loaded := streamCollection.LoadOrStore(name, &Stream{ + Subscribers: make(map[string]*Subscriber), Control: make(chan interface{}), AVRing: NewRing(), WaitingMutex: new(sync.RWMutex), - RoomInfo: RoomInfo{ - StreamPath: name, + StreamInfo: StreamInfo{ + StreamPath: streamPath, SubscriberInfo: make([]*SubscriberInfo, 0), }, }) - result = item.(*Room) + result = item.(*Stream) if !loaded { - result.Context, result.Cancel = context.WithCancel(roomCtxBg) + Summary.Streams = append(Summary.Streams, &result.StreamInfo) + result.Context, result.Cancel = context.WithCancel(context.Background()) result.WaitingMutex.Lock() //等待发布者 go result.Run() } return } -// Room 房间定义 -type Room struct { +// Stream 流定义 +type Stream struct { context.Context Publisher - RoomInfo //可序列化,供后台查看的数据 + StreamInfo //可序列化,供后台查看的数据 Control chan interface{} Cancel context.CancelFunc - Subscribers map[string]*OutputStream // 订阅者 - VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 - AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 - FirstScreen *Ring //最近的关键帧位置,首屏渲染 - AVRing *Ring //数据环 - WaitingMutex *sync.RWMutex //用于订阅和等待发布者 - UseTimestamp bool //是否采用数据包中的时间戳 + Subscribers map[string]*Subscriber // 订阅者 + VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 + AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 + FirstScreen *Ring //最近的关键帧位置,首屏渲染 + AVRing *Ring //数据环 + WaitingMutex *sync.RWMutex //用于订阅和等待发布者 + UseTimestamp bool //是否采用数据包中的时间戳 } -// RoomInfo 房间可序列化信息,用于控制台显示 -type RoomInfo struct { +// StreamInfo 流可序列化信息,用于控制台显示 +type StreamInfo struct { StreamPath string StartTime time.Time SubscriberInfo []*SubscriberInfo @@ -82,50 +82,47 @@ type RoomInfo struct { // UnSubscribeCmd 取消订阅命令 type UnSubscribeCmd struct { - *OutputStream + *Subscriber } -// SubscribeCmd 订阅房间命令 +// SubscribeCmd 订阅流命令 type SubscribeCmd struct { - *OutputStream + *Subscriber } -// ChangeRoomCmd 切换房间命令 -type ChangeRoomCmd struct { - *OutputStream - NewRoom *Room +// ChangeStreamCmd 切换流命令 +type ChangeStreamCmd struct { + *Subscriber + NewStream *Stream } -func (r *Room) onClosed() { - Print(Yellow("room destoryed :"), BrightCyan(r.StreamPath)) - AllRoom.Delete(r.StreamPath) - OnRoomClosedHooks.Trigger(r) - if r.Publisher != nil { - r.OnClosed() - } +func (r *Stream) onClosed() { + Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) + streamCollection.Delete(r.StreamPath) + OnStreamClosedHooks.Trigger(r) } -//Subscribe 订阅房间 -func (r *Room) Subscribe(s *OutputStream) { - s.Room = r +//Subscribe 订阅流 +func (r *Stream) Subscribe(s *Subscriber) { + s.Stream = r if r.Err() == nil { s.SubscribeTime = time.Now() - Print(Sprintf(Yellow("subscribe :%s %s,to room %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) + Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) s.Context, s.Cancel = context.WithCancel(r) s.Control <- &SubscribeCmd{s} } } -//UnSubscribe 取消订阅房间 -func (r *Room) UnSubscribe(s *OutputStream) { +//UnSubscribe 取消订阅流 +func (r *Stream) UnSubscribe(s *Subscriber) { if r.Err() == nil { r.Control <- &UnSubscribeCmd{s} } } -// Run 房间运行,转发逻辑 -func (r *Room) Run() { - Print(Green("room create:"), BrightCyan(r.StreamPath)) +// Run 流运行 +func (r *Stream) Run() { + Print(Green("Stream create:"), BrightCyan(r.StreamPath)) defer r.onClosed() for { select { @@ -145,7 +142,7 @@ func (r *Room) Run() { } copy(r.SubscriberInfo[hole:], r.SubscriberInfo[hole+1:]) r.SubscriberInfo = r.SubscriberInfo[:len(r.SubscriberInfo)-1] - OnUnSubscribeHooks.Trigger(v.OutputStream) + OnUnSubscribeHooks.Trigger(v.Subscriber) Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo)))) if len(r.SubscriberInfo) == 0 && r.Publisher == nil { r.Cancel() @@ -154,15 +151,15 @@ func (r *Room) Run() { case *SubscribeCmd: //防止重复添加 if _, ok := r.Subscribers[v.ID]; !ok { - r.Subscribers[v.ID] = v.OutputStream + r.Subscribers[v.ID] = v.Subscriber r.SubscriberInfo = append(r.SubscriberInfo, &v.SubscriberInfo) Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo)))) - OnSubscribeHooks.Trigger(v.OutputStream) + OnSubscribeHooks.Trigger(v.Subscriber) } - case *ChangeRoomCmd: - if _, ok := v.NewRoom.Subscribers[v.ID]; !ok { + case *ChangeStreamCmd: + if _, ok := v.NewStream.Subscribers[v.ID]; !ok { delete(r.Subscribers, v.ID) - v.NewRoom.Subscribe(v.OutputStream) + v.NewStream.Subscribe(v.Subscriber) if len(r.SubscriberInfo) == 0 && r.Publisher == nil { r.Cancel() } @@ -173,13 +170,13 @@ func (r *Room) Run() { } // PushAudio 来自发布者推送的音频 -func (r *Room) PushAudio(timestamp uint32, payload []byte) { +func (r *Stream) PushAudio(timestamp uint32, payload []byte) { payloadLen := len(payload) audio := r.AVRing audio.Type = avformat.FLV_TAG_TYPE_AUDIO audio.Timestamp = timestamp audio.Payload = payload - audio.VideoFrameType = 0 + audio.IsKeyFrame = false audio.IsSequence = false if audio.GetLast().Timestamp > 0 { r.AudioInfo.BPS = payloadLen * 1000 / int(timestamp-audio.GetLast().Timestamp) @@ -228,11 +225,9 @@ func (r *Room) PushAudio(timestamp uint32, payload []byte) { } r.AudioInfo.PacketCount++ audio.Number = r.AudioInfo.PacketCount - audio.GoNext() - audio.Lock() - audio.GetLast().Unlock() + audio.NextW() } -func (r *Room) setH264Info(video *Ring) { +func (r *Stream) setH264Info(video *Ring) { r.VideoTag = video.AVPacket.Clone() if r.VideoInfo.CodecID != 7 { return @@ -245,7 +240,7 @@ func (r *Room) setH264Info(video *Ring) { } // PushVideo 来自发布者推送的视频 -func (r *Room) PushVideo(timestamp uint32, payload []byte) { +func (r *Stream) PushVideo(timestamp uint32, payload []byte) { payloadLen := len(payload) video := r.AVRing video.Type = avformat.FLV_TAG_TYPE_VIDEO @@ -257,9 +252,10 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) { if payloadLen < 3 { return } - video.VideoFrameType = payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2 + videoFrameType := payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2 r.VideoInfo.CodecID = payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC... - video.IsSequence = video.VideoFrameType == 1 && payload[1] == 0 + video.IsSequence = videoFrameType == 1 && payload[1] == 0 + video.IsKeyFrame = videoFrameType == 1 || videoFrameType == 4 if r.VideoTag == nil { if video.IsSequence { r.setH264Info(video) @@ -271,7 +267,7 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) { if video.IsSequence { r.setH264Info(video) } - if video.IsKeyFrame() { + if video.IsKeyFrame { if r.FirstScreen == nil { defer r.WaitingMutex.Unlock() r.FirstScreen = video.Clone() @@ -287,8 +283,6 @@ func (r *Room) PushVideo(timestamp uint32, payload []byte) { } r.VideoInfo.PacketCount++ video.Number = r.VideoInfo.PacketCount - video.GoNext() - video.Lock() - video.GetLast().Unlock() + video.NextW() } } diff --git a/subscriber.go b/subscriber.go index 510ad0d..db9ed7c 100644 --- a/subscriber.go +++ b/subscriber.go @@ -9,11 +9,6 @@ import ( "github.com/pkg/errors" ) -// Subscriber 订阅者 -// type Subscriber interface { -// Send(*avformat.SendPacket) error -// } - // SubscriberInfo 订阅者可序列化信息,用于控制台输出 type SubscriberInfo struct { ID string @@ -25,51 +20,50 @@ type SubscriberInfo struct { SubscribeTime time.Time } -// OutputStream 订阅者实体定义 -type OutputStream struct { +// Subscriber 订阅者实体定义 +type Subscriber struct { context.Context - *Room + *Stream SubscriberInfo - SendHandler func(*avformat.SendPacket) error - Cancel context.CancelFunc - Sign string - OffsetTime uint32 + OnData func(*avformat.SendPacket) error + Cancel context.CancelFunc + Sign string + OffsetTime uint32 } // IsClosed 检查订阅者是否已经关闭 -func (s *OutputStream) IsClosed() bool { +func (s *Subscriber) IsClosed() bool { return s.Context != nil && s.Err() != nil } // Close 关闭订阅者 -func (s *OutputStream) Close() { +func (s *Subscriber) Close() { if s.Cancel != nil { s.Cancel() } } -//Play 开始订阅 -func (s *OutputStream) Play(streamPath string) (err error) { - if !config.EnableWaitRoom { - if _, ok := AllRoom.Load(streamPath); !ok { +//Subscribe 开始订阅 +func (s *Subscriber) Subscribe(streamPath string) (err error) { + if !config.EnableWaitStream { + if _, ok := streamCollection.Load(streamPath); !ok { return errors.New(fmt.Sprintf("Stream not found:%s", streamPath)) } } - AllRoom.Get(streamPath).Subscribe(s) + GetStream(streamPath).Subscribe(s) defer s.UnSubscribe(s) //加锁解锁的目的是等待发布者首屏数据,如果发布者尚为发布,则会等待,否则就会往下执行 s.WaitingMutex.RLock() s.WaitingMutex.RUnlock() sendPacket := avformat.NewSendPacket(s.VideoTag, 0) defer sendPacket.Recycle() - s.SendHandler(sendPacket) + s.OnData(sendPacket) packet := s.FirstScreen.Clone() startTime := packet.Timestamp packet.RLock() sendPacket.AVPacket = packet.AVPacket - s.SendHandler(sendPacket) - packet.RUnlock() - packet.GoNext() + s.OnData(sendPacket) + packet.NextR() atsent := false dropping := false droped := 0 @@ -84,19 +78,18 @@ func (s *OutputStream) Play(streamPath string) (err error) { if packet.Type == avformat.FLV_TAG_TYPE_AUDIO && !atsent { sendPacket.AVPacket = s.AudioTag sendPacket.Timestamp = 0 - s.SendHandler(sendPacket) + s.OnData(sendPacket) atsent = true } sendPacket.AVPacket = packet.AVPacket sendPacket.Timestamp = packet.Timestamp - startTime - s.SendHandler(sendPacket) + s.OnData(sendPacket) if s.checkDrop(packet) { dropping = true droped = 0 } - packet.RUnlock() - packet.GoNext() - } else if packet.AVPacket.IsKeyFrame() { + packet.NextR() + } else if packet.IsKeyFrame { //遇到关键帧则退出丢帧 dropping = false //fmt.Println("drop package ", droped) @@ -104,8 +97,7 @@ func (s *OutputStream) Play(streamPath string) (err error) { packet.RUnlock() } else { droped++ - packet.RUnlock() - packet.GoNext() + packet.NextR() } } } diff --git a/summary.go b/summary.go index 720123e..aa1c980 100644 --- a/summary.go +++ b/summary.go @@ -30,7 +30,7 @@ type ServerSummary struct { Usage float64 } NetWork []NetWorkInfo - Rooms []*RoomInfo + Streams []*StreamInfo lastNetWork []NetWorkInfo ref int control chan bool @@ -144,10 +144,5 @@ func (s *ServerSummary) collect() { //fmt.Printf(" HD : %v GB Free: %v GB Usage:%f%%\n", d.Total/1024/1024/1024, d.Free/1024/1024/1024, d.UsedPercent) //fmt.Printf(" OS : %v(%v) %v \n", n.Platform, n.PlatformFamily, n.PlatformVersion) //fmt.Printf(" Hostname : %v \n", n.Hostname) - s.Rooms = nil - AllRoom.Range(func(key interface{}, v interface{}) bool { - s.Rooms = append(s.Rooms, &v.(*Room).RoomInfo) - return true - }) return }