mirror of
https://github.com/langhuihui/monibuca.git
synced 2025-09-27 01:15:52 +08:00
251 lines
7.2 KiB
Go
251 lines
7.2 KiB
Go
package monica
|
||
|
||
import (
|
||
"context"
|
||
"log"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/langhuihui/monibuca/monica/avformat"
|
||
)
|
||
|
||
var (
|
||
AllRoom = Collection{}
|
||
roomCtxBg = context.Background()
|
||
)
|
||
|
||
type Collection struct {
|
||
sync.Map
|
||
}
|
||
|
||
func (c *Collection) Get(name string) (result *Room) {
|
||
item, loaded := AllRoom.LoadOrStore(name, &Room{
|
||
Subscribers: make(map[string]*OutputStream),
|
||
Control: make(chan interface{}),
|
||
VideoChan: make(chan *avformat.AVPacket, 1),
|
||
AudioChan: make(chan *avformat.AVPacket, 1),
|
||
})
|
||
result = item.(*Room)
|
||
if !loaded {
|
||
result.StreamPath = name
|
||
result.Context, result.Cancel = context.WithCancel(roomCtxBg)
|
||
go result.Run()
|
||
}
|
||
return
|
||
}
|
||
|
||
type Room struct {
|
||
context.Context
|
||
Publisher
|
||
RoomInfo
|
||
Control chan interface{}
|
||
Cancel context.CancelFunc
|
||
Subscribers map[string]*OutputStream // 订阅者
|
||
VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
|
||
AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
|
||
FirstScreen []*avformat.AVPacket
|
||
AudioChan chan *avformat.AVPacket
|
||
VideoChan chan *avformat.AVPacket
|
||
UseTimestamp bool //是否采用数据包中的时间戳
|
||
}
|
||
|
||
type RoomInfo struct {
|
||
StreamPath string
|
||
StartTime time.Time
|
||
SubscriberInfo []*SubscriberInfo
|
||
Type string
|
||
VideoInfo struct {
|
||
PacketCount int
|
||
CodecID byte
|
||
SPSInfo avformat.SPSInfo
|
||
}
|
||
AudioInfo struct {
|
||
PacketCount int
|
||
SoundFormat byte //4bit
|
||
SoundRate int //2bit
|
||
SoundSize byte //1bit
|
||
SoundType byte //1bit
|
||
}
|
||
}
|
||
type UnSubscribeCmd struct {
|
||
*OutputStream
|
||
}
|
||
type SubscribeCmd struct {
|
||
*OutputStream
|
||
}
|
||
type ChangeRoomCmd struct {
|
||
*OutputStream
|
||
NewRoom *Room
|
||
}
|
||
|
||
func (r *Room) onClosed() {
|
||
log.Printf("room destoryed :%s", r.StreamPath)
|
||
AllRoom.Delete(r.StreamPath)
|
||
if r.Publisher != nil {
|
||
r.OnClosed()
|
||
}
|
||
}
|
||
func (r *Room) Subscribe(s *OutputStream) {
|
||
s.Room = r
|
||
if r.Err() == nil {
|
||
s.SubscribeTime = time.Now()
|
||
log.Printf("subscribe :%s %s,to room %s", s.Type, s.ID, r.StreamPath)
|
||
s.packetQueue = make(chan *avformat.SendPacket, 1024)
|
||
s.Context, s.Cancel = context.WithCancel(r)
|
||
s.Control <- &SubscribeCmd{s}
|
||
}
|
||
}
|
||
|
||
func (r *Room) UnSubscribe(s *OutputStream) {
|
||
if r.Err() == nil {
|
||
r.Control <- &UnSubscribeCmd{s}
|
||
}
|
||
}
|
||
func (r *Room) Run() {
|
||
log.Printf("room create :%s", r.StreamPath)
|
||
defer r.onClosed()
|
||
update := time.NewTicker(time.Second)
|
||
defer update.Stop()
|
||
for {
|
||
select {
|
||
case <-r.Done():
|
||
return
|
||
case <-update.C:
|
||
if Summary.Running() {
|
||
r.SubscriberInfo = make([]*SubscriberInfo, len(r.Subscribers))
|
||
i := 0
|
||
for _, v := range r.Subscribers {
|
||
r.SubscriberInfo[i] = &v.SubscriberInfo
|
||
i++
|
||
}
|
||
}
|
||
case s := <-r.Control:
|
||
switch v := s.(type) {
|
||
case *UnSubscribeCmd:
|
||
delete(r.Subscribers, v.ID)
|
||
log.Printf("%s subscriber %s removed remains:%d", r.StreamPath, v.ID, len(r.Subscribers))
|
||
if len(r.Subscribers) == 0 && r.Publisher == nil {
|
||
r.Cancel()
|
||
}
|
||
case *SubscribeCmd:
|
||
if _, ok := r.Subscribers[v.ID]; !ok {
|
||
r.Subscribers[v.ID] = v.OutputStream
|
||
log.Printf("%s subscriber %s added remains:%d", r.StreamPath, v.ID, len(r.Subscribers))
|
||
OnSubscribeHooks.Trigger(v.OutputStream)
|
||
}
|
||
case *ChangeRoomCmd:
|
||
if _, ok := v.NewRoom.Subscribers[v.ID]; !ok {
|
||
delete(r.Subscribers, v.ID)
|
||
v.NewRoom.Subscribe(v.OutputStream)
|
||
if len(r.Subscribers) == 0 && r.Publisher == nil {
|
||
r.Cancel()
|
||
}
|
||
}
|
||
}
|
||
case audio := <-r.AudioChan:
|
||
for _, v := range r.Subscribers {
|
||
v.sendAudio(audio)
|
||
}
|
||
case video := <-r.VideoChan:
|
||
for _, v := range r.Subscribers {
|
||
v.sendVideo(video)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
func (r *Room) PushAudio(audio *avformat.AVPacket) {
|
||
if len(audio.Payload) < 4 {
|
||
return
|
||
}
|
||
if audio.Payload[0] == 0xFF && (audio.Payload[1]&0xF0) == 0xF0 {
|
||
//audio.IsADTS = true
|
||
r.AudioInfo.SoundFormat = 10
|
||
r.AudioInfo.SoundRate = avformat.SamplingFrequencies[(audio.Payload[2]&0x3c)>>2]
|
||
r.AudioInfo.SoundType = ((audio.Payload[2] & 0x1) << 2) | ((audio.Payload[3] & 0xc0) >> 6)
|
||
r.AudioTag = audio.ADTS2ASC()
|
||
} else if r.AudioTag == nil {
|
||
audio.IsAACSequence = true
|
||
if len(audio.Payload) < 5 {
|
||
return
|
||
}
|
||
r.AudioTag = audio
|
||
tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息
|
||
if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话,后面有一个字节的详细信息
|
||
//0 = AAC sequence header,1 = AAC raw。
|
||
if aacPacketType := audio.Payload[1]; aacPacketType == 0 {
|
||
config1 := audio.Payload[2]
|
||
config2 := audio.Payload[3]
|
||
//audioObjectType = (config1 & 0xF8) >> 3
|
||
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
|
||
// 2 AAC LC ISO/IEC 14496-3 subpart 4
|
||
// 3 AAC SSR ISO/IEC 14496-3 subpart 4
|
||
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
|
||
r.AudioInfo.SoundRate = avformat.SamplingFrequencies[((config1&0x7)<<1)|(0x90>>7)]
|
||
r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道
|
||
//frameLengthFlag = (config2 >> 2) & 0x01
|
||
//dependsOnCoreCoder = (config2 >> 1) & 0x01
|
||
//extensionFlag = config2 & 0x01
|
||
}
|
||
} else {
|
||
r.AudioInfo.SoundRate = avformat.SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz
|
||
r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples
|
||
r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道,1立体声
|
||
}
|
||
return
|
||
}
|
||
audio.RefCount = len(r.Subscribers)
|
||
if !r.UseTimestamp {
|
||
audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
|
||
}
|
||
r.AudioInfo.PacketCount++
|
||
r.AudioChan <- audio
|
||
}
|
||
func (r *Room) setH264Info(video *avformat.AVPacket) {
|
||
r.VideoTag = video
|
||
if r.VideoInfo.CodecID != 7 {
|
||
return
|
||
}
|
||
info := avformat.AVCDecoderConfigurationRecord{}
|
||
//0:codec,1:IsAVCSequence,2~4:compositionTime
|
||
if _, err := info.Unmarshal(video.Payload[5:]); err == nil {
|
||
r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit)
|
||
}
|
||
}
|
||
func (r *Room) PushVideo(video *avformat.AVPacket) {
|
||
if len(video.Payload) < 3 {
|
||
return
|
||
}
|
||
video.VideoFrameType = video.Payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
|
||
r.VideoInfo.CodecID = video.Payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC...
|
||
video.IsAVCSequence = video.VideoFrameType == 1 && video.Payload[1] == 0
|
||
if r.VideoTag == nil {
|
||
if video.IsAVCSequence {
|
||
r.setH264Info(video)
|
||
} else {
|
||
log.Println("no AVCSequence")
|
||
}
|
||
} else {
|
||
//更换AVCSequence
|
||
if video.IsAVCSequence {
|
||
r.setH264Info(video)
|
||
}
|
||
if r.FirstScreen != nil {
|
||
if video.IsKeyFrame() {
|
||
for _, cache := range r.FirstScreen { //清空队列
|
||
cache.Recycle()
|
||
}
|
||
r.FirstScreen = r.FirstScreen[:0]
|
||
}
|
||
r.FirstScreen = append(r.FirstScreen, video)
|
||
video.RefCount = len(r.Subscribers) + 1
|
||
} else {
|
||
video.RefCount = len(r.Subscribers)
|
||
}
|
||
if !r.UseTimestamp {
|
||
video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond)
|
||
}
|
||
r.VideoInfo.PacketCount++
|
||
r.VideoChan <- video
|
||
}
|
||
}
|