From ca016d834b4cb0deeaf9a1bf1e5ed1a02081993f Mon Sep 17 00:00:00 2001 From: langhuihui <178529795@qq.com> Date: Mon, 11 Jan 2021 23:08:15 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=80=E5=8F=91=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- audio_track.go2 | 84 ++++++++ avformat/flv.go | 4 +- avformat/sps.go | 2 +- go.mod | 11 +- go.sum | 23 +- hook.go | 95 -------- hook.go2 | 34 +++ main.go | 39 +--- nalu.go | 2 +- publisher.go => publisher.go2 | 11 +- ring.go | 98 --------- stream.go | 369 -------------------------------- stream.go2 | 99 +++++++++ subscriber.go => subscriber.go2 | 34 +-- summary.go | 148 ------------- util/SSE.go | 73 ------- util/linux.go | 13 ++ util/stderr.go | 8 +- util/windows.go | 13 ++ version.go | 2 + video_track.go2 | 75 +++++++ 21 files changed, 374 insertions(+), 863 deletions(-) create mode 100644 audio_track.go2 delete mode 100644 hook.go create mode 100644 hook.go2 rename publisher.go => publisher.go2 (81%) delete mode 100644 ring.go delete mode 100644 stream.go create mode 100644 stream.go2 rename subscriber.go => subscriber.go2 (77%) delete mode 100644 summary.go delete mode 100644 util/SSE.go create mode 100644 util/linux.go create mode 100644 util/windows.go create mode 100644 version.go create mode 100644 video_track.go2 diff --git a/audio_track.go2 b/audio_track.go2 new file mode 100644 index 0000000..293cfa9 --- /dev/null +++ b/audio_track.go2 @@ -0,0 +1,84 @@ +package engine + +import ( + "github.com/pion/rtp" + "github.com/Monibuca/utils/v3/go2" + "time" +) + +type AudioTrack struct { + Buffer *go2.Ring[rtp.Packet] + Info struct { + PacketCount int + SoundFormat byte //4bit + SoundRate int //2bit + SoundSize byte //1bit + SoundType byte //1bit + lastIndex int + BPS int + } +} +// Push 来自发布者推送的音频 +func (at *AudioTrack) Push(timestamp uint32, payload []byte) { + audio := at.Buffer + payloadLen := len(payload) + audio.Type = FLV_TAG_TYPE_AUDIO + audio.Timestamp = timestamp + audio.Payload = payload + audio.IsKeyFrame = false + audio.IsSequence = false + + if payloadLen < 4 { + return + } + if payload[0] == 0xFF && (payload[1]&0xF0) == 0xF0 { + //将ADTS转换成ASC + r.AudioInfo.SoundFormat = 10 + r.AudioInfo.SoundRate = SamplingFrequencies[(payload[2]&0x3c)>>2] + r.AudioInfo.SoundType = ((payload[2] & 0x1) << 2) | ((payload[3] & 0xc0) >> 6) + r.AudioTag = audio.ADTS2ASC() + } else if r.AudioTag == nil && r.AudioInfo.SoundRate == 0 { + audio.IsSequence = true + // if payloadLen < 5 { + // return + // } + r.AudioTag = audio.AVPacket.Clone() + tmp := payload[0] // 第一个字节保存着音频的相关信息 + if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话,后面有一个字节的详细信息 + //0 = AAC sequence header,1 = AAC raw。 + if aacPacketType := payload[1]; aacPacketType == 0 { + config1 := payload[2] + config2 := payload[3] + //audioObjectType = (config1 & 0xF8) >> 3 + // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 + // 2 AAC LC ISO/IEC 14496-3 subpart 4 + // 3 AAC SSR ISO/IEC 14496-3 subpart 4 + // 4 AAC LTP ISO/IEC 14496-3 subpart 4 + r.AudioInfo.SoundRate = SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)] + r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道 + //frameLengthFlag = (config2 >> 2) & 0x01 + //dependsOnCoreCoder = (config2 >> 1) & 0x01 + //extensionFlag = config2 & 0x01 + } + return + } else { + r.AudioInfo.SoundRate = SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz + r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples + r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道,1立体声 + } + } + if !r.UseTimestamp { + audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) + } + lastTimestamp := audio.GetAt(r.AudioInfo.lastIndex).Timestamp + if lastTimestamp > 0 && lastTimestamp != audio.Timestamp { + r.AudioInfo.BPS = payloadLen * 1000 / int(audio.Timestamp-lastTimestamp) + } + r.AudioInfo.PacketCount++ + audio.Number = r.AudioInfo.PacketCount + r.AudioInfo.lastIndex = audio.Index + audio.NextW() + if r.AudioInfo.PacketCount == 1 && (!*r.EnableVideo) { + close(r.WaitPub) + } +} \ No newline at end of file diff --git a/avformat/flv.go b/avformat/flv.go index 97a17cc..779044c 100644 --- a/avformat/flv.go +++ b/avformat/flv.go @@ -3,8 +3,8 @@ package avformat import ( "io" - "github.com/Monibuca/engine/v2/pool" - "github.com/Monibuca/engine/v2/util" + "github.com/Monibuca/engine/v3/pool" + "github.com/Monibuca/engine/v3/util" ) const ( diff --git a/avformat/sps.go b/avformat/sps.go index dc2af32..64cfcfd 100644 --- a/avformat/sps.go +++ b/avformat/sps.go @@ -3,7 +3,7 @@ package avformat import ( "bytes" - "github.com/Monibuca/engine/v2/util/bits" + "github.com/Monibuca/engine/v3/util/bits" ) type SPSInfo struct { diff --git a/go.mod b/go.mod index 2c469f2..cd3ef24 100644 --- a/go.mod +++ b/go.mod @@ -1,16 +1,15 @@ -module github.com/Monibuca/engine/v2 +module github.com/Monibuca/engine/v3 go 1.13 require ( github.com/BurntSushi/toml v0.3.1 - github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 - github.com/funny/utest v0.0.0-20161029064919-43870a374500 // indirect - github.com/go-ole/go-ole v1.2.4 // indirect github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 github.com/mattn/go-colorable v0.1.6 github.com/pkg/errors v0.9.1 - github.com/shirou/gopsutil v2.20.1+incompatible - github.com/stretchr/testify v1.5.1 // indirect + github.com/pion/rtp v1.5.4 + github.com/Monibuca/utils/v3 v3.0.0-alpha2 ) + +replace github.com/Monibuca/utils/v3 v3.0.0-alpha2 => ../../utils/v3 \ No newline at end of file diff --git a/go.sum b/go.sum index afc5254..8ae6117 100644 --- a/go.sum +++ b/go.sum @@ -1,49 +1,40 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Monibuca/engine/v2 v2.2.5 h1:/w0BrvdTy4cqLD2uaIRaqBwdnu+/VDk+r3sjFbpbc1E= +github.com/Monibuca/engine/v2 v2.2.5/go.mod h1:34EYjjV15G6myuHOKaJkO7y5tJ1Arq/NfC9Weacr2mc= +github.com/Monibuca/utils/v3 v3.0.0-alpha2 h1:zQzAbzhteSJBiDQKnTS8P5Ro6fZF3wzS7Vs1ArgFO4E= +github.com/Monibuca/utils/v3 v3.0.0-alpha2/go.mod h1:DBJeFxFTiZFScKAmAmqrpvLfyVeeZ77Rpq7mdNRW+28= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= -github.com/beevik/etree v1.1.0/go.mod h1:r8Aw8JqVegEf0w2fDnATrX9VpkMcyFeM0FhwO62wh+A= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/elgs/gostrgen v0.0.0-20161222160715-9d61ae07eeae/go.mod h1:wruC5r2gHdr/JIUs5Rr1V45YtsAzKXZxAnn/5rPC97g= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478 h1:Db9StoJ6RZN3YttC0Pm0I4Y5izITRYch3RMbT59BYN0= github.com/funny/slab v0.0.0-20180511031532-b1fad5e5d478/go.mod h1:0j1+svBH8ABEIPdUP0AIg4qedsybnXGJBakCEw8cfoo= github.com/funny/utest v0.0.0-20161029064919-43870a374500 h1:Z0r1CZnoIWFB/Uiwh1BU5FYmuFe6L5NPi6XWQEmsTRg= github.com/funny/utest v0.0.0-20161029064919-43870a374500/go.mod h1:mUn39tBov9jKnTWV1RlOYoNzxdBFHiSzXWdY1FoNGGg= github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= -github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee/go.mod h1:L0fX3K22YWvt/FAX9NnzrNzcI4wNYi9Yku4O0LKYflo= -github.com/gobwas/pool v0.2.0/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= -github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381 h1:bqDmpDG49ZRnB5PcgP0RXtQvnMSgIF14M7CBd2shtXs= github.com/logrusorgru/aurora v0.0.0-20200102142835-e9ef32dff381/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4= github.com/mattn/go-colorable v0.1.6 h1:6Su7aK7lXmJ/U79bYtBjLNaha4Fs1Rg9plHpcH+vvnE= github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= +github.com/pion/rtp v1.5.4/go.mod h1:bg60AL5GotNOlYZsqycbhDtEV3TkfbpXG0KBiUq29Mg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/quangngotan95/go-m3u8 v0.1.0/go.mod h1:smzfWHlYpBATVNu1GapKLYiCtEo5JxridIgvvudZ+Wc= -github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= -github.com/shirou/gopsutil v2.19.12+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/shirou/gopsutil v2.20.1+incompatible h1:oIq9Cq4i84Hk8uQAUOG3eNdI/29hBawGrD5YRl6JRDY= github.com/shirou/gopsutil v2.20.1+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/yakovlevdmv/Golang-iso8601-duration v0.0.0-20180403125811-e5db0413b903/go.mod h1:9o96byDMk+osDZqiIS2a7E7y0cWmg4rRTjQRWVHpFWE= -github.com/yakovlevdmv/WS-Discovery v0.0.0-20180512141937-16170c6c3677/go.mod h1:/VKdrRRbAVE0pvkoPTUlfXw1zxqEpflVsgF25aR5gbk= -github.com/yakovlevdmv/goonvif v0.0.0-20180517145634-8181eb3ef2fb/go.mod h1:Os0AToR0I28wSLpS4rQtZdMEcfGKJcSrTaJughAopv4= -github.com/yakovlevdmv/gosoap v0.0.0-20180512142237-299a954b1c6d/go.mod h1:NhCpqPG+N2wrLSqEHVG3FKl4uAPvtFHUx7IlCVpW1PU= -golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20200226051749-491c5fce7268/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs= +golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae h1:/WDfKMnPU+m5M4xB+6x4kaepxRw6jWvR5iDRdvjHgy8= golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= diff --git a/hook.go b/hook.go deleted file mode 100644 index 611267b..0000000 --- a/hook.go +++ /dev/null @@ -1,95 +0,0 @@ -package engine - -var AuthHooks = make(AuthHook, 0) - -type AuthHook []func(string) error - -func (h AuthHook) AddHook(hook func(string) error) { - AuthHooks = append(h, hook) -} -func (h AuthHook) Trigger(sign string) error { - for _, f := range h { - if err := f(sign); err != nil { - return err - } - } - return nil -} - -var OnPublishHooks = make(OnPublishHook, 0) - -type OnPublishHook []func(r *Stream) - -func (h OnPublishHook) AddHook(hook func(r *Stream)) { - OnPublishHooks = append(h, hook) -} -func (h OnPublishHook) Trigger(r *Stream) { - for _, f := range h { - f(r) - } -} - -var OnSubscribeHooks = make(OnSubscribeHook, 0) - -type OnSubscribeHook []func(s *Subscriber) - -func (h OnSubscribeHook) AddHook(hook func(s *Subscriber)) { - OnSubscribeHooks = append(h, hook) -} -func (h OnSubscribeHook) Trigger(s *Subscriber) { - for _, f := range h { - f(s) - } -} - -var OnUnSubscribeHooks = make(OnUnSubscribeHook, 0) - -type OnUnSubscribeHook []func(s *Subscriber) - -func (h OnUnSubscribeHook) AddHook(hook func(s *Subscriber)) { - OnUnSubscribeHooks = append(h, hook) -} -func (h OnUnSubscribeHook) Trigger(s *Subscriber) { - for _, f := range h { - f(s) - } -} - -var OnDropHooks = make(OnDropHook, 0) - -type OnDropHook []func(s *Subscriber) - -func (h OnDropHook) AddHook(hook func(s *Subscriber)) { - OnDropHooks = append(h, hook) -} -func (h OnDropHook) Trigger(s *Subscriber) { - for _, f := range h { - f(s) - } -} - -var OnSummaryHooks = make(OnSummaryHook, 0) - -type OnSummaryHook []func(bool) - -func (h OnSummaryHook) AddHook(hook func(bool)) { - OnSummaryHooks = append(h, hook) -} -func (h OnSummaryHook) Trigger(v bool) { - for _, f := range h { - f(v) - } -} - -var OnStreamClosedHooks = make(OnStreamClosedHook, 0) - -type OnStreamClosedHook []func(*Stream) - -func (h OnStreamClosedHook) AddHook(hook func(*Stream)) { - OnStreamClosedHooks = append(h, hook) -} -func (h OnStreamClosedHook) Trigger(v *Stream) { - for _, f := range h { - f(v) - } -} diff --git a/hook.go2 b/hook.go2 new file mode 100644 index 0000000..c457d33 --- /dev/null +++ b/hook.go2 @@ -0,0 +1,34 @@ +package engine + +import ( + "context" + "github.com/Monibuca/utils/v3/go2" +) +type Hook struct { + Name string + Payload interface{} +} +var Hooks = go2.NewRing[Hook]() + +func AddHook[T interface{}](name string,channel chan T) { + for hooks:= Hooks.SubRing(Hooks.Index);;hooks.GoNext(){ + hooks.Current.Wait() + if name == hooks.Current.Name { + channel<-hooks.Current.Payload.(T) + } + } +} + +func AddHookWithContext[T interface{}](name string,channel chan T,ctx context.Context) { + for hooks:= Hooks.SubRing(Hooks.Index);ctx.Err()==nil;hooks.GoNext(){ + hooks.Current.Wait() + if name == hooks.Current.Name && ctx.Err()==nil{ + channel<-hooks.Current.Payload.(T) + } + } +} + +func TriggerHook(hook Hook) { + Hooks.Current.T = hook + Hooks.NextW() +} \ No newline at end of file diff --git a/main.go b/main.go index 9064580..6357292 100644 --- a/main.go +++ b/main.go @@ -2,13 +2,9 @@ package engine import ( "encoding/json" - "fmt" + "github.com/Monibuca/engine/v3/util" "io/ioutil" "log" - "os" - "path/filepath" - "runtime" - "strings" "time" // colorable "github.com/BurntSushi/toml" @@ -18,44 +14,27 @@ import ( var ( config = &struct { EnableWaitStream bool - EnableAudio bool - EnableVideo bool - RingSize int + EnableAudio bool + EnableVideo bool PublishTimeout time.Duration - }{true, true, true, 10, time.Minute} + }{true, true, true, time.Minute} // ConfigRaw 配置信息的原始数据 ConfigRaw []byte - // Version 引擎版本号 - Version string - // EngineInfo 引擎信息 - EngineInfo = &struct { - Version *string - StartTime time.Time //启动时间 - EnableWaitStream *bool - RingSize *int - }{&Version, time.Now(), &config.EnableWaitStream, &config.RingSize} + StartTime time.Time //启动时间 ) // Run 启动Monibuca引擎 func Run(configFile string) (err error) { - if runtime.GOOS == "windows" { - ioutil.WriteFile("shutdown.bat", []byte(fmt.Sprintf("taskkill /pid %d -t -f", os.Getpid())), 0777) - } else { - ioutil.WriteFile("shutdown.sh", []byte(fmt.Sprintf("kill -9 %d", os.Getpid())), 0777) - } - _, enginePath, _, _ := runtime.Caller(0) - if parts := strings.Split(filepath.Dir(enginePath), "@"); len(parts) > 1 { - Version = parts[len(parts)-1] - } + err = util.CreateShutdownScript() + StartTime = time.Now() if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil { Print(Red("read config file error:"), err) return } - Print(BgGreen(Black("Ⓜ start monibuca ")), BrightBlue(Version)) - go Summary.StartSummary() + Print(BgGreen(Black("Ⓜ starting monibuca ")), BrightBlue(Version)) var cg map[string]interface{} if _, err = toml.Decode(string(ConfigRaw), &cg); err == nil { - if cfg, ok := cg["Monibuca"]; ok { + if cfg, ok := cg["Engine"]; ok { b, _ := json.Marshal(cfg) if err = json.Unmarshal(b, config); err != nil { log.Println(err) diff --git a/nalu.go b/nalu.go index 4d140c1..4e2b812 100644 --- a/nalu.go +++ b/nalu.go @@ -3,7 +3,7 @@ package engine import ( "encoding/binary" - . "github.com/Monibuca/engine/v2/avformat" + . "github.com/Monibuca/engine/v3/avformat" ) const ( diff --git a/publisher.go b/publisher.go2 similarity index 81% rename from publisher.go rename to publisher.go2 index 6b67457..187ec08 100644 --- a/publisher.go +++ b/publisher.go2 @@ -11,12 +11,13 @@ type Publisher struct { cancel context.CancelFunc AutoUnPublish bool // 当无人订阅时自动停止发布 *Stream + Type string //类型,用来区分不同的发布者 } // Close 关闭发布者 func (p *Publisher) Close() { if p.Running() { - p.Cancel() + p.Stream.Close() } } @@ -30,16 +31,12 @@ func (p *Publisher) Publish(streamPath string) bool { p.Stream = GetStream(streamPath) //检查是否已存在发布者 if p.Publisher != nil { - if p.AVRing.Timeout() { - p.Publisher.cancel() //单独关闭Publisher而复用Stream - } else { - return false - } + return false } p.Context, p.cancel = context.WithCancel(p.Stream) p.Publisher = p p.StartTime = time.Now() //触发钩子 - OnPublishHooks.Trigger(p.Stream) + TriggerHook(Hook{"Publish",p.Stream}) return true } diff --git a/ring.go b/ring.go deleted file mode 100644 index f853106..0000000 --- a/ring.go +++ /dev/null @@ -1,98 +0,0 @@ -package engine - -import ( - "bytes" - "sync" - "time" - - "github.com/Monibuca/engine/v2/avformat" -) - -type RingItem struct { - avformat.AVPacket - sync.WaitGroup - *bytes.Buffer - UpdateTime time.Time -} - -// Ring 环形缓冲,使用数组实现 -type Ring struct { - *RingItem - buffer []RingItem - Size int - Index int -} - -// NewRing 创建Ring,传入大小指数 -func NewRing(exp int) (r *Ring) { - r = new(Ring) - r.Size = 1 << exp - r.buffer = make([]RingItem, r.Size) - r.RingItem = &r.buffer[0] - r.Add(1) - return -} -func (r *Ring) offset(v int) int { - return (r.Index + v) & (r.Size - 1) -} - -// GoTo 移动到指定索引处 -func (r *Ring) GoTo(index int) { - r.Index = index - r.RingItem = &r.buffer[index] -} - -// GetAt 获取指定索引处的引用 -func (r *Ring) GetAt(index int) *RingItem { - return &r.buffer[index] -} - -// GetNext 获取下一个位置的引用 -func (r *Ring) GetNext() *RingItem { - return &r.buffer[r.offset(1)] -} - -// GetLast 获取上一个位置的引用 -func (r *Ring) GetLast() *RingItem { - return &r.buffer[r.offset(-1)] -} - -// GoNext 移动到下一个位置 -func (r *Ring) GoNext() { - r.Index = r.offset(1) - r.RingItem = &r.buffer[r.Index] -} - -// GoBack 移动到上一个位置 -func (r *Ring) GoBack() { - r.Index = r.offset(-1) - r.RingItem = &r.buffer[r.Index] -} - -// NextW 写下一个 -func (r *Ring) NextW() { - item := r.RingItem - item.UpdateTime = time.Now() - r.GoNext() - r.RingItem.Add(1) - item.Done() -} - -func (r *Ring) GetBuffer() *bytes.Buffer { - if r.Buffer == nil { - r.Buffer = bytes.NewBuffer([]byte{}) - } else { - r.Reset() - } - return r.Buffer -} - -// Clone 克隆一个Ring -func (r Ring) Clone() *Ring { - return &r -} - -// Timeout 发布者是否超时了 -func (r *Ring) Timeout() bool { - return time.Since(r.UpdateTime) > config.PublishTimeout -} diff --git a/stream.go b/stream.go deleted file mode 100644 index 7d984c5..0000000 --- a/stream.go +++ /dev/null @@ -1,369 +0,0 @@ -package engine - -import ( - "bytes" - "context" - "log" - "sync" - "time" - - . "github.com/Monibuca/engine/v2/avformat" - . "github.com/logrusorgru/aurora" -) - -var streamCollection Collection - -// Collection 对sync.Map的包装 -type Collection struct { - sync.Map -} - -//FindStream 根据流路径查找流 -func FindStream(streamPath string) *Stream { - if s, ok := streamCollection.Load(streamPath); ok { - return s.(*Stream) - } - return nil -} - -//GetStream 根据流路径获取流,如果不存在则创建一个新的 -func GetStream(streamPath string) (result *Stream) { - item, loaded := streamCollection.LoadOrStore(streamPath, &Stream{ - Subscribers: make(map[string]*Subscriber), - Control: make(chan interface{}), - AVRing: NewRing(config.RingSize), - StreamInfo: StreamInfo{ - StreamPath: streamPath, - SubscriberInfo: make([]*SubscriberInfo, 0), - HasVideo: true, - HasAudio: true, - EnableAudio: &config.EnableAudio, - EnableVideo: &config.EnableVideo, - }, - WaitPub: make(chan struct{}), - }) - result = item.(*Stream) - if !loaded { - Summary.Streams = append(Summary.Streams, &result.StreamInfo) - result.Context, result.Cancel = context.WithCancel(context.Background()) - if config.EnableVideo { - result.EnableVideo = &result.HasVideo - } - if config.EnableAudio { - result.EnableAudio = &result.HasAudio - } - go result.Run() - } - return -} - -// Stream 流定义 -type Stream struct { - context.Context - *Publisher - StreamInfo //可序列化,供后台查看的数据 - Control chan interface{} - Cancel context.CancelFunc - Subscribers map[string]*Subscriber // 订阅者 - VideoTag *AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 - AudioTag *AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 - FirstScreen *Ring //最近的关键帧位置,首屏渲染 - AVRing *Ring //数据环 - WaitPub chan struct{} //用于订阅和等待发布者 - UseTimestamp bool //是否采用数据包中的时间戳 - SPS []byte - PPS []byte -} - -// StreamInfo 流可序列化信息,用于控制台显示 -type StreamInfo struct { - StreamPath string - StartTime time.Time - SubscriberInfo []*SubscriberInfo - Type string - VideoInfo struct { - PacketCount int - CodecID byte - SPSInfo SPSInfo - BPS int - lastIndex int - GOP int //关键帧间隔 - } - AudioInfo struct { - PacketCount int - SoundFormat byte //4bit - SoundRate int //2bit - SoundSize byte //1bit - SoundType byte //1bit - lastIndex int - BPS int - } - HasAudio bool - HasVideo bool - EnableVideo *bool - EnableAudio *bool -} - -// UnSubscribeCmd 取消订阅命令 -type UnSubscribeCmd struct { - *Subscriber -} - -// SubscribeCmd 订阅流命令 -type SubscribeCmd struct { - *Subscriber -} - -// ChangeStreamCmd 切换流命令 -type ChangeStreamCmd struct { - *Subscriber - NewStream *Stream -} - -func (r *Stream) onClosed() { - Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) - streamCollection.Delete(r.StreamPath) - for i, val := range Summary.Streams { - if val == &r.StreamInfo { - Summary.Streams = append(Summary.Streams[:i], Summary.Streams[i+1:]...) - break - } - } - OnStreamClosedHooks.Trigger(r) -} - -//Subscribe 订阅流 -func (r *Stream) Subscribe(s *Subscriber) { - s.Stream = r - if r.Err() == nil { - s.SubscribeTime = time.Now() - Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(s.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) - s.Context, s.Cancel = context.WithCancel(r) - s.Control <- &SubscribeCmd{s} - } -} - -//UnSubscribe 取消订阅流 -func (r *Stream) UnSubscribe(s *Subscriber) { - if r.Err() == nil { - r.Control <- &UnSubscribeCmd{s} - } -} - -// Run 流运行 -func (r *Stream) Run() { - Print(Green("Stream create:"), BrightCyan(r.StreamPath)) - defer r.onClosed() - for { - select { - case <-r.Done(): - return - case s := <-r.Control: - switch v := s.(type) { - case *UnSubscribeCmd: - if _, ok := r.Subscribers[v.ID]; ok { - delete(r.Subscribers, v.ID) - for i, val := range r.SubscriberInfo { - if val == &v.SubscriberInfo { - r.SubscriberInfo = append(r.SubscriberInfo[:i], r.SubscriberInfo[i+1:]...) - break - } - } - OnUnSubscribeHooks.Trigger(v.Subscriber) - Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo)))) - if len(r.SubscriberInfo) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) { - r.Cancel() - } - } - case *SubscribeCmd: - //防止重复添加 - if _, ok := r.Subscribers[v.ID]; !ok { - r.Subscribers[v.ID] = v.Subscriber - r.SubscriberInfo = append(r.SubscriberInfo, &v.SubscriberInfo) - Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(v.ID), Blue(len(r.SubscriberInfo)))) - OnSubscribeHooks.Trigger(v.Subscriber) - } - case *ChangeStreamCmd: - if _, ok := v.NewStream.Subscribers[v.ID]; !ok { - delete(r.Subscribers, v.ID) - v.NewStream.Subscribe(v.Subscriber) - if len(r.SubscriberInfo) == 0 && r.Publisher == nil { - r.Cancel() - } - } - } - } - } -} - -// GetBuffer 获取用于写入的缓冲区 -func (r *Stream) GetBuffer() *bytes.Buffer { - return r.AVRing.GetBuffer() -} -func (r *Stream) WriteASC(asc []byte) { - if r.AudioTag == nil { - r.AudioTag = NewAVPacket(FLV_TAG_TYPE_AUDIO) - r.AudioTag.IsSequence = true - r.AudioTag.Payload = append(append(r.AudioTag.Payload, 0xAF, 0), asc...) - } else { - r.AudioTag.Payload = append(r.AudioTag.Payload[:2], asc...) - } - config1 := asc[0] - config2 := asc[1] - r.AudioInfo.SoundFormat = 10 - //audioObjectType = (config1 & 0xF8) >> 3 - // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 - // 2 AAC LC ISO/IEC 14496-3 subpart 4 - // 3 AAC SSR ISO/IEC 14496-3 subpart 4 - // 4 AAC LTP ISO/IEC 14496-3 subpart 4 - r.AudioInfo.SoundRate = SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)] - r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道 - //frameLengthFlag = (config2 >> 2) & 0x01 - //dependsOnCoreCoder = (config2 >> 1) & 0x01 - //extensionFlag = config2 & 0x01 -} - -// PushAudio 来自发布者推送的音频 -func (r *Stream) PushAudio(timestamp uint32, payload []byte) { - audio := r.AVRing - payloadLen := len(payload) - audio.Type = FLV_TAG_TYPE_AUDIO - audio.Timestamp = timestamp - audio.Payload = payload - audio.IsKeyFrame = false - audio.IsSequence = false - - if payloadLen < 4 { - return - } - if payload[0] == 0xFF && (payload[1]&0xF0) == 0xF0 { - //将ADTS转换成ASC - r.AudioInfo.SoundFormat = 10 - r.AudioInfo.SoundRate = SamplingFrequencies[(payload[2]&0x3c)>>2] - r.AudioInfo.SoundType = ((payload[2] & 0x1) << 2) | ((payload[3] & 0xc0) >> 6) - r.AudioTag = audio.ADTS2ASC() - } else if r.AudioTag == nil && r.AudioInfo.SoundRate == 0 { - audio.IsSequence = true - // if payloadLen < 5 { - // return - // } - r.AudioTag = audio.AVPacket.Clone() - tmp := payload[0] // 第一个字节保存着音频的相关信息 - if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话,后面有一个字节的详细信息 - //0 = AAC sequence header,1 = AAC raw。 - if aacPacketType := payload[1]; aacPacketType == 0 { - config1 := payload[2] - config2 := payload[3] - //audioObjectType = (config1 & 0xF8) >> 3 - // 1 AAC MAIN ISO/IEC 14496-3 subpart 4 - // 2 AAC LC ISO/IEC 14496-3 subpart 4 - // 3 AAC SSR ISO/IEC 14496-3 subpart 4 - // 4 AAC LTP ISO/IEC 14496-3 subpart 4 - r.AudioInfo.SoundRate = SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)] - r.AudioInfo.SoundType = (config2 >> 3) & 0x0F //声道 - //frameLengthFlag = (config2 >> 2) & 0x01 - //dependsOnCoreCoder = (config2 >> 1) & 0x01 - //extensionFlag = config2 & 0x01 - } - return - } else { - r.AudioInfo.SoundRate = SoundRate[(tmp&0x0c)>>2] // 采样率 0 = 5.5 kHz or 1 = 11 kHz or 2 = 22 kHz or 3 = 44 kHz - r.AudioInfo.SoundSize = (tmp & 0x02) >> 1 // 采样精度 0 = 8-bit samples or 1 = 16-bit samples - r.AudioInfo.SoundType = tmp & 0x01 // 0 单声道,1立体声 - } - } - if !r.UseTimestamp { - audio.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) - } - lastTimestamp := audio.GetAt(r.AudioInfo.lastIndex).Timestamp - if lastTimestamp > 0 && lastTimestamp != audio.Timestamp { - r.AudioInfo.BPS = payloadLen * 1000 / int(audio.Timestamp-lastTimestamp) - } - r.AudioInfo.PacketCount++ - audio.Number = r.AudioInfo.PacketCount - r.AudioInfo.lastIndex = audio.Index - audio.NextW() - if r.AudioInfo.PacketCount == 1 && (!*r.EnableVideo) { - close(r.WaitPub) - } -} -func (r *Stream) setH264Info(video *Ring) { - r.VideoTag = video.AVPacket.Clone() - if r.VideoInfo.CodecID != 7 { - return - } - var info AVCDecoderConfigurationRecord - //0:codec,1:IsAVCSequence,2~4:compositionTime - if _, err := info.Unmarshal(video.Payload[5:]); err == nil { - r.VideoInfo.SPSInfo, err = ParseSPS(info.SequenceParameterSetNALUnit) - r.SPS = info.SequenceParameterSetNALUnit - r.PPS = info.PictureParameterSetNALUnit - } -} -func (r *Stream) WriteSPS(sps []byte) { - lenSPS := len(sps) - r.SPS = sps - if r.VideoTag == nil { - r.VideoTag = NewAVPacket(FLV_TAG_TYPE_VIDEO) - r.VideoTag.IsSequence = true - r.VideoTag.IsKeyFrame = true - r.VideoTag.Payload = append(r.VideoTag.Payload, RTMP_AVC_HEAD...) - } - r.VideoInfo.SPSInfo, _ = ParseSPS(sps) - copy(r.VideoTag.Payload[6:], sps[1:4]) - r.VideoTag.Payload = append(append(r.VideoTag.Payload[:10], 0xE1, byte(lenSPS>>8), byte(lenSPS)), sps...) -} -func (r *Stream) WritePPS(pps []byte) { - lenPPS := len(pps) - r.PPS = pps - r.VideoTag.Payload = append(append(r.VideoTag.Payload, 0x01, byte(lenPPS>>8), byte(lenPPS)), pps...) -} - -// PushVideo 来自发布者推送的视频 -func (r *Stream) PushVideo(timestamp uint32, payload []byte) { - payloadLen := len(payload) - if payloadLen < 3 { - return - } - video := r.AVRing - video.Type = FLV_TAG_TYPE_VIDEO - video.Timestamp = timestamp - video.Payload = payload - videoFrameType := payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2 - r.VideoInfo.CodecID = payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC... - video.IsSequence = videoFrameType == 1 && payload[1] == 0 - video.IsKeyFrame = videoFrameType == 1 || videoFrameType == 4 - r.VideoInfo.PacketCount++ - video.Number = r.VideoInfo.PacketCount - if r.VideoTag == nil { - if video.IsSequence { - r.setH264Info(video) - } else { - log.Println("no AVCSequence") - } - } else { - //更换AVCSequence - if video.IsSequence { - r.setH264Info(video) - } - if video.IsKeyFrame { - if r.FirstScreen == nil { - defer close(r.WaitPub) - r.FirstScreen = video.Clone() - } else { - oldNumber := r.FirstScreen.Number - r.FirstScreen.GoTo(video.Index) - r.VideoInfo.GOP = r.FirstScreen.Number - oldNumber - } - } - if !r.UseTimestamp { - video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) - } - lastTimestamp := video.GetAt(r.VideoInfo.lastIndex).Timestamp - if lastTimestamp > 0 && lastTimestamp != video.Timestamp { - r.VideoInfo.BPS = payloadLen * 1000 / int(video.Timestamp-lastTimestamp) - } - r.VideoInfo.lastIndex = video.Index - video.NextW() - } -} diff --git a/stream.go2 b/stream.go2 new file mode 100644 index 0000000..09ae7cb --- /dev/null +++ b/stream.go2 @@ -0,0 +1,99 @@ +package engine + +import ( + "context" + "github.com/Monibuca/utils/v3/go2" + "sync" + "time" + . "github.com/logrusorgru/aurora" +) + +var streamCollection sync.Map + +//FindStream 根据流路径查找流 +func FindStream(streamPath string) *Stream { + if s, ok := streamCollection.Load(streamPath); ok { + return s.(*Stream) + } + return nil +} + +//GetStream 根据流路径获取流,如果不存在则创建一个新的 +func GetStream(streamPath string) (result *Stream) { + item, loaded := streamCollection.LoadOrStore(streamPath, &Stream{ + StreamPath: streamPath, + HasVideo: true, + HasAudio: true, + EnableAudio: &config.EnableAudio, + EnableVideo: &config.EnableVideo, + WaitPub: make(chan struct{}), + }) + result = item.(*Stream) + if !loaded { + result.Context, result.cancel = context.WithCancel(context.Background()) + if config.EnableVideo { + result.EnableVideo = &result.HasVideo + } + if config.EnableAudio { + result.EnableAudio = &result.HasAudio + } + Print(Green("Stream create:"), BrightCyan(streamPath)) + } + return +} + +// Stream 流定义 +type Stream struct { + context.Context + cancel context.CancelFunc + StreamPath string + StartTime time.Time //流的创建时间 + *Publisher + Subscribers []*Subscriber // 订阅者 + //VideoTag *AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 + //AudioTag *AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 + VideoTracks []VideoTrack + AudioTracks []AudioTrack + WaitPub chan struct{} //用于订阅和等待发布者 + UseTimestamp bool //是否采用数据包中的时间戳 + HasAudio bool + HasVideo bool + EnableVideo *bool + EnableAudio *bool + subscribeMutex sync.Mutex +} + +func (r *Stream) Close() { + r.cancel() + Print(Yellow("Stream destoryed :"), BrightCyan(r.StreamPath)) + streamCollection.Delete(r.StreamPath) + TriggerHook(Hook{"StreamClose",r}) +} + +//Subscribe 订阅流 +func (r *Stream) Subscribe(s *Subscriber) { + if s.Stream = r;r.Err() == nil { + s.SubscribeTime = time.Now() + Print(Sprintf(Yellow("subscribe :%s %s,to Stream %s"), Blue(r.Type), Cyan(s.ID), BrightCyan(r.StreamPath))) + s.Context, s.Cancel = context.WithCancel(r) + r.subscribeMutex.Lock() + r.Subscribers = append(r.Subscribers,s) + r.subscribeMutex.Unlock() + Print(Sprintf(Yellow("%s subscriber %s added remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) + TriggerHook(Hook{"Subscribe",s}) + } +} + +//UnSubscribe 取消订阅流 +func (r *Stream) UnSubscribe(s *Subscriber) { + if r.Err() == nil { + Print(Sprintf(Yellow("%s subscriber %s removed remains:%d"), BrightCyan(r.StreamPath), Cyan(s.ID), Blue(len(r.Subscribers)))) + r.subscribeMutex.Lock() + go2.DeleteSliceItem[Subscriber](r.Subscribers,s) + r.subscribeMutex.Unlock() + TriggerHook(Hook{"UnSubscribe",s}) + if len(r.Subscribers) == 0 && (r.Publisher == nil || r.Publisher.AutoUnPublish) { + r.Close() + } + } +} \ No newline at end of file diff --git a/subscriber.go b/subscriber.go2 similarity index 77% rename from subscriber.go rename to subscriber.go2 index 49ca084..1a4c139 100644 --- a/subscriber.go +++ b/subscriber.go2 @@ -2,9 +2,11 @@ package engine import ( "context" + "encoding/json" + "github.com/pion/rtp" "time" - "github.com/Monibuca/engine/v2/avformat" + "github.com/Monibuca/engine/v3/avformat" "github.com/pkg/errors" ) @@ -24,13 +26,14 @@ type Subscriber struct { context.Context *Stream SubscriberInfo - MetaData func(stream *Stream) error - OnData func(*avformat.SendPacket) error + MetaData func() error + OnData func(rtp.Packet) error Cancel context.CancelFunc Sign string OffsetTime uint32 startTime uint32 - avformat.SendPacket + vtIndex int //第几个视频轨 + atIndex int //第几个音频轨 } // IsClosed 检查订阅者是否已经关闭 @@ -45,6 +48,9 @@ func (s *Subscriber) Close() { } } +func (s *Subscriber) MarshalJSON() ([]byte, error) { + return json.Marshal(s.SubscriberInfo) +} //Subscribe 开始订阅 func (s *Subscriber) Subscribe(streamPath string) (err error) { if !config.EnableWaitStream && FindStream(streamPath) == nil { @@ -62,23 +68,23 @@ func (s *Subscriber) Subscribe(streamPath string) (err error) { return s.Err() } if s.MetaData != nil { - if err = s.MetaData(s.Stream); err != nil { + if err = s.MetaData(); err != nil { return err } } if *s.EnableVideo { - s.sendAv(s.VideoTag, 0) - packet := s.FirstScreen.Clone() - s.startTime = packet.Timestamp // 开始时间戳,第一个关键帧的 - s.Delay = s.AVRing.GetLast().Timestamp - packet.Timestamp + videoTrack:=s.VideoTracks[s.vtIndex] + packet := videoTrack.Buffer.SubRing(videoTrack.FirstScreen) + s.startTime = packet.Current.Timestamp // 开始时间戳,第一个关键帧的 + s.Delay = videoTrack.Buffer.GetLast().Timestamp - packet.Current.Timestamp s.send(packet) packet.GoNext() // targetStartTime := s.AVRing.GetLast().Timestamp //实际开始时间戳 for atsent, dropping, droped := s.AudioTag == nil, false, 0; s.Err() == nil; packet.GoNext() { s.TotalPacket++ if !dropping { - packet.Wait() - if !atsent && packet.Type == avformat.FLV_TAG_TYPE_AUDIO { + packet.Current.Wait() + if !atsent && packet.Current.Type == avformat.FLV_TAG_TYPE_AUDIO { s.sendAv(s.AudioTag, 0) atsent = true } @@ -101,10 +107,8 @@ func (s *Subscriber) Subscribe(streamPath string) (err error) { } } } else if *s.EnableAudio { - if s.AudioTag != nil { - s.sendAv(s.AudioTag, 0) - } - for packet := s.AVRing; s.Err() == nil; packet.GoNext() { + audioTrack:=s.AudioTracks[s.atIndex] + for packet := audioTrack; s.Err() == nil; packet.Buffer.GoNext() { s.TotalPacket++ s.send(packet) } diff --git a/summary.go b/summary.go deleted file mode 100644 index aa1c980..0000000 --- a/summary.go +++ /dev/null @@ -1,148 +0,0 @@ -package engine - -import ( - "log" - "time" - - "github.com/shirou/gopsutil/cpu" - "github.com/shirou/gopsutil/disk" - "github.com/shirou/gopsutil/mem" - "github.com/shirou/gopsutil/net" -) - -// Summary 系统摘要数据 -var Summary = ServerSummary{} - -// ServerSummary 系统摘要定义 -type ServerSummary struct { - Address string - Memory struct { - Total uint64 - Free uint64 - Used uint64 - Usage float64 - } - CPUUsage float64 - HardDisk struct { - Total uint64 - Free uint64 - Used uint64 - Usage float64 - } - NetWork []NetWorkInfo - Streams []*StreamInfo - lastNetWork []NetWorkInfo - ref int - control chan bool - reportChan chan *ServerSummary - Children map[string]*ServerSummary -} - -// NetWorkInfo 网速信息 -type NetWorkInfo struct { - Name string - Receive uint64 - Sent uint64 - ReceiveSpeed uint64 - SentSpeed uint64 -} - -//StartSummary 开始定时采集数据,每秒一次 -func (s *ServerSummary) StartSummary() { - ticker := time.NewTicker(time.Second) - s.control = make(chan bool) - s.reportChan = make(chan *ServerSummary) - for { - select { - case <-ticker.C: - if s.ref > 0 { - Summary.collect() - } - case v := <-s.control: - if v { - if s.ref++; s.ref == 1 { - log.Println("start report summary") - OnSummaryHooks.Trigger(true) - } - } else { - if s.ref--; s.ref == 0 { - s.lastNetWork = nil - log.Println("stop report summary") - OnSummaryHooks.Trigger(false) - } - } - case report := <-s.reportChan: - s.Children[report.Address] = report - } - } -} - -// Running 是否正在采集数据 -func (s *ServerSummary) Running() bool { - return s.ref > 0 -} - -// Add 增加订阅者 -func (s *ServerSummary) Add() { - s.control <- true -} - -// Done 删除订阅者 -func (s *ServerSummary) Done() { - s.control <- false -} - -// Report 上报数据 -func (s *ServerSummary) Report(slave *ServerSummary) { - s.reportChan <- slave -} -func (s *ServerSummary) collect() { - v, _ := mem.VirtualMemory() - //c, _ := cpu.Info() - cc, _ := cpu.Percent(time.Second, false) - d, _ := disk.Usage("/") - //n, _ := host.Info() - nv, _ := net.IOCounters(true) - //boottime, _ := host.BootTime() - //btime := time.Unix(int64(boottime), 0).Format("2006-01-02 15:04:05") - s.Memory.Total = v.Total / 1024 / 1024 - s.Memory.Free = v.Available / 1024 / 1024 - s.Memory.Used = v.Used / 1024 / 1024 - s.Memory.Usage = v.UsedPercent - //fmt.Printf(" Mem : %v MB Free: %v MB Used:%v Usage:%f%%\n", v.Total/1024/1024, v.Available/1024/1024, v.Used/1024/1024, v.UsedPercent) - //if len(c) > 1 { - // for _, sub_cpu := range c { - // modelname := sub_cpu.ModelName - // cores := sub_cpu.Cores - // fmt.Printf(" CPU : %v %v cores \n", modelname, cores) - // } - //} else { - // sub_cpu := c[0] - // modelname := sub_cpu.ModelName - // cores := sub_cpu.Cores - // fmt.Printf(" CPU : %v %v cores \n", modelname, cores) - //} - s.CPUUsage = cc[0] - s.HardDisk.Free = d.Free / 1024 / 1024 / 1024 - s.HardDisk.Total = d.Total / 1024 / 1024 / 1024 - s.HardDisk.Used = d.Used / 1024 / 1024 / 1024 - s.HardDisk.Usage = d.UsedPercent - s.NetWork = make([]NetWorkInfo, len(nv)) - for i, n := range nv { - s.NetWork[i].Name = n.Name - s.NetWork[i].Receive = n.BytesRecv - s.NetWork[i].Sent = n.BytesSent - if s.lastNetWork != nil && len(s.lastNetWork) > i { - s.NetWork[i].ReceiveSpeed = n.BytesRecv - s.lastNetWork[i].Receive - s.NetWork[i].SentSpeed = n.BytesSent - s.lastNetWork[i].Sent - } - } - s.lastNetWork = s.NetWork - //fmt.Printf(" Network: %v bytes / %v bytes\n", nv[0].BytesRecv, nv[0].BytesSent) - //fmt.Printf(" SystemBoot:%v\n", btime) - //fmt.Printf(" CPU Used : used %f%% \n", cc[0]) - //fmt.Printf(" HD : %v GB Free: %v GB Usage:%f%%\n", d.Total/1024/1024/1024, d.Free/1024/1024/1024, d.UsedPercent) - //fmt.Printf(" OS : %v(%v) %v \n", n.Platform, n.PlatformFamily, n.PlatformVersion) - //fmt.Printf(" Hostname : %v \n", n.Hostname) - return -} diff --git a/util/SSE.go b/util/SSE.go deleted file mode 100644 index 8581fdc..0000000 --- a/util/SSE.go +++ /dev/null @@ -1,73 +0,0 @@ -package util - -import ( - "context" - "encoding/json" - "net/http" - "os/exec" -) - -var ( - sseEent = []byte("event: ") - sseBegin = []byte("data: ") - sseEnd = []byte("\n\n") -) - -type SSE struct { - http.ResponseWriter - context.Context -} - -func (sse *SSE) Write(data []byte) (n int, err error) { - if err = sse.Err(); err != nil { - return - } - _, err = sse.ResponseWriter.Write(sseBegin) - n, err = sse.ResponseWriter.Write(data) - _, err = sse.ResponseWriter.Write(sseEnd) - if err != nil { - return - } - sse.ResponseWriter.(http.Flusher).Flush() - return -} - -func (sse *SSE) WriteEvent(event string, data []byte) (err error) { - if err = sse.Err(); err != nil { - return - } - _, err = sse.ResponseWriter.Write(sseEent) - _, err = sse.ResponseWriter.Write([]byte(event)) - _, err = sse.ResponseWriter.Write([]byte("\n")) - _, err = sse.Write(data) - return -} - -func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE { - header := w.Header() - header.Set("Content-Type", "text/event-stream") - header.Set("Cache-Control", "no-cache") - header.Set("Connection", "keep-alive") - header.Set("X-Accel-Buffering", "no") - header.Set("Access-Control-Allow-Origin", "*") - return &SSE{ - w, - ctx, - } -} - -func (sse *SSE) WriteJSON(data interface{}) (err error) { - var jsonData []byte - if jsonData, err = json.Marshal(data); err == nil { - if _, err = sse.Write(jsonData); err != nil { - return - } - return - } - return -} -func (sse *SSE) WriteExec(cmd *exec.Cmd) error { - cmd.Stderr = sse - cmd.Stdout = sse - return cmd.Run() -} diff --git a/util/linux.go b/util/linux.go new file mode 100644 index 0000000..6b66b28 --- /dev/null +++ b/util/linux.go @@ -0,0 +1,13 @@ +// +build !windows + +package util + +import ( + "fmt" + "io/ioutil" + "os" +) + +func CreateShutdownScript() error{ + return ioutil.WriteFile("shutdown.sh", []byte(fmt.Sprintf("kill -9 %d", os.Getpid())), 0777) +} \ No newline at end of file diff --git a/util/stderr.go b/util/stderr.go index 31e74ad..a9d85b6 100644 --- a/util/stderr.go +++ b/util/stderr.go @@ -5,6 +5,7 @@ package util import ( "log" "os" + "runtime" "syscall" ) @@ -14,7 +15,10 @@ func init() { log.Println("服务启动出错", "打开异常日志文件失败", err) return } - // 将进程标准出错重定向至文件,进程崩溃时运行时将向该文件记录协程调用栈信息 - syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd())) + if runtime.GOARCH == "arm64" { + syscall.Dup3(int(logFile.Fd()), int(os.Stderr.Fd()), 0) + } else { + syscall.Dup2(int(logFile.Fd()), int(os.Stderr.Fd())) + } } diff --git a/util/windows.go b/util/windows.go new file mode 100644 index 0000000..82116c0 --- /dev/null +++ b/util/windows.go @@ -0,0 +1,13 @@ +// +build windows + +package util + +import ( + "fmt" + "io/ioutil" + "os" +) + +func CreateShutdownScript() error{ + return ioutil.WriteFile("shutdown.bat", []byte(fmt.Sprintf("taskkill /pid %d -t -f", os.Getpid())), 0777) +} \ No newline at end of file diff --git a/version.go b/version.go new file mode 100644 index 0000000..25df0cd --- /dev/null +++ b/version.go @@ -0,0 +1,2 @@ +package engine +var Version = "v3.0.1" \ No newline at end of file diff --git a/video_track.go2 b/video_track.go2 new file mode 100644 index 0000000..f0ec67e --- /dev/null +++ b/video_track.go2 @@ -0,0 +1,75 @@ +package engine + +import ( + "github.com/Monibuca/utils/v3/go2" + "github.com/pion/rtp" + "log" + "time" +) + +type VideoTrack struct { + FirstScreen byte //最近的关键帧位置,首屏渲染 + Buffer *go2.Ring[rtp.Packet] + SPS []byte + PPS []byte + Info struct { + PacketCount int + CodecID byte + SPSInfo SPSInfo + BPS int + lastIndex byte + GOP int //关键帧间隔 + } +} +func (vt *VideoTrack) GetBPS(payloadLen int){ + lastTimestamp := vt.Buffer.GetAt(vt.Info.lastIndex).Timestamp + if lastTimestamp > 0 && lastTimestamp != vt.Buffer.Current.Timestamp { + vt.Info.BPS = payloadLen * 1000 / int(vt.Buffer.Current.Timestamp-lastTimestamp) + } + vt.Info.lastIndex = vt.Buffer.Index +} + +// PushVideo 来自发布者推送的视频 +func (vt *VideoTrack) Push(timestamp uint32, payload []byte) { + payloadLen := len(payload) + if payloadLen < 3 { + return + } + video := r.VideoRing + video.Type = FLV_TAG_TYPE_VIDEO + video.Timestamp = timestamp + video.Payload = payload + videoFrameType := payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2 + r.VideoInfo.CodecID = payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC... + video.IsSequence = videoFrameType == 1 && payload[1] == 0 + video.IsKeyFrame = videoFrameType == 1 || videoFrameType == 4 + r.VideoInfo.PacketCount++ + video.Number = r.VideoInfo.PacketCount + if r.VideoTag == nil { + if video.IsSequence { + r.setH264Info(video) + } else { + log.Println("no AVCSequence") + } + } else { + //更换AVCSequence + if video.IsSequence { + r.setH264Info(video) + } + if video.IsKeyFrame { + if r.FirstScreen == nil { + defer close(r.WaitPub) + r.FirstScreen = video.Clone() + } else { + oldNumber := r.FirstScreen.Number + r.FirstScreen.GoTo(video.Index) + r.VideoInfo.GOP = r.FirstScreen.Number - oldNumber + } + } + if !r.UseTimestamp { + video.Timestamp = uint32(time.Since(r.StartTime) / time.Millisecond) + } + + video.NextW() + } +}