Compare commits

...

1 Commits

Author SHA1 Message Date
langhuihui
8a35f763dd 集群采集信息功能完善 2020-02-04 13:46:19 +08:00
13 changed files with 101 additions and 84 deletions

View File

@@ -7,8 +7,8 @@ ListenAddr = ":1935"
[Plugins.GateWay] [Plugins.GateWay]
ListenAddr = ":8081" ListenAddr = ":8081"
[Plugins.Cluster] [Plugins.Cluster]
Master = "203.60.1.23:2019" #Master = "localhost:2019"
#ListenAddr = ":2019" ListenAddr = ":2019"
# #
#[Plugins.Auth] #[Plugins.Auth]
#Key="www.monibuca.com" #Key="www.monibuca.com"

View File

@@ -1 +1 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ea4656d8.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.af5e5ef3.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ea4656d8.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.af5e5ef3.js></script></body></html> <!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ea4656d8.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.0267da57.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ea4656d8.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.0267da57.js></script></body></html>

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers(); this.initBuffers();
this.initTextures(); this.initTextures();
}; };
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js') this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this var _this = this
function draw(output) { function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output) _this.drawNextOutputPicture(_this.width, _this.height, null, output)
@@ -118,12 +118,15 @@ Jessibuca.prototype.playAudio = function (data) {
} }
// setTimeout(playNextBuffer, buffer.duration * 1000) // setTimeout(playNextBuffer, buffer.duration * 1000)
} }
var tryPlay = function (buffer) { var decodeAudio = function () {
if (decodeQueue.length) { if (decodeQueue.length) {
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error); context.decodeAudioData(decodeQueue.shift(), tryPlay, decodeAudio);
} else { } else {
isDecoding = false isDecoding = false
} }
}
var tryPlay = function (buffer) {
decodeAudio()
if (isPlaying) { if (isPlaying) {
audioBuffers.push(buffer); audioBuffers.push(buffer);
} else { } else {
@@ -134,7 +137,7 @@ Jessibuca.prototype.playAudio = function (data) {
decodeQueue.push(...data) decodeQueue.push(...data)
if (!isDecoding) { if (!isDecoding) {
isDecoding = true isDecoding = true
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error); decodeAudio()
} }
} }
this.playAudio = playAudio this.playAudio = playAudio
@@ -452,7 +455,7 @@ Jessibuca.prototype.close = function () {
this.decoderWorker.postMessage({ cmd: "close" }) this.decoderWorker.postMessage({ cmd: "close" })
this.contextGL.clear(this.contextGL.COLOR_BUFFER_BIT); this.contextGL.clear(this.contextGL.COLOR_BUFFER_BIT);
} }
Jessibuca.prototype.destroy = function(){ Jessibuca.prototype.destroy = function () {
this.decoderWorker.terminate() this.decoderWorker.terminate()
} }
Jessibuca.prototype.play = function (url) { Jessibuca.prototype.play = function (url) {

File diff suppressed because one or more lines are too long

1
dashboard/dist/js/app.0267da57.js.map vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers(); this.initBuffers();
this.initTextures(); this.initTextures();
}; };
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js') this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this var _this = this
function draw(output) { function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output) _this.drawNextOutputPicture(_this.width, _this.height, null, output)

View File

@@ -58,16 +58,15 @@ export default {
} }
} }
} }
if (node.Children && node.Children.length > 0) { if (node.Children) {
for (let i = 0; i < node.Children.length; i++) { for (let childId in node.Children) {
let child = this.addServer(node.Children[i], nodes, edges); this.addServer(node.Children[childId], nodes, edges);
edges.push({ edges.push({
source: result.id, source: result.id,
target: child.id target: childId
}); });
} }
} }
return result;
} }
}, },
watch: { watch: {

BIN
monibuca.exe~ Normal file

Binary file not shown.

View File

@@ -8,8 +8,8 @@ func (h AuthHook) AddHook(hook func(string) error) {
AuthHooks = append(h, hook) AuthHooks = append(h, hook)
} }
func (h AuthHook) Trigger(sign string) error { func (h AuthHook) Trigger(sign string) error {
for _, h := range h { for _, f := range h {
if err := h(sign); err != nil { if err := f(sign); err != nil {
return err return err
} }
} }
@@ -24,8 +24,8 @@ func (h OnPublishHook) AddHook(hook func(r *Room)) {
OnPublishHooks = append(h, hook) OnPublishHooks = append(h, hook)
} }
func (h OnPublishHook) Trigger(r *Room) { func (h OnPublishHook) Trigger(r *Room) {
for _, h := range h { for _, f := range h {
h(r) f(r)
} }
} }
@@ -37,8 +37,8 @@ func (h OnSubscribeHook) AddHook(hook func(s *OutputStream)) {
OnSubscribeHooks = append(h, hook) OnSubscribeHooks = append(h, hook)
} }
func (h OnSubscribeHook) Trigger(s *OutputStream) { func (h OnSubscribeHook) Trigger(s *OutputStream) {
for _, h := range h { for _, f := range h {
h(s) f(s)
} }
} }
@@ -50,8 +50,8 @@ func (h OnDropHook) AddHook(hook func(s *OutputStream)) {
OnDropHooks = append(h, hook) OnDropHooks = append(h, hook)
} }
func (h OnDropHook) Trigger(s *OutputStream) { func (h OnDropHook) Trigger(s *OutputStream) {
for _, h := range h { for _, f := range h {
h(s) f(s)
} }
} }
@@ -63,7 +63,7 @@ func (h OnSummaryHook) AddHook(hook func(bool)) {
OnSummaryHooks = append(h, hook) OnSummaryHooks = append(h, hook)
} }
func (h OnSummaryHook) Trigger(v bool) { func (h OnSummaryHook) Trigger(v bool) {
for _, h := range h { for _, f := range h {
h(v) f(v)
} }
} }

View File

@@ -3,6 +3,7 @@ package cluster
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"io"
"log" "log"
"math/rand" "math/rand"
"net" "net"
@@ -46,49 +47,29 @@ func run() {
if MayBeError(err) { if MayBeError(err) {
return return
} }
masterConn, err = net.DialTCP("tcp", nil, addr) go readMaster(addr)
if MayBeError(err) {
return
}
go readMaster()
} }
if config.ListenAddr != "" { if config.ListenAddr != "" {
Summary.Children = make(map[string]*ServerSummary)
OnSummaryHooks.AddHook(onSummary) OnSummaryHooks.AddHook(onSummary)
log.Printf("server bare start at %s", config.ListenAddr) log.Printf("server bare start at %s", config.ListenAddr)
log.Fatal(ListenBare(config.ListenAddr)) log.Fatal(ListenBare(config.ListenAddr))
} }
} }
func readMaster() {
func readMaster(addr *net.TCPAddr) {
var err error var err error
defer func() { var cmd byte
for { for {
t := 5 + rand.Int63n(5) if masterConn, err = net.DialTCP("tcp", nil, addr); !MayBeError(err) {
log.Printf("reconnect to master %s after %d seconds", config.Master, t) reader := bufio.NewReader(masterConn)
time.Sleep(time.Duration(t) * time.Second)
addr, _ := net.ResolveTCPAddr("tcp", config.Master)
if masterConn, err = net.DialTCP("tcp", nil, addr); err == nil {
go readMaster()
return
}
}
}()
brw := bufio.NewReadWriter(bufio.NewReader(masterConn), bufio.NewWriter(masterConn))
log.Printf("connect to master %s reporting", config.Master) log.Printf("connect to master %s reporting", config.Master)
//首次报告 for report(); err == nil; {
if b, err := json.Marshal(Summary); err == nil { if cmd, err = reader.ReadByte(); !MayBeError(err) {
_, err = masterConn.Write(b)
}
for {
cmd, err := brw.ReadByte()
if err != nil {
return
}
switch cmd { switch cmd {
case MSG_SUMMARY: //收到主服务器指令,进行采集和上报 case MSG_SUMMARY: //收到主服务器指令,进行采集和上报
log.Println("receive summary request from master") log.Println("receive summary request from master")
if cmd, err = brw.ReadByte(); err != nil { if cmd, err = reader.ReadByte(); !MayBeError(err) {
return
}
if cmd == 1 { if cmd == 1 {
Summary.Add() Summary.Add()
go onReport() go onReport()
@@ -97,12 +78,15 @@ func readMaster() {
} }
} }
} }
}
}
}
t := 5 + rand.Int63n(5)
log.Printf("reconnect to master %s after %d seconds", config.Master, t)
time.Sleep(time.Duration(t) * time.Second)
}
} }
func report() {
//定时上报
func onReport() {
for range time.NewTicker(time.Second).C {
if Summary.Running() {
if b, err := json.Marshal(Summary); err == nil { if b, err := json.Marshal(Summary); err == nil {
data := make([]byte, len(b)+2) data := make([]byte, len(b)+2)
data[0] = MSG_SUMMARY data[0] = MSG_SUMMARY
@@ -110,21 +94,30 @@ func onReport() {
data[len(data)-1] = 0 data[len(data)-1] = 0
_, err = masterConn.Write(data) _, err = masterConn.Write(data)
} }
}
//定时上报
func onReport() {
for range time.NewTicker(time.Second).C {
if Summary.Running() {
report()
} else { } else {
return return
} }
} }
} }
func orderReport(conn io.Writer, start bool) {
//通知从服务器需要上报或者关闭上报
func onSummary(start bool) {
slaves.Range(func(k, v interface{}) bool {
conn := v.(*net.TCPConn)
b := []byte{MSG_SUMMARY, 0} b := []byte{MSG_SUMMARY, 0}
if start { if start {
b[1] = 1 b[1] = 1
} }
conn.Write(b) conn.Write(b)
}
//通知从服务器需要上报或者关闭上报
func onSummary(start bool) {
slaves.Range(func(k, v interface{}) bool {
orderReport(v.(*net.TCPConn), start)
return true return true
}) })
} }

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@@ -80,29 +81,32 @@ func process(conn net.Conn) {
if err != nil { if err != nil {
return return
} }
bytes = bytes[0 : len(bytes)-1]
switch cmd { switch cmd {
case MSG_SUBSCRIBE: case MSG_SUBSCRIBE:
if stream.Room != nil { if stream.Room != nil {
fmt.Printf("bare stream already exist from %s", conn.RemoteAddr()) fmt.Printf("bare stream already exist from %s", conn.RemoteAddr())
return return
} }
streamName := string(bytes[0 : len(bytes)-1]) go stream.Play(string(bytes))
go stream.Play(streamName)
case MSG_AUTH: case MSG_AUTH:
sign := strings.Split(string(bytes[0:len(bytes)-1]), ",") sign := strings.Split(string(bytes), ",")
head := []byte{MSG_AUTH, 2} head := []byte{MSG_AUTH, 2}
if len(sign) > 1 && AuthHooks.Trigger(sign[1]) == nil { if len(sign) > 1 && AuthHooks.Trigger(sign[1]) == nil {
head[1] = 1 head[1] = 1
} }
conn.Write(head) conn.Write(head)
conn.Write(bytes) conn.Write(bytes[0 : len(bytes)+1])
case MSG_SUMMARY: //收到从服务器发来报告,加入摘要中 case MSG_SUMMARY: //收到从服务器发来报告,加入摘要中
var summary *ServerSummary summary := &ServerSummary{}
if err = json.Unmarshal(bytes, summary); err == nil { if err = json.Unmarshal(bytes, summary); err == nil {
summary.Address = connAddr summary.Address = connAddr
Summary.Report(summary) Summary.Report(summary)
if _, ok := slaves.Load(connAddr); !ok { if _, ok := slaves.Load(connAddr); !ok {
slaves.Store(connAddr, conn) slaves.Store(connAddr, conn)
if Summary.Running() {
orderReport(io.Writer(conn), true)
}
defer slaves.Delete(connAddr) defer slaves.Delete(connAddr)
} }
} }

18
slave.toml Normal file
View File

@@ -0,0 +1,18 @@
# [Plugins.HDL]
# ListenAddr = ":2020"
# [Plugins.Jessica]
# ListenAddr = ":8080"
# [Plugins.RTMP]
# ListenAddr = ":1935"
# [Plugins.GateWay]
# ListenAddr = ":8081"
[Plugins.Cluster]
Master = "localhost:2019"
#ListenAddr = ":2019"
#
#[Plugins.Auth]
#Key="www.monibuca.com"
# [Plugins.RecordFlv]
# Path="./resource"
# [Plugins.QoS]
# Suffix = ["high","medium","low"]