集群采集信息功能完善

This commit is contained in:
langhuihui
2020-02-04 13:46:19 +08:00
parent a1bdc8528b
commit ab745145d9
15 changed files with 121 additions and 87 deletions

View File

@@ -7,8 +7,8 @@ ListenAddr = ":1935"
[Plugins.GateWay]
ListenAddr = ":8081"
[Plugins.Cluster]
Master = "203.60.1.23:2019"
#ListenAddr = ":2019"
#Master = "localhost:2019"
ListenAddr = ":2019"
#
#[Plugins.Auth]
#Key="www.monibuca.com"

View File

@@ -1 +1 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ea4656d8.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.af5e5ef3.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ea4656d8.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.af5e5ef3.js></script></body></html>
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ea4656d8.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.0267da57.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ea4656d8.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.0267da57.js></script></body></html>

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers();
this.initTextures();
};
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js')
this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this
function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output)
@@ -118,12 +118,15 @@ Jessibuca.prototype.playAudio = function (data) {
}
// setTimeout(playNextBuffer, buffer.duration * 1000)
}
var tryPlay = function (buffer) {
var decodeAudio = function () {
if (decodeQueue.length) {
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error);
context.decodeAudioData(decodeQueue.shift(), tryPlay, decodeAudio);
} else {
isDecoding = false
}
}
var tryPlay = function (buffer) {
decodeAudio()
if (isPlaying) {
audioBuffers.push(buffer);
} else {
@@ -134,7 +137,7 @@ Jessibuca.prototype.playAudio = function (data) {
decodeQueue.push(...data)
if (!isDecoding) {
isDecoding = true
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error);
decodeAudio()
}
}
this.playAudio = playAudio
@@ -452,7 +455,7 @@ Jessibuca.prototype.close = function () {
this.decoderWorker.postMessage({ cmd: "close" })
this.contextGL.clear(this.contextGL.COLOR_BUFFER_BIT);
}
Jessibuca.prototype.destroy = function(){
Jessibuca.prototype.destroy = function () {
this.decoderWorker.terminate()
}
Jessibuca.prototype.play = function (url) {

File diff suppressed because one or more lines are too long

1
dashboard/dist/js/app.0267da57.js.map vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers();
this.initTextures();
};
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js')
this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this
function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output)

View File

@@ -58,16 +58,15 @@ export default {
}
}
}
if (node.Children && node.Children.length > 0) {
for (let i = 0; i < node.Children.length; i++) {
let child = this.addServer(node.Children[i], nodes, edges);
if (node.Children) {
for (let childId in node.Children) {
this.addServer(node.Children[childId], nodes, edges);
edges.push({
source: result.id,
target: child.id
target: childId
});
}
}
return result;
}
},
watch: {

View File

@@ -9,6 +9,8 @@
>
<canvas id="canvas" width="488" height="275" style="background: black" />
<div slot="footer">
音频缓冲
<InputNumber v-model="audioBuffer" size="small"></InputNumber>
<Button v-if="audioEnabled" @click="turnOff" icon="md-volume-off" />
<Button v-else @click="turnOn" icon="md-volume-up"></Button>
</div>
@@ -22,18 +24,23 @@ export default {
data() {
return {
audioEnabled: false,
audioBuffer: 12,
url: ""
};
},
watch: {
audioEnabled(value) {
h5lc.audioEnabled(value);
},
audioBuffer(v) {
h5lc.audioBuffer = v;
}
},
mounted() {
h5lc = new window.Jessibuca({
canvas: document.getElementById("canvas"),
decoder: "jessibuca/ff.js"
decoder: "jessibuca/ff.js",
audioBuffer: this.audioBuffer
});
},
destroyed() {

BIN
monibuca.exe~ Normal file

Binary file not shown.

View File

@@ -8,8 +8,8 @@ func (h AuthHook) AddHook(hook func(string) error) {
AuthHooks = append(h, hook)
}
func (h AuthHook) Trigger(sign string) error {
for _, h := range h {
if err := h(sign); err != nil {
for _, f := range h {
if err := f(sign); err != nil {
return err
}
}
@@ -24,8 +24,8 @@ func (h OnPublishHook) AddHook(hook func(r *Room)) {
OnPublishHooks = append(h, hook)
}
func (h OnPublishHook) Trigger(r *Room) {
for _, h := range h {
h(r)
for _, f := range h {
f(r)
}
}
@@ -37,8 +37,8 @@ func (h OnSubscribeHook) AddHook(hook func(s *OutputStream)) {
OnSubscribeHooks = append(h, hook)
}
func (h OnSubscribeHook) Trigger(s *OutputStream) {
for _, h := range h {
h(s)
for _, f := range h {
f(s)
}
}
@@ -50,8 +50,8 @@ func (h OnDropHook) AddHook(hook func(s *OutputStream)) {
OnDropHooks = append(h, hook)
}
func (h OnDropHook) Trigger(s *OutputStream) {
for _, h := range h {
h(s)
for _, f := range h {
f(s)
}
}
@@ -63,7 +63,7 @@ func (h OnSummaryHook) AddHook(hook func(bool)) {
OnSummaryHooks = append(h, hook)
}
func (h OnSummaryHook) Trigger(v bool) {
for _, h := range h {
h(v)
for _, f := range h {
f(v)
}
}

View File

@@ -2,11 +2,12 @@ package monica
import (
"context"
"github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
"log"
"sync"
"time"
"github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
)
var (
@@ -154,11 +155,17 @@ func (r *Room) Run() {
}
}
func (r *Room) PushAudio(audio *pool.AVPacket) {
if len(audio.Payload) < 3 {
return
}
if audio.Payload[0] == 0xFF && (audio.Payload[1]&0xF0) == 0xF0 {
audio.IsADTS = true
r.AudioTag = audio
} else if r.AudioTag == nil {
audio.IsAACSequence = true
if len(audio.Payload) < 5 {
return
}
r.AudioTag = audio
tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息
if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息
@@ -200,6 +207,9 @@ func (r *Room) setH264Info(video *pool.AVPacket) {
}
}
func (r *Room) PushVideo(video *pool.AVPacket) {
if len(video.Payload) < 3 {
return
}
video.VideoFrameType = video.Payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
r.VideoInfo.CodecID = video.Payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC...
video.IsAVCSequence = video.VideoFrameType == 1 && video.Payload[1] == 0

View File

@@ -3,6 +3,7 @@ package cluster
import (
"bufio"
"encoding/json"
"io"
"log"
"math/rand"
"net"
@@ -46,56 +47,52 @@ func run() {
if MayBeError(err) {
return
}
masterConn, err = net.DialTCP("tcp", nil, addr)
if MayBeError(err) {
return
}
go readMaster()
go readMaster(addr)
}
if config.ListenAddr != "" {
Summary.Children = make(map[string]*ServerSummary)
OnSummaryHooks.AddHook(onSummary)
log.Printf("server bare start at %s", config.ListenAddr)
log.Fatal(ListenBare(config.ListenAddr))
}
}
func readMaster() {
func readMaster(addr *net.TCPAddr) {
var err error
defer func() {
for {
t := 5 + rand.Int63n(5)
log.Printf("reconnect to master %s after %d seconds", config.Master, t)
time.Sleep(time.Duration(t) * time.Second)
addr, _ := net.ResolveTCPAddr("tcp", config.Master)
if masterConn, err = net.DialTCP("tcp", nil, addr); err == nil {
go readMaster()
return
}
}
}()
brw := bufio.NewReadWriter(bufio.NewReader(masterConn), bufio.NewWriter(masterConn))
log.Printf("connect to master %s reporting", config.Master)
//首次报告
if b, err := json.Marshal(Summary); err == nil {
_, err = masterConn.Write(b)
}
var cmd byte
for {
cmd, err := brw.ReadByte()
if err != nil {
return
}
switch cmd {
case MSG_SUMMARY: //收到主服务器指令,进行采集和上报
log.Println("receive summary request from master")
if cmd, err = brw.ReadByte(); err != nil {
return
}
if cmd == 1 {
Summary.Add()
go onReport()
} else {
Summary.Done()
if masterConn, err = net.DialTCP("tcp", nil, addr); !MayBeError(err) {
reader := bufio.NewReader(masterConn)
log.Printf("connect to master %s reporting", config.Master)
for report(); err == nil; {
if cmd, err = reader.ReadByte(); !MayBeError(err) {
switch cmd {
case MSG_SUMMARY: //收到主服务器指令,进行采集和上报
log.Println("receive summary request from master")
if cmd, err = reader.ReadByte(); !MayBeError(err) {
if cmd == 1 {
Summary.Add()
go onReport()
} else {
Summary.Done()
}
}
}
}
}
}
t := 5 + rand.Int63n(5)
log.Printf("reconnect to master %s after %d seconds", config.Master, t)
time.Sleep(time.Duration(t) * time.Second)
}
}
func report() {
if b, err := json.Marshal(Summary); err == nil {
data := make([]byte, len(b)+2)
data[0] = MSG_SUMMARY
copy(data[1:], b)
data[len(data)-1] = 0
_, err = masterConn.Write(data)
}
}
@@ -103,28 +100,24 @@ func readMaster() {
func onReport() {
for range time.NewTicker(time.Second).C {
if Summary.Running() {
if b, err := json.Marshal(Summary); err == nil {
data := make([]byte, len(b)+2)
data[0] = MSG_SUMMARY
copy(data[1:], b)
data[len(data)-1] = 0
_, err = masterConn.Write(data)
}
report()
} else {
return
}
}
}
func orderReport(conn io.Writer, start bool) {
b := []byte{MSG_SUMMARY, 0}
if start {
b[1] = 1
}
conn.Write(b)
}
//通知从服务器需要上报或者关闭上报
func onSummary(start bool) {
slaves.Range(func(k, v interface{}) bool {
conn := v.(*net.TCPConn)
b := []byte{MSG_SUMMARY, 0}
if start {
b[1] = 1
}
conn.Write(b)
orderReport(v.(*net.TCPConn), start)
return true
})
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/binary"
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"strings"
@@ -80,29 +81,32 @@ func process(conn net.Conn) {
if err != nil {
return
}
bytes = bytes[0 : len(bytes)-1]
switch cmd {
case MSG_SUBSCRIBE:
if stream.Room != nil {
fmt.Printf("bare stream already exist from %s", conn.RemoteAddr())
return
}
streamName := string(bytes[0 : len(bytes)-1])
go stream.Play(streamName)
go stream.Play(string(bytes))
case MSG_AUTH:
sign := strings.Split(string(bytes[0:len(bytes)-1]), ",")
sign := strings.Split(string(bytes), ",")
head := []byte{MSG_AUTH, 2}
if len(sign) > 1 && AuthHooks.Trigger(sign[1]) == nil {
head[1] = 1
}
conn.Write(head)
conn.Write(bytes)
conn.Write(bytes[0 : len(bytes)+1])
case MSG_SUMMARY: //收到从服务器发来报告,加入摘要中
var summary *ServerSummary
summary := &ServerSummary{}
if err = json.Unmarshal(bytes, summary); err == nil {
summary.Address = connAddr
Summary.Report(summary)
if _, ok := slaves.Load(connAddr); !ok {
slaves.Store(connAddr, conn)
if Summary.Running() {
orderReport(io.Writer(conn), true)
}
defer slaves.Delete(connAddr)
}
}

18
slave.toml Normal file
View File

@@ -0,0 +1,18 @@
# [Plugins.HDL]
# ListenAddr = ":2020"
# [Plugins.Jessica]
# ListenAddr = ":8080"
# [Plugins.RTMP]
# ListenAddr = ":1935"
# [Plugins.GateWay]
# ListenAddr = ":8081"
[Plugins.Cluster]
Master = "localhost:2019"
#ListenAddr = ":2019"
#
#[Plugins.Auth]
#Key="www.monibuca.com"
# [Plugins.RecordFlv]
# Path="./resource"
# [Plugins.QoS]
# Suffix = ["high","medium","low"]