Compare commits

...

6 Commits

Author SHA1 Message Date
langhuihui
b9e19e75c8 增强对实例的控制 2020-02-13 10:43:32 +08:00
langhuihui
eac623639d 界面增加重启和升级 2020-02-11 21:59:31 +08:00
langhuihui
fea6e98ca7 小功能增加 2020-02-11 17:27:05 +08:00
langhuihui
649a5b558a 增加实例管理器 2020-02-11 14:53:29 +08:00
langhuihui
b3c8d35fad 修复集群传输bug 2020-02-05 17:29:55 +08:00
langhuihui
ab745145d9 集群采集信息功能完善 2020-02-04 16:09:55 +08:00
65 changed files with 14860 additions and 271 deletions

View File

@@ -7,8 +7,8 @@ ListenAddr = ":1935"
[Plugins.GateWay] [Plugins.GateWay]
ListenAddr = ":8081" ListenAddr = ":8081"
[Plugins.Cluster] [Plugins.Cluster]
Master = "203.60.1.23:2019" #Master = "localhost:2019"
#ListenAddr = ":2019" ListenAddr = ":2019"
# #
#[Plugins.Auth] #[Plugins.Auth]
#Key="www.monibuca.com" #Key="www.monibuca.com"

View File

@@ -1 +1 @@
#app,body,html{height:100%}#app{font-family:Avenir,Helvetica,Arial,sans-serif;-webkit-font-smoothing:antialiased;-moz-osx-font-smoothing:grayscale;text-align:center;color:#184c18;position:relative}#app>div:first-child{position:absolute;top:10px;left:30px;font-size:x-large}.content{padding-top:60px}.feature-title[data-v-54efad41]{color:#eb5e46;font-weight:700;font-size:larger}p[data-v-54efad41]{margin:30px;font-size:20px}img[data-v-54efad41]{margin:20px}.root[data-v-e34eab40]{background:#d3d3d3}.root>img[data-v-e34eab40]{width:300px;margin:30px}.records[data-v-4eee1624]{display:-webkit-box;display:-ms-flexbox;display:flex;-ms-flex-wrap:wrap;flex-wrap:wrap;padding:0 15px}.records>[data-v-4eee1624]{width:200px}.log-container{overflow-y:auto;max-height:500px}@-webkit-keyframes recording-data-v-f6113870{0%{opacity:.2}50%{opacity:1}to{opacity:.2}}@keyframes recording-data-v-f6113870{0%{opacity:.2}50%{opacity:1}to{opacity:.2}}.recording[data-v-f6113870]{-webkit-animation:recording-data-v-f6113870 1s infinite;animation:recording-data-v-f6113870 1s infinite}.layout[data-v-f6113870]{padding-bottom:30px;display:-webkit-box;display:-ms-flexbox;display:flex;-ms-flex-wrap:wrap;flex-wrap:wrap}.room[data-v-f6113870]{width:250px;margin:10px;text-align:left}.empty[data-v-f6113870]{color:#eb5e46;width:100%;min-height:500px;-webkit-box-pack:center;-ms-flex-pack:center;justify-content:center;-webkit-box-align:center;-ms-flex-align:center;align-items:center}.empty[data-v-f6113870],.status[data-v-f6113870]{display:-webkit-box;display:-ms-flexbox;display:flex}.status[data-v-f6113870]{position:fixed;left:5px;bottom:10px}.status>div[data-v-f6113870]{margin:0 5px} #app,body,html{height:100%}#app{font-family:Avenir,Helvetica,Arial,sans-serif;-webkit-font-smoothing:antialiased;-moz-osx-font-smoothing:grayscale;text-align:center;color:#184c18;position:relative}#app>div:first-child{position:absolute;top:10px;left:30px;font-size:x-large}.content{padding-top:60px}.feature-title[data-v-54efad41]{color:#eb5e46;font-weight:700;font-size:larger}p[data-v-54efad41]{margin:30px;font-size:20px}img[data-v-54efad41]{margin:20px}.root[data-v-e34eab40]{background:#d3d3d3}.root>img[data-v-e34eab40]{width:300px;margin:30px}.records[data-v-7d5ab110]{display:-webkit-box;display:-ms-flexbox;display:flex;-ms-flex-wrap:wrap;flex-wrap:wrap;padding:0 15px}.records>[data-v-7d5ab110]{width:200px}.log-container{overflow-y:auto;max-height:500px}@-webkit-keyframes recording-data-v-65ac4b48{0%{opacity:.2}50%{opacity:1}to{opacity:.2}}@keyframes recording-data-v-65ac4b48{0%{opacity:.2}50%{opacity:1}to{opacity:.2}}.recording[data-v-65ac4b48]{-webkit-animation:recording-data-v-65ac4b48 1s infinite;animation:recording-data-v-65ac4b48 1s infinite}.layout[data-v-65ac4b48]{padding-bottom:30px;display:-webkit-box;display:-ms-flexbox;display:flex;-ms-flex-wrap:wrap;flex-wrap:wrap}.room[data-v-65ac4b48]{width:250px;margin:10px;text-align:left}.empty[data-v-65ac4b48]{color:#eb5e46;width:100%;min-height:500px;-webkit-box-pack:center;-ms-flex-pack:center;justify-content:center;-webkit-box-align:center;-ms-flex-align:center;align-items:center}.empty[data-v-65ac4b48],.status[data-v-65ac4b48]{display:-webkit-box;display:-ms-flexbox;display:flex}.status[data-v-65ac4b48]{position:fixed;left:5px;bottom:10px}.status>div[data-v-65ac4b48]{margin:0 5px}

View File

@@ -1 +1 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ea4656d8.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.af5e5ef3.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ea4656d8.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.af5e5ef3.js></script></body></html> <!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca</title><script src=jessibuca/ajax.js></script><script src=jessibuca/renderer.js></script><link href=/css/app.ce470878.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.017fb959.js rel=preload as=script><link href=/js/chunk-vendors.ebc28a73.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.ce470878.css rel=stylesheet></head><body><noscript><strong>We're sorry but dashboard doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.ebc28a73.js></script><script src=/js/app.017fb959.js></script></body></html>

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers(); this.initBuffers();
this.initTextures(); this.initTextures();
}; };
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js') this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this var _this = this
function draw(output) { function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output) _this.drawNextOutputPicture(_this.width, _this.height, null, output)
@@ -118,12 +118,15 @@ Jessibuca.prototype.playAudio = function (data) {
} }
// setTimeout(playNextBuffer, buffer.duration * 1000) // setTimeout(playNextBuffer, buffer.duration * 1000)
} }
var tryPlay = function (buffer) { var decodeAudio = function () {
if (decodeQueue.length) { if (decodeQueue.length) {
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error); context.decodeAudioData(decodeQueue.shift(), tryPlay, decodeAudio);
} else { } else {
isDecoding = false isDecoding = false
} }
}
var tryPlay = function (buffer) {
decodeAudio()
if (isPlaying) { if (isPlaying) {
audioBuffers.push(buffer); audioBuffers.push(buffer);
} else { } else {
@@ -134,7 +137,7 @@ Jessibuca.prototype.playAudio = function (data) {
decodeQueue.push(...data) decodeQueue.push(...data)
if (!isDecoding) { if (!isDecoding) {
isDecoding = true isDecoding = true
context.decodeAudioData(decodeQueue.shift(), tryPlay, console.error); decodeAudio()
} }
} }
this.playAudio = playAudio this.playAudio = playAudio
@@ -452,7 +455,7 @@ Jessibuca.prototype.close = function () {
this.decoderWorker.postMessage({ cmd: "close" }) this.decoderWorker.postMessage({ cmd: "close" })
this.contextGL.clear(this.contextGL.COLOR_BUFFER_BIT); this.contextGL.clear(this.contextGL.COLOR_BUFFER_BIT);
} }
Jessibuca.prototype.destroy = function(){ Jessibuca.prototype.destroy = function () {
this.decoderWorker.terminate() this.decoderWorker.terminate()
} }
Jessibuca.prototype.play = function (url) { Jessibuca.prototype.play = function (url) {

2
dashboard/dist/js/app.017fb959.js vendored Normal file

File diff suppressed because one or more lines are too long

1
dashboard/dist/js/app.017fb959.js.map vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -27,7 +27,9 @@ func main() {
该配置文件主要是为了定制各个插件的配置,例如监听端口号等,具体还是要看各个插件的设计。 该配置文件主要是为了定制各个插件的配置,例如监听端口号等,具体还是要看各个插件的设计。
> 如果你编写了自己的插件,就必须在该配置文件中写入对自己插件的配置信息 ::: tip
如果你编写了自己的插件,就必须在该配置文件中写入对自己插件的配置信息
:::
如果注释掉部分插件的配置,那么该插件就不会启用,典型的配置如下: 如果注释掉部分插件的配置,那么该插件就不会启用,典型的配置如下:
```toml ```toml

View File

@@ -9,7 +9,7 @@ function Jessibuca(opt) {
this.initBuffers(); this.initBuffers();
this.initTextures(); this.initTextures();
}; };
this.decoderWorker = new Worker(opt.decoder || '264_mp3.js') this.decoderWorker = new Worker(opt.decoder || 'ff.js')
var _this = this var _this = this
function draw(output) { function draw(output) {
_this.drawNextOutputPicture(_this.width, _this.height, null, output) _this.drawNextOutputPicture(_this.width, _this.height, null, output)

View File

@@ -1,5 +1,9 @@
<template> <template>
<div id="mountNode"></div> <div>
自动更新
<i-switch v-model="autoUpdate"></i-switch>
<div id="mountNode"></div>
</div>
</template> </template>
<script> <script>
@@ -7,24 +11,22 @@ import { mapState } from "vuex";
import G6 from "@antv/g6"; import G6 from "@antv/g6";
var graph = null; var graph = null;
export default { export default {
data() {
return {
autoUpdate: true
};
},
computed: { computed: {
...mapState({ ...mapState({
data(state) { data(state) {
let summary = state.summary; let d = this.addServer(state.summary);
// 点集 d.label = "🏠" + d.label;
let nodes = []; return d;
// 边集
let edges = [];
this.addServer(summary, nodes, edges);
return {
nodes,
edges
};
} }
}) })
}, },
methods: { methods: {
addServer(node, nodes, edges) { addServer(node) {
let result = { let result = {
id: node.Address, id: node.Address,
label: node.Address, label: node.Address,
@@ -33,38 +35,35 @@ export default {
shape: "modelRect", shape: "modelRect",
logoIcon: { logoIcon: {
show: false show: false
} },
children: []
}; };
nodes.push(result);
if (node.Rooms) { if (node.Rooms) {
for (let i = 0; i < node.Rooms.length; i++) { for (let i = 0; i < node.Rooms.length; i++) {
let room = node.Rooms[i]; let room = node.Rooms[i];
let roomId = result.id + room.StreamPath; let roomId = room.StreamPath;
nodes.push({ let roomData = {
id: roomId, id: roomId,
label: room.StreamPath, label: room.StreamPath,
shape: "rect" shape: "rect",
}); children: []
edges.push({ source: result.id, target: roomId }); };
result.children.push(roomData);
if (room.SubscriberInfo) { if (room.SubscriberInfo) {
for (let j = 0; j < room.SubscriberInfo.length; j++) { for (let j = 0; j < room.SubscriberInfo.length; j++) {
let subId = roomId + room.SubscriberInfo[j].ID; let subId = roomId + room.SubscriberInfo[j].ID;
nodes.push({ roomData.children.push({
id: subId, id: subId,
label: room.SubscriberInfo[j].ID label: room.SubscriberInfo[j].ID
}); });
edges.push({ source: roomId, target: subId });
} }
} }
} }
} }
if (node.Children && node.Children.length > 0) { if (node.Children) {
for (let i = 0; i < node.Children.length; i++) { for (let childId in node.Children) {
let child = this.addServer(node.Children[i], nodes, edges); result.children.push(this.addServer(node.Children[childId]));
edges.push({
source: result.id,
target: child.id
});
} }
} }
return result; return result;
@@ -72,23 +71,33 @@ export default {
}, },
watch: { watch: {
data(v) { data(v) {
if (graph) { if (graph && this.autoUpdate) {
graph.read(v); // 加载数据 //graph.updateChild(v, "");
graph.changeData(v); // 加载数据
graph.fitView();
//graph.read(v);
} }
} }
}, },
mounted() { mounted() {
graph = new G6.Graph({ graph = new G6.TreeGraph({
renderer: "svg", linkCenter: true,
// renderer: "svg",
container: "mountNode", // 指定挂载容器 container: "mountNode", // 指定挂载容器
width: 800, // 图的宽度 width: 800, // 图的宽度
height: 500, // 图的高度 height: 500, // 图的高度
layout: { modes: {
type: "radial" default: ["drag-canvas", "zoom-canvas", "click-select", "drag-node"]
}, },
defaultNode: {} animate: false,
layout: {
// type: "indeted",
direction: "H"
}
}); });
//graph.addChild(this.data, "");
graph.read(this.data); // 加载数据 graph.read(this.data); // 加载数据
graph.fitView();
} }
}; };
</script> </script>

View File

@@ -9,6 +9,8 @@
> >
<canvas id="canvas" width="488" height="275" style="background: black" /> <canvas id="canvas" width="488" height="275" style="background: black" />
<div slot="footer"> <div slot="footer">
音频缓冲
<InputNumber v-model="audioBuffer" size="small"></InputNumber>
<Button v-if="audioEnabled" @click="turnOff" icon="md-volume-off" /> <Button v-if="audioEnabled" @click="turnOff" icon="md-volume-off" />
<Button v-else @click="turnOn" icon="md-volume-up"></Button> <Button v-else @click="turnOn" icon="md-volume-up"></Button>
</div> </div>
@@ -22,18 +24,23 @@ export default {
data() { data() {
return { return {
audioEnabled: false, audioEnabled: false,
audioBuffer: 12,
url: "" url: ""
}; };
}, },
watch: { watch: {
audioEnabled(value) { audioEnabled(value) {
h5lc.audioEnabled(value); h5lc.audioEnabled(value);
},
audioBuffer(v) {
h5lc.audioBuffer = v;
} }
}, },
mounted() { mounted() {
h5lc = new window.Jessibuca({ h5lc = new window.Jessibuca({
canvas: document.getElementById("canvas"), canvas: document.getElementById("canvas"),
decoder: "jessibuca/ff.js" decoder: "jessibuca/ff.js",
audioBuffer: this.audioBuffer
}); });
}, },
destroyed() { destroyed() {

View File

@@ -33,7 +33,7 @@ export default {
x => { x => {
if (x == "success") { if (x == "success") {
this.onVisible(true); this.onVisible(true);
this.$Message.success("删除成功"); this.$Message.success("开始发布");
} else { } else {
this.$Message.error(x); this.$Message.error(x);
} }
@@ -50,7 +50,7 @@ export default {
{ streamPath: item.Path.replace(".flv", "") }, { streamPath: item.Path.replace(".flv", "") },
x => { x => {
if (x == "success") { if (x == "success") {
this.$Message.success("开始发布"); this.$Message.success("删除成功");
} else { } else {
this.$Message.error(x); this.$Message.error(x);
} }

View File

@@ -118,6 +118,7 @@ export default {
currentTab: "", currentTab: "",
currentStream: [], currentStream: [],
typeMap: { typeMap: {
Receiver: "📡",
FlvFile: "🎥", FlvFile: "🎥",
TS: "🎬", TS: "🎬",
HLS: "🍎", HLS: "🍎",

297
main.go
View File

@@ -1,17 +1,298 @@
package main package main
import ( import (
"bytes"
"encoding/json"
"errors"
"flag" "flag"
. "github.com/langhuihui/monibuca/monica" "fmt"
_ "github.com/langhuihui/monibuca/plugins" "io/ioutil"
"log" "log"
"mime"
"net/http"
"os" "os"
"os/exec"
"os/user"
"path"
"runtime"
"strings"
"github.com/BurntSushi/toml"
. "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/util"
) )
func main() { type InstanceDesc struct {
log.SetOutput(os.Stdout) Name string
configPath := flag.String("c", "config.toml", "configFile") Path string
flag.Parse() Plugins []string
Run(*configPath) Config string
select {} }
var instances = make(map[string]*InstanceDesc)
var instancesDir string
func main() {
// log.SetOutput(os.Stdout)
// configPath := flag.String("c", "config.toml", "configFile")
// flag.Parse()
// Run(*configPath)
// select {}
println("start monibuca instance manager version:", Version)
if MayBeError(readInstances()) {
return
}
addr := flag.String("port", "8000", "http server port")
flag.Parse()
http.HandleFunc("/instance/list", listInstance)
http.HandleFunc("/instance/create", initInstance)
http.HandleFunc("/instance/restart", restartInstance)
http.HandleFunc("/instance/shutdown", shutdownInstance)
http.HandleFunc("/", website)
fmt.Printf("start listen at %s", *addr)
if err := http.ListenAndServe(":"+*addr, nil); err != nil {
log.Fatal(err)
}
}
func readInstances() error {
if homeDir, err := Home(); err == nil {
instancesDir = path.Join(homeDir, ".monibuca")
if err = os.MkdirAll(instancesDir, os.FileMode(0666)); err == nil {
if f, err := os.Open(instancesDir); err != nil {
return err
} else if cs, err := f.Readdir(0); err != nil {
return err
} else {
for _, configFile := range cs {
des := new(InstanceDesc)
if _, err = toml.DecodeFile(path.Join(instancesDir, configFile.Name()), des); err == nil {
instances[des.Name] = des
} else {
log.Println(err)
}
}
return nil
}
} else {
return err
}
} else {
return err
}
}
func website(w http.ResponseWriter, r *http.Request) {
filePath := r.URL.Path
if filePath == "/" {
filePath = "/index.html"
}
if mime := mime.TypeByExtension(path.Ext(filePath)); mime != "" {
w.Header().Set("Content-Type", mime)
}
_, currentFilePath, _, _ := runtime.Caller(0)
if f, err := ioutil.ReadFile(path.Join(path.Dir(currentFilePath), "pm/dist", filePath)); err == nil {
if _, err = w.Write(f); err != nil {
w.WriteHeader(505)
}
} else {
w.Header().Set("Location", "/")
w.WriteHeader(302)
}
}
func listInstance(w http.ResponseWriter, r *http.Request) {
if bytes, err := json.Marshal(instances); err == nil {
_, err = w.Write(bytes)
} else {
w.Write([]byte(err.Error()))
}
}
func initInstance(w http.ResponseWriter, r *http.Request) {
instanceDesc := new(InstanceDesc)
sse := util.NewSSE(w, r.Context())
err := json.Unmarshal([]byte(r.URL.Query().Get("info")), instanceDesc)
clearDir := r.URL.Query().Get("clear") != ""
defer func() {
if err != nil {
sse.WriteEvent("exception", []byte(err.Error()))
} else {
sse.Write([]byte("success"))
}
}()
if err != nil {
return
}
sse.WriteEvent("step", []byte("1:参数解析成功!"))
err = instanceDesc.createDir(sse, clearDir)
if err != nil {
return
}
sse.WriteEvent("step", []byte("6:实例创建成功!"))
var file *os.File
file, err = os.OpenFile(path.Join(instancesDir, instanceDesc.Name+".toml"), os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0666)
if err != nil {
return
}
tomlEncoder := toml.NewEncoder(file)
err = tomlEncoder.Encode(&instanceDesc)
if err != nil {
return
}
instances[instanceDesc.Name] = instanceDesc
}
func shutdownInstance(w http.ResponseWriter, r *http.Request) {
instanceName := r.URL.Query().Get("instance")
if instance, ok := instances[instanceName]; ok {
if err := instance.command("kill", "-9", "`cat pid`").Run(); err == nil {
w.Write([]byte("success"))
} else {
w.Write([]byte(err.Error()))
}
} else {
w.Write([]byte("no such instance"))
}
}
func restartInstance(w http.ResponseWriter, r *http.Request) {
sse := util.NewSSE(w, r.Context())
instanceName := r.URL.Query().Get("instance")
needUpdate := r.URL.Query().Get("update") != ""
needBuild := r.URL.Query().Get("build") != ""
if instance, ok := instances[instanceName]; ok {
if needUpdate {
if err := instance.writeExecSSE(sse, exec.Command("go", "get", "-u")); err != nil {
sse.WriteEvent("failed", []byte(err.Error()))
return
}
}
if needBuild {
if err := instance.writeExecSSE(sse, exec.Command("go", "build")); err != nil {
sse.WriteEvent("failed", []byte(err.Error()))
return
}
}
if err := instance.writeExecSSE(sse, exec.Command("sh", "restart.sh")); err != nil {
sse.WriteEvent("failed", []byte(err.Error()))
return
}
sse.Write([]byte("success"))
} else {
sse.WriteEvent("failed", []byte("no such instance"))
}
}
func (p *InstanceDesc) writeExecSSE(sse *util.SSE, cmd *exec.Cmd) error {
cmd.Dir = p.Path
return sse.WriteExec(cmd)
}
func (p *InstanceDesc) command(name string, args ...string) (cmd *exec.Cmd) {
cmd = exec.Command(name, args...)
cmd.Dir = p.Path
return
}
func (p *InstanceDesc) createDir(sse *util.SSE, clearDir bool) (err error) {
if clearDir {
os.RemoveAll(p.Path)
}
err = os.MkdirAll(p.Path, 0666)
if err != nil {
return
}
sse.WriteEvent("step", []byte("2:目录创建成功!"))
err = ioutil.WriteFile(path.Join(p.Path, "config.toml"), []byte(p.Config), 0666)
if err != nil {
return
}
var build bytes.Buffer
build.WriteString(`package main
import(
"github.com/langhuihui/monibuca/monica"`)
for _, plugin := range p.Plugins {
build.WriteString("\n_ \"")
build.WriteString(plugin)
build.WriteString("\"")
}
build.WriteString("\n)\n")
build.WriteString(`
func main(){
monica.Run("config.toml")
select{}
}
`)
err = ioutil.WriteFile(path.Join(p.Path, "main.go"), build.Bytes(), 0666)
if err != nil {
return
}
sse.WriteEvent("step", []byte("3:文件创建成功!"))
err = p.writeExecSSE(sse, exec.Command("go", "mod", "init", p.Name))
if err != nil {
return
}
sse.WriteEvent("step", []byte("4:go mod 初始化完成!"))
err = p.writeExecSSE(sse, exec.Command("go", "build"))
if err != nil {
return
}
sse.WriteEvent("step", []byte("5:go build 成功!"))
build.Reset()
build.WriteString("kill -9 `cat pid`\nnohup ./")
binFile := strings.TrimSuffix(p.Path, "/")
_, binFile = path.Split(binFile)
build.WriteString(binFile)
build.WriteString(" > log.txt & echo $! > pid\n")
err = ioutil.WriteFile(path.Join(p.Path, "restart.sh"), build.Bytes(), 0777)
if err != nil {
return
}
return p.writeExecSSE(sse, exec.Command("sh", "restart.sh"))
}
func Home() (string, error) {
user, err := user.Current()
if nil == err {
return user.HomeDir, nil
}
// cross compile support
if "windows" == runtime.GOOS {
return homeWindows()
}
// Unix-like system, so just assume Unix
return homeUnix()
}
func homeUnix() (string, error) {
// First prefer the HOME environmental variable
if home := os.Getenv("HOME"); home != "" {
return home, nil
}
// If that fails, try the shell
var stdout bytes.Buffer
cmd := exec.Command("sh", "-c", "eval echo ~$USER")
cmd.Stdout = &stdout
if err := cmd.Run(); err != nil {
return "", err
}
result := strings.TrimSpace(stdout.String())
if result == "" {
return "", errors.New("blank output when reading home directory")
}
return result, nil
}
func homeWindows() (string, error) {
drive := os.Getenv("HOMEDRIVE")
path := os.Getenv("HOMEPATH")
home := drive + path
if drive == "" || path == "" {
home = os.Getenv("USERPROFILE")
}
if home == "" {
return "", errors.New("HOMEDRIVE, HOMEPATH, and USERPROFILE are blank")
}
return home, nil
} }

View File

@@ -1,6 +1,7 @@
package pool package avformat
import ( import (
"github.com/langhuihui/monibuca/monica/pool"
"sync" "sync"
) )
@@ -33,13 +34,25 @@ type AVPacket struct {
func (av *AVPacket) IsKeyFrame() bool { func (av *AVPacket) IsKeyFrame() bool {
return av.VideoFrameType == 1 || av.VideoFrameType == 4 return av.VideoFrameType == 1 || av.VideoFrameType == 4
} }
func (av *AVPacket) ADTS2ASC() (tagPacket *AVPacket) {
tagPacket = NewAVPacket(FLV_TAG_TYPE_AUDIO)
tagPacket.Payload = ADTSToAudioSpecificConfig(av.Payload)
tagPacket.IsAACSequence = true
ADTSLength := 7 + (int(av.Payload[1]&1) << 1)
if len(av.Payload) > ADTSLength {
av.Payload[0] = 0xAF
av.Payload[1] = 0x01 //raw AAC
copy(av.Payload[2:], av.Payload[ADTSLength:])
av.Payload = av.Payload[:(len(av.Payload) - ADTSLength + 2)]
}
return
}
func (av *AVPacket) Recycle() { func (av *AVPacket) Recycle() {
if av.RefCount == 0 { if av.RefCount == 0 {
return return
} else if av.RefCount == 1 { } else if av.RefCount == 1 {
av.RefCount = 0 av.RefCount = 0
RecycleSlice(av.Payload) pool.RecycleSlice(av.Payload)
AVPacketPool.Put(av) AVPacketPool.Put(av)
} else { } else {
av.RefCount-- av.RefCount--

View File

@@ -70,7 +70,7 @@ var (
var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0} var FLVHeader = []byte{0x46, 0x4c, 0x56, 0x01, 0x05, 0, 0, 0, 9, 0, 0, 0, 0}
func WriteFLVTag(w io.Writer, tag *pool.SendPacket) (err error) { func WriteFLVTag(w io.Writer, tag *SendPacket) (err error) {
head := pool.GetSlice(11) head := pool.GetSlice(11)
defer pool.RecycleSlice(head) defer pool.RecycleSlice(head)
tail := pool.GetSlice(4) tail := pool.GetSlice(4)
@@ -93,13 +93,13 @@ func WriteFLVTag(w io.Writer, tag *pool.SendPacket) (err error) {
} }
return return
} }
func ReadFLVTag(r io.Reader) (tag *pool.AVPacket, err error) { func ReadFLVTag(r io.Reader) (tag *AVPacket, err error) {
head := pool.GetSlice(11) head := pool.GetSlice(11)
defer pool.RecycleSlice(head) defer pool.RecycleSlice(head)
if _, err = io.ReadFull(r, head); err != nil { if _, err = io.ReadFull(r, head); err != nil {
return return
} }
tag = pool.NewAVPacket(head[0]) tag = NewAVPacket(head[0])
dataSize := util.BigEndian.Uint24(head[1:]) dataSize := util.BigEndian.Uint24(head[1:])
tag.Timestamp = util.BigEndian.Uint24(head[4:]) tag.Timestamp = util.BigEndian.Uint24(head[4:])
body := pool.GetSlice(int(dataSize)) body := pool.GetSlice(int(dataSize))

View File

@@ -8,8 +8,8 @@ func (h AuthHook) AddHook(hook func(string) error) {
AuthHooks = append(h, hook) AuthHooks = append(h, hook)
} }
func (h AuthHook) Trigger(sign string) error { func (h AuthHook) Trigger(sign string) error {
for _, h := range h { for _, f := range h {
if err := h(sign); err != nil { if err := f(sign); err != nil {
return err return err
} }
} }
@@ -24,8 +24,8 @@ func (h OnPublishHook) AddHook(hook func(r *Room)) {
OnPublishHooks = append(h, hook) OnPublishHooks = append(h, hook)
} }
func (h OnPublishHook) Trigger(r *Room) { func (h OnPublishHook) Trigger(r *Room) {
for _, h := range h { for _, f := range h {
h(r) f(r)
} }
} }
@@ -37,8 +37,8 @@ func (h OnSubscribeHook) AddHook(hook func(s *OutputStream)) {
OnSubscribeHooks = append(h, hook) OnSubscribeHooks = append(h, hook)
} }
func (h OnSubscribeHook) Trigger(s *OutputStream) { func (h OnSubscribeHook) Trigger(s *OutputStream) {
for _, h := range h { for _, f := range h {
h(s) f(s)
} }
} }
@@ -50,8 +50,8 @@ func (h OnDropHook) AddHook(hook func(s *OutputStream)) {
OnDropHooks = append(h, hook) OnDropHooks = append(h, hook)
} }
func (h OnDropHook) Trigger(s *OutputStream) { func (h OnDropHook) Trigger(s *OutputStream) {
for _, h := range h { for _, f := range h {
h(s) f(s)
} }
} }
@@ -63,7 +63,7 @@ func (h OnSummaryHook) AddHook(hook func(bool)) {
OnSummaryHooks = append(h, hook) OnSummaryHooks = append(h, hook)
} }
func (h OnSummaryHook) Trigger(v bool) { func (h OnSummaryHook) Trigger(v bool) {
for _, h := range h { for _, f := range h {
h(v) f(v)
} }
} }

View File

@@ -2,15 +2,24 @@ package monica
import ( import (
"encoding/json" "encoding/json"
"github.com/BurntSushi/toml"
"io/ioutil" "io/ioutil"
"log" "log"
"time"
"github.com/BurntSushi/toml"
) )
var ConfigRaw []byte var ConfigRaw []byte
var Version = "0.2.3"
var EngineInfo = &struct {
Version string
StartTime time.Time
}{Version, time.Now()}
func Run(configFile string) (err error) { func Run(configFile string) (err error) {
log.Printf("start monibuca version:%s", Version)
if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil { if ConfigRaw, err = ioutil.ReadFile(configFile); err != nil {
log.Printf("read config file error: %v", err)
return return
} }
go Summary.StartSummary() go Summary.StartSummary()
@@ -29,6 +38,8 @@ func Run(configFile string) (err error) {
go config.Run() go config.Run()
} }
} }
} else {
log.Printf("decode config file error: %v", err)
} }
return return
} }

View File

@@ -2,11 +2,11 @@ package monica
import ( import (
"context" "context"
"github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
"log" "log"
"sync" "sync"
"time" "time"
"github.com/langhuihui/monibuca/monica/avformat"
) )
var ( var (
@@ -22,8 +22,8 @@ func (c *Collection) Get(name string) (result *Room) {
item, loaded := AllRoom.LoadOrStore(name, &Room{ item, loaded := AllRoom.LoadOrStore(name, &Room{
Subscribers: make(map[string]*OutputStream), Subscribers: make(map[string]*OutputStream),
Control: make(chan interface{}), Control: make(chan interface{}),
VideoChan: make(chan *pool.AVPacket, 1), VideoChan: make(chan *avformat.AVPacket, 1),
AudioChan: make(chan *pool.AVPacket, 1), AudioChan: make(chan *avformat.AVPacket, 1),
}) })
result = item.(*Room) result = item.(*Room)
if !loaded { if !loaded {
@@ -41,11 +41,11 @@ type Room struct {
Control chan interface{} Control chan interface{}
Cancel context.CancelFunc Cancel context.CancelFunc
Subscribers map[string]*OutputStream // 订阅者 Subscribers map[string]*OutputStream // 订阅者
VideoTag *pool.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据 VideoTag *avformat.AVPacket // 每个视频包都是这样的结构,区别在于Payload的大小.FMS在发送AVC sequence header,需要加上 VideoTags,这个tag 1个字节(8bits)的数据
AudioTag *pool.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据 AudioTag *avformat.AVPacket // 每个音频包都是这样的结构,区别在于Payload的大小.FMS在发送AAC sequence header,需要加上 AudioTags,这个tag 1个字节(8bits)的数据
FirstScreen []*pool.AVPacket FirstScreen []*avformat.AVPacket
AudioChan chan *pool.AVPacket AudioChan chan *avformat.AVPacket
VideoChan chan *pool.AVPacket VideoChan chan *avformat.AVPacket
UseTimestamp bool //是否采用数据包中的时间戳 UseTimestamp bool //是否采用数据包中的时间戳
} }
@@ -90,7 +90,7 @@ func (r *Room) Subscribe(s *OutputStream) {
if r.Err() == nil { if r.Err() == nil {
s.SubscribeTime = time.Now() s.SubscribeTime = time.Now()
log.Printf("subscribe :%s %s,to room %s", s.Type, s.ID, r.StreamPath) log.Printf("subscribe :%s %s,to room %s", s.Type, s.ID, r.StreamPath)
s.packetQueue = make(chan *pool.SendPacket, 1024) s.packetQueue = make(chan *avformat.SendPacket, 1024)
s.Context, s.Cancel = context.WithCancel(r) s.Context, s.Cancel = context.WithCancel(r)
s.Control <- &SubscribeCmd{s} s.Control <- &SubscribeCmd{s}
} }
@@ -153,12 +153,21 @@ func (r *Room) Run() {
} }
} }
} }
func (r *Room) PushAudio(audio *pool.AVPacket) { func (r *Room) PushAudio(audio *avformat.AVPacket) {
if len(audio.Payload) < 4 {
return
}
if audio.Payload[0] == 0xFF && (audio.Payload[1]&0xF0) == 0xF0 { if audio.Payload[0] == 0xFF && (audio.Payload[1]&0xF0) == 0xF0 {
audio.IsADTS = true //audio.IsADTS = true
r.AudioTag = audio r.AudioInfo.SoundFormat = 10
r.AudioInfo.SoundRate = avformat.SamplingFrequencies[(audio.Payload[2]&0x3c)>>2]
r.AudioInfo.SoundType = ((audio.Payload[2] & 0x1) << 2) | ((audio.Payload[3] & 0xc0) >> 6)
r.AudioTag = audio.ADTS2ASC()
} else if r.AudioTag == nil { } else if r.AudioTag == nil {
audio.IsAACSequence = true audio.IsAACSequence = true
if len(audio.Payload) < 5 {
return
}
r.AudioTag = audio r.AudioTag = audio
tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息 tmp := audio.Payload[0] // 第一个字节保存着音频的相关信息
if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息 if r.AudioInfo.SoundFormat = tmp >> 4; r.AudioInfo.SoundFormat == 10 { //真的是AAC的话后面有一个字节的详细信息
@@ -191,7 +200,7 @@ func (r *Room) PushAudio(audio *pool.AVPacket) {
r.AudioInfo.PacketCount++ r.AudioInfo.PacketCount++
r.AudioChan <- audio r.AudioChan <- audio
} }
func (r *Room) setH264Info(video *pool.AVPacket) { func (r *Room) setH264Info(video *avformat.AVPacket) {
r.VideoTag = video r.VideoTag = video
info := avformat.AVCDecoderConfigurationRecord{} info := avformat.AVCDecoderConfigurationRecord{}
//0:codec,1:IsAVCSequence,2~4:compositionTime //0:codec,1:IsAVCSequence,2~4:compositionTime
@@ -199,7 +208,10 @@ func (r *Room) setH264Info(video *pool.AVPacket) {
r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit) r.VideoInfo.SPSInfo, err = avformat.ParseSPS(info.SequenceParameterSetNALUnit)
} }
} }
func (r *Room) PushVideo(video *pool.AVPacket) { func (r *Room) PushVideo(video *avformat.AVPacket) {
if len(video.Payload) < 3 {
return
}
video.VideoFrameType = video.Payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2 video.VideoFrameType = video.Payload[0] >> 4 // 帧类型 4Bit, H264一般为1或者2
r.VideoInfo.CodecID = video.Payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC... r.VideoInfo.CodecID = video.Payload[0] & 0x0f // 编码类型ID 4Bit, JPEG, H263, AVC...
video.IsAVCSequence = video.VideoFrameType == 1 && video.Payload[1] == 0 video.IsAVCSequence = video.VideoFrameType == 1 && video.Payload[1] == 0

View File

@@ -3,12 +3,12 @@ package monica
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/langhuihui/monibuca/monica/pool" "github.com/langhuihui/monibuca/monica/avformat"
"time" "time"
) )
type Subscriber interface { type Subscriber interface {
Send(*pool.SendPacket) error Send(*avformat.SendPacket) error
} }
type SubscriberInfo struct { type SubscriberInfo struct {
@@ -23,14 +23,14 @@ type OutputStream struct {
context.Context context.Context
*Room *Room
SubscriberInfo SubscriberInfo
SendHandler func(*pool.SendPacket) error SendHandler func(*avformat.SendPacket) error
Cancel context.CancelFunc Cancel context.CancelFunc
Sign string Sign string
VTSent bool VTSent bool
ATSent bool ATSent bool
VSentTime uint32 VSentTime uint32
ASentTime uint32 ASentTime uint32
packetQueue chan *pool.SendPacket packetQueue chan *avformat.SendPacket
dropCount int dropCount int
OffsetTime uint32 OffsetTime uint32
firstScreenIndex int firstScreenIndex int
@@ -61,7 +61,7 @@ func (s *OutputStream) Play(streamPath string) (err error) {
} }
} }
} }
func (s *OutputStream) sendPacket(packet *pool.AVPacket, timestamp uint32) { func (s *OutputStream) sendPacket(packet *avformat.AVPacket, timestamp uint32) {
if !packet.IsAVCSequence && timestamp == 0 { if !packet.IsAVCSequence && timestamp == 0 {
timestamp = 1 //防止为0 timestamp = 1 //防止为0
} }
@@ -82,11 +82,11 @@ func (s *OutputStream) sendPacket(packet *pool.AVPacket, timestamp uint32) {
s.TotalDrop++ s.TotalDrop++
packet.Recycle() packet.Recycle()
} else if !s.IsClosed() { } else if !s.IsClosed() {
s.packetQueue <- pool.NewSendPacket(packet, timestamp) s.packetQueue <- avformat.NewSendPacket(packet, timestamp)
} }
} }
func (s *OutputStream) sendVideo(video *pool.AVPacket) error { func (s *OutputStream) sendVideo(video *avformat.AVPacket) error {
isKF := video.IsKeyFrame() isKF := video.IsKeyFrame()
if s.VTSent { if s.VTSent {
if s.FirstScreen == nil || s.firstScreenIndex == -1 { if s.FirstScreen == nil || s.firstScreenIndex == -1 {
@@ -119,7 +119,7 @@ func (s *OutputStream) sendVideo(video *pool.AVPacket) error {
s.VSentTime = video.Timestamp s.VSentTime = video.Timestamp
return s.sendVideo(video) return s.sendVideo(video)
} }
func (s *OutputStream) sendAudio(audio *pool.AVPacket) error { func (s *OutputStream) sendAudio(audio *avformat.AVPacket) error {
if s.ATSent { if s.ATSent {
if s.FirstScreen != nil && s.firstScreenIndex == -1 { if s.FirstScreen != nil && s.firstScreenIndex == -1 {
audio.Recycle() audio.Recycle()

73
monica/util/SSE.go Normal file
View File

@@ -0,0 +1,73 @@
package util
import (
"context"
"encoding/json"
"net/http"
"os/exec"
)
var (
sseEent = []byte("event: ")
sseBegin = []byte("data: ")
sseEnd = []byte("\n\n")
)
type SSE struct {
http.ResponseWriter
context.Context
}
func (sse *SSE) Write(data []byte) (n int, err error) {
if err = sse.Err(); err != nil {
return
}
_, err = sse.ResponseWriter.Write(sseBegin)
n, err = sse.ResponseWriter.Write(data)
_, err = sse.ResponseWriter.Write(sseEnd)
if err != nil {
return
}
sse.ResponseWriter.(http.Flusher).Flush()
return
}
func (sse *SSE) WriteEvent(event string, data []byte) (err error) {
if err = sse.Err(); err != nil {
return
}
_, err = sse.ResponseWriter.Write(sseEent)
_, err = sse.ResponseWriter.Write([]byte(event))
_, err = sse.ResponseWriter.Write([]byte("\n"))
_, err = sse.Write(data)
return
}
func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE {
header := w.Header()
header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive")
header.Set("X-Accel-Buffering", "no")
header.Set("Access-Control-Allow-Origin", "*")
return &SSE{
w,
ctx,
}
}
func (sse *SSE) WriteJSON(data interface{}) (err error) {
var jsonData []byte
if jsonData, err = json.Marshal(data); err == nil {
if _, err = sse.Write(jsonData); err != nil {
return
}
return
}
return
}
func (sse *SSE) WriteExec(cmd *exec.Cmd) error {
cmd.Stderr = sse
cmd.Stdout = sse
return cmd.Run()
}

View File

@@ -3,7 +3,6 @@ package HDL
import ( import (
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
"log" "log"
"net/http" "net/http"
"strings" "strings"
@@ -42,7 +41,7 @@ func HDLHandler(w http.ResponseWriter, r *http.Request) {
w.Write(avformat.FLVHeader) w.Write(avformat.FLVHeader)
p := OutputStream{ p := OutputStream{
Sign: sign, Sign: sign,
SendHandler: func(packet *pool.SendPacket) error { SendHandler: func(packet *avformat.SendPacket) error {
return avformat.WriteFLVTag(w, packet) return avformat.WriteFLVTag(w, packet)
}, },
SubscriberInfo: SubscriberInfo{ SubscriberInfo: SubscriberInfo{

View File

@@ -5,7 +5,6 @@ import (
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/avformat/mpegts" "github.com/langhuihui/monibuca/monica/avformat/mpegts"
"github.com/langhuihui/monibuca/monica/pool"
"github.com/langhuihui/monibuca/monica/util" "github.com/langhuihui/monibuca/monica/util"
"log" "log"
"time" "time"
@@ -46,12 +45,12 @@ func (ts *TS) run() {
ts.TotalPesCount++ ts.TotalPesCount++
switch tsPesPkt.PesPkt.Header.StreamID & 0xF0 { switch tsPesPkt.PesPkt.Header.StreamID & 0xF0 {
case mpegts.STREAM_ID_AUDIO: case mpegts.STREAM_ID_AUDIO:
av := pool.NewAVPacket(avformat.FLV_TAG_TYPE_AUDIO) av := avformat.NewAVPacket(avformat.FLV_TAG_TYPE_AUDIO)
av.Payload = tsPesPkt.PesPkt.Payload av.Payload = tsPesPkt.PesPkt.Payload
ts.PushAudio(av) ts.PushAudio(av)
case mpegts.STREAM_ID_VIDEO: case mpegts.STREAM_ID_VIDEO:
var err error var err error
av := pool.NewAVPacket(avformat.FLV_TAG_TYPE_VIDEO) av := avformat.NewAVPacket(avformat.FLV_TAG_TYPE_VIDEO)
ts.PTS = tsPesPkt.PesPkt.Header.Pts ts.PTS = tsPesPkt.PesPkt.Header.Pts
ts.DTS = tsPesPkt.PesPkt.Header.Dts ts.DTS = tsPesPkt.PesPkt.Header.Dts
lastDts := ts.lastDts lastDts := ts.lastDts
@@ -95,7 +94,7 @@ func (ts *TS) run() {
av.VideoFrameType = 1 av.VideoFrameType = 1
av.Payload = r.Bytes() av.Payload = r.Bytes()
ts.PushVideo(av) ts.PushVideo(av)
av = pool.NewAVPacket(avformat.FLV_TAG_TYPE_VIDEO) av = avformat.NewAVPacket(avformat.FLV_TAG_TYPE_VIDEO)
av.Timestamp = uint32(dts / 90) av.Timestamp = uint32(dts / 90)
r = bytes.NewBuffer([]byte{}) r = bytes.NewBuffer([]byte{})
continue continue

View File

@@ -61,9 +61,10 @@ func (p *HLS) run(info *M3u8Info) {
log.Printf("hls %s exit:%v", p.StreamPath, err) log.Printf("hls %s exit:%v", p.StreamPath, err)
p.Cancel() p.Cancel()
}() }()
errcount := 0
for ; err == nil && p.Err() == nil; resp, err = client.Do(info.Req) { for ; err == nil && p.Err() == nil; resp, err = client.Do(info.Req) {
if playlist, err := readM3U8(resp); err == nil { if playlist, err := readM3U8(resp); err == nil {
errcount = 0
info.LastM3u8 = playlist.String() info.LastM3u8 = playlist.String()
//if !playlist.Live { //if !playlist.Live {
// log.Println(p.LastM3u8) // log.Println(p.LastM3u8)
@@ -129,7 +130,11 @@ func (p *HLS) run(info *M3u8Info) {
time.Sleep(time.Second * time.Duration(playlist.Target) * 2) time.Sleep(time.Second * time.Duration(playlist.Target) * 2)
} else { } else {
log.Printf("%s readM3u8:%v", p.StreamPath, err) log.Printf("%s readM3u8:%v", p.StreamPath, err)
return errcount++
if errcount > 10 {
return
}
//return
} }
} }
} }

View File

@@ -1,28 +1,27 @@
package QoS package QoS
import ( import (
"strings"
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
) )
var ( // var (
selectMap = map[string][]string{ // selectMap = map[string][]string{
"low": {"low", "medium", "high"}, // "low": {"low", "medium", "high"},
"medium": {"medium", "low", "high"}, // "medium": {"medium", "low", "high"},
"high": {"high", "medium", "low"}, // "high": {"high", "medium", "low"},
} // }
) // )
func getQualityName(name string, qualityLevel string) string { // func getQualityName(name string, qualityLevel string) string {
if qualityLevel == "" { // for _, l := range selectMap[qualityLevel] {
return name // if _, ok := AllRoom.Load(name + "/" + l); ok {
} // return name + "/" + l
for _, l := range selectMap[qualityLevel] { // }
if _, ok := AllRoom.Load(name + "/" + l); ok { // }
return name + "/" + l // return name + "/" + qualityLevel
} // }
}
return name + "/" + qualityLevel
}
var config = struct { var config = struct {
Suffix []string Suffix []string
@@ -39,8 +38,23 @@ func init() {
func run() { func run() {
OnDropHooks.AddHook(func(s *OutputStream) { OnDropHooks.AddHook(func(s *OutputStream) {
if s.TotalDrop > s.TotalPacket>>2 { if s.TotalDrop > s.TotalPacket>>2 {
//TODO var newStreamPath = ""
//s.Control<-&ChangeRoomCmd{s,AllRoom.Get()} for i, suf := range config.Suffix {
if strings.HasSuffix(s.StreamPath, suf) {
if i < len(config.Suffix)-1 {
newStreamPath = s.StreamPath + "/" + config.Suffix[i+1]
break
}
} else {
newStreamPath = s.StreamPath + "/" + suf
break
}
}
if newStreamPath != "" {
if _, ok := AllRoom.Load(newStreamPath); ok {
s.Control <- &ChangeRoomCmd{s, AllRoom.Get(newStreamPath)}
}
}
} }
}) })
} }

View File

@@ -3,6 +3,7 @@ package cluster
import ( import (
"bufio" "bufio"
"encoding/json" "encoding/json"
"io"
"log" "log"
"math/rand" "math/rand"
"net" "net"
@@ -46,56 +47,52 @@ func run() {
if MayBeError(err) { if MayBeError(err) {
return return
} }
masterConn, err = net.DialTCP("tcp", nil, addr) go readMaster(addr)
if MayBeError(err) {
return
}
go readMaster()
} }
if config.ListenAddr != "" { if config.ListenAddr != "" {
Summary.Children = make(map[string]*ServerSummary)
OnSummaryHooks.AddHook(onSummary) OnSummaryHooks.AddHook(onSummary)
log.Printf("server bare start at %s", config.ListenAddr) log.Printf("server bare start at %s", config.ListenAddr)
log.Fatal(ListenBare(config.ListenAddr)) log.Fatal(ListenBare(config.ListenAddr))
} }
} }
func readMaster() {
func readMaster(addr *net.TCPAddr) {
var err error var err error
defer func() { var cmd byte
for {
t := 5 + rand.Int63n(5)
log.Printf("reconnect to master %s after %d seconds", config.Master, t)
time.Sleep(time.Duration(t) * time.Second)
addr, _ := net.ResolveTCPAddr("tcp", config.Master)
if masterConn, err = net.DialTCP("tcp", nil, addr); err == nil {
go readMaster()
return
}
}
}()
brw := bufio.NewReadWriter(bufio.NewReader(masterConn), bufio.NewWriter(masterConn))
log.Printf("connect to master %s reporting", config.Master)
//首次报告
if b, err := json.Marshal(Summary); err == nil {
_, err = masterConn.Write(b)
}
for { for {
cmd, err := brw.ReadByte() if masterConn, err = net.DialTCP("tcp", nil, addr); !MayBeError(err) {
if err != nil { reader := bufio.NewReader(masterConn)
return log.Printf("connect to master %s reporting", config.Master)
} for report(); err == nil; {
switch cmd { if cmd, err = reader.ReadByte(); !MayBeError(err) {
case MSG_SUMMARY: //收到主服务器指令,进行采集和上报 switch cmd {
log.Println("receive summary request from master") case MSG_SUMMARY: //收到主服务器指令,进行采集和上报
if cmd, err = brw.ReadByte(); err != nil { log.Println("receive summary request from master")
return if cmd, err = reader.ReadByte(); !MayBeError(err) {
} if cmd == 1 {
if cmd == 1 { Summary.Add()
Summary.Add() go onReport()
go onReport() } else {
} else { Summary.Done()
Summary.Done() }
}
}
}
} }
} }
t := 5 + rand.Int63n(5)
log.Printf("reconnect to master %s after %d seconds", config.Master, t)
time.Sleep(time.Duration(t) * time.Second)
}
}
func report() {
if b, err := json.Marshal(Summary); err == nil {
data := make([]byte, len(b)+2)
data[0] = MSG_SUMMARY
copy(data[1:], b)
data[len(data)-1] = 0
_, err = masterConn.Write(data)
} }
} }
@@ -103,28 +100,24 @@ func readMaster() {
func onReport() { func onReport() {
for range time.NewTicker(time.Second).C { for range time.NewTicker(time.Second).C {
if Summary.Running() { if Summary.Running() {
if b, err := json.Marshal(Summary); err == nil { report()
data := make([]byte, len(b)+2)
data[0] = MSG_SUMMARY
copy(data[1:], b)
data[len(data)-1] = 0
_, err = masterConn.Write(data)
}
} else { } else {
return return
} }
} }
} }
func orderReport(conn io.Writer, start bool) {
b := []byte{MSG_SUMMARY, 0}
if start {
b[1] = 1
}
conn.Write(b)
}
//通知从服务器需要上报或者关闭上报 //通知从服务器需要上报或者关闭上报
func onSummary(start bool) { func onSummary(start bool) {
slaves.Range(func(k, v interface{}) bool { slaves.Range(func(k, v interface{}) bool {
conn := v.(*net.TCPConn) orderReport(v.(*net.TCPConn), start)
b := []byte{MSG_SUMMARY, 0}
if start {
b[1] = 1
}
conn.Write(b)
return true return true
}) })
} }

View File

@@ -3,12 +3,14 @@ package cluster
import ( import (
"bufio" "bufio"
"encoding/binary" "encoding/binary"
"io"
"log"
"net"
"strings"
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool" "github.com/langhuihui/monibuca/monica/pool"
"io"
"net"
"strings"
) )
type Receiver struct { type Receiver struct {
@@ -24,14 +26,15 @@ func (p *Receiver) Auth(authSub *OutputStream) {
p.Flush() p.Flush()
} }
func (p *Receiver) readAVPacket(avType byte) (av *pool.AVPacket, err error) { func (p *Receiver) readAVPacket(avType byte) (av *avformat.AVPacket, err error) {
buf := pool.GetSlice(4) buf := pool.GetSlice(4)
defer pool.RecycleSlice(buf)
_, err = io.ReadFull(p, buf) _, err = io.ReadFull(p, buf)
if err != nil { if err != nil {
println(err.Error()) println(err.Error())
return return
} }
av = pool.NewAVPacket(avType) av = avformat.NewAVPacket(avType)
av.Timestamp = binary.BigEndian.Uint32(buf) av.Timestamp = binary.BigEndian.Uint32(buf)
_, err = io.ReadFull(p, buf) _, err = io.ReadFull(p, buf)
if MayBeError(err) { if MayBeError(err) {
@@ -39,10 +42,7 @@ func (p *Receiver) readAVPacket(avType byte) (av *pool.AVPacket, err error) {
} }
av.Payload = pool.GetSlice(int(binary.BigEndian.Uint32(buf))) av.Payload = pool.GetSlice(int(binary.BigEndian.Uint32(buf)))
_, err = io.ReadFull(p, av.Payload) _, err = io.ReadFull(p, av.Payload)
if MayBeError(err) { MayBeError(err)
return
}
pool.RecycleSlice(buf)
return return
} }
@@ -57,7 +57,7 @@ func PullUpStream(streamPath string) {
} }
brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn)) brw := bufio.NewReadWriter(bufio.NewReader(conn), bufio.NewWriter(conn))
p := &Receiver{ p := &Receiver{
Reader: conn, Reader: brw.Reader,
Writer: brw.Writer, Writer: brw.Writer,
} }
if p.Publish(streamPath, p) { if p.Publish(streamPath, p) {
@@ -72,11 +72,7 @@ func PullUpStream(streamPath string) {
return return
} }
defer p.Cancel() defer p.Cancel()
for { for cmd, err := brw.ReadByte(); !MayBeError(err); cmd, err = brw.ReadByte() {
cmd, err := brw.ReadByte()
if MayBeError(err) {
return
}
switch cmd { switch cmd {
case MSG_AUDIO: case MSG_AUDIO:
if audio, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil { if audio, err := p.readAVPacket(avformat.FLV_TAG_TYPE_AUDIO); err == nil {
@@ -103,6 +99,8 @@ func PullUpStream(streamPath string) {
v.Cancel() v.Cancel()
} }
} }
default:
log.Printf("unknown cmd:%v", cmd)
} }
} }
} }

View File

@@ -5,6 +5,8 @@ import (
"encoding/binary" "encoding/binary"
"encoding/json" "encoding/json"
"fmt" "fmt"
"github.com/langhuihui/monibuca/monica/avformat"
"io"
"net" "net"
"strconv" "strconv"
"strings" "strings"
@@ -53,7 +55,7 @@ func process(conn net.Conn) {
reader := bufio.NewReader(conn) reader := bufio.NewReader(conn)
connAddr := conn.RemoteAddr().String() connAddr := conn.RemoteAddr().String()
stream := OutputStream{ stream := OutputStream{
SendHandler: func(p *pool.SendPacket) error { SendHandler: func(p *avformat.SendPacket) error {
head := pool.GetSlice(9) head := pool.GetSlice(9)
head[0] = p.Packet.Type - 7 head[0] = p.Packet.Type - 7
binary.BigEndian.PutUint32(head[1:5], p.Timestamp) binary.BigEndian.PutUint32(head[1:5], p.Timestamp)
@@ -80,29 +82,32 @@ func process(conn net.Conn) {
if err != nil { if err != nil {
return return
} }
bytes = bytes[0 : len(bytes)-1]
switch cmd { switch cmd {
case MSG_SUBSCRIBE: case MSG_SUBSCRIBE:
if stream.Room != nil { if stream.Room != nil {
fmt.Printf("bare stream already exist from %s", conn.RemoteAddr()) fmt.Printf("bare stream already exist from %s", conn.RemoteAddr())
return return
} }
streamName := string(bytes[0 : len(bytes)-1]) go stream.Play(string(bytes))
go stream.Play(streamName)
case MSG_AUTH: case MSG_AUTH:
sign := strings.Split(string(bytes[0:len(bytes)-1]), ",") sign := strings.Split(string(bytes), ",")
head := []byte{MSG_AUTH, 2} head := []byte{MSG_AUTH, 2}
if len(sign) > 1 && AuthHooks.Trigger(sign[1]) == nil { if len(sign) > 1 && AuthHooks.Trigger(sign[1]) == nil {
head[1] = 1 head[1] = 1
} }
conn.Write(head) conn.Write(head)
conn.Write(bytes) conn.Write(bytes[0 : len(bytes)+1])
case MSG_SUMMARY: //收到从服务器发来报告,加入摘要中 case MSG_SUMMARY: //收到从服务器发来报告,加入摘要中
var summary *ServerSummary summary := &ServerSummary{}
if err = json.Unmarshal(bytes, summary); err == nil { if err = json.Unmarshal(bytes, summary); err == nil {
summary.Address = connAddr summary.Address = connAddr
Summary.Report(summary) Summary.Report(summary)
if _, ok := slaves.Load(connAddr); !ok { if _, ok := slaves.Load(connAddr); !ok {
slaves.Store(connAddr, conn) slaves.Store(connAddr, conn)
if Summary.Running() {
orderReport(io.Writer(conn), true)
}
defer slaves.Delete(connAddr) defer slaves.Delete(connAddr)
} }
} }

View File

@@ -1,74 +1,25 @@
package gateway package gateway
import ( import (
"context"
"encoding/json" "encoding/json"
"io/ioutil" "io/ioutil"
"log" "log"
"mime" "mime"
"net/http" "net/http"
"os/exec"
"path" "path"
"runtime" "runtime"
"time" "time"
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
. "github.com/langhuihui/monibuca/monica/util"
) )
var ( var (
config = new(ListenerConfig) config = new(ListenerConfig)
sseBegin = []byte("data: ") startTime = time.Now()
sseEnd = []byte("\n\n")
dashboardPath string dashboardPath string
) )
type SSE struct {
http.ResponseWriter
context.Context
}
func (sse *SSE) Write(data []byte) (n int, err error) {
if err = sse.Err(); err != nil {
return
}
_, err = sse.ResponseWriter.Write(sseBegin)
n, err = sse.ResponseWriter.Write(data)
_, err = sse.ResponseWriter.Write(sseEnd)
if err != nil {
return
}
sse.ResponseWriter.(http.Flusher).Flush()
return
}
func NewSSE(w http.ResponseWriter, ctx context.Context) *SSE {
header := w.Header()
header.Set("Content-Type", "text/event-stream")
header.Set("Cache-Control", "no-cache")
header.Set("Connection", "keep-alive")
header.Set("X-Accel-Buffering", "no")
header.Set("Access-Control-Allow-Origin", "*")
return &SSE{
w,
ctx,
}
}
func (sse *SSE) WriteJSON(data interface{}) (err error) {
var jsonData []byte
if jsonData, err = json.Marshal(data); err == nil {
if _, err = sse.Write(jsonData); err != nil {
return
}
return
}
return
}
func (sse *SSE) WriteExec(cmd *exec.Cmd) error {
cmd.Stderr = sse
cmd.Stdout = sse
return cmd.Run()
}
func init() { func init() {
_, currentFilePath, _, _ := runtime.Caller(0) _, currentFilePath, _, _ := runtime.Caller(0)
dashboardPath = path.Join(path.Dir(currentFilePath), "../../dashboard/dist") dashboardPath = path.Join(path.Dir(currentFilePath), "../../dashboard/dist")
@@ -81,6 +32,7 @@ func init() {
}) })
} }
func run() { func run() {
http.HandleFunc("/api/sysInfo", sysInfo)
http.HandleFunc("/api/stop", stopPublish) http.HandleFunc("/api/stop", stopPublish)
http.HandleFunc("/api/summary", summary) http.HandleFunc("/api/summary", summary)
http.HandleFunc("/api/logs", watchLogs) http.HandleFunc("/api/logs", watchLogs)
@@ -146,3 +98,9 @@ func summary(w http.ResponseWriter, r *http.Request) {
} }
} }
} }
func sysInfo(w http.ResponseWriter, r *http.Request) {
bytes, err := json.Marshal(EngineInfo)
if err == nil {
_, err = w.Write(bytes)
}
}

View File

@@ -2,12 +2,13 @@ package jessica
import ( import (
"encoding/binary" "encoding/binary"
"net/http"
"strings"
"github.com/gobwas/ws" "github.com/gobwas/ws"
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool" "github.com/langhuihui/monibuca/monica/pool"
"net/http"
"strings"
) )
func WsHandler(w http.ResponseWriter, r *http.Request) { func WsHandler(w http.ResponseWriter, r *http.Request) {
@@ -31,7 +32,7 @@ func WsHandler(w http.ResponseWriter, r *http.Request) {
defer conn.Close() defer conn.Close()
if isFlv { if isFlv {
baseStream.Type = "JessicaFlv" baseStream.Type = "JessicaFlv"
baseStream.SendHandler = func(packet *pool.SendPacket) error { baseStream.SendHandler = func(packet *avformat.SendPacket) error {
return avformat.WriteFLVTag(conn, packet) return avformat.WriteFLVTag(conn, packet)
} }
if err := ws.WriteHeader(conn, ws.Header{ if err := ws.WriteHeader(conn, ws.Header{
@@ -46,7 +47,7 @@ func WsHandler(w http.ResponseWriter, r *http.Request) {
} }
} else { } else {
baseStream.Type = "Jessica" baseStream.Type = "Jessica"
baseStream.SendHandler = func(packet *pool.SendPacket) error { baseStream.SendHandler = func(packet *avformat.SendPacket) error {
err := ws.WriteHeader(conn, ws.Header{ err := ws.WriteHeader(conn, ws.Header{
Fin: true, Fin: true,
OpCode: ws.OpBinary, OpCode: ws.OpBinary,

View File

@@ -3,7 +3,6 @@ package record
import ( import (
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
"github.com/langhuihui/monibuca/monica/util" "github.com/langhuihui/monibuca/monica/util"
"io" "io"
"os" "os"
@@ -17,7 +16,7 @@ func getDuration(file *os.File) uint32 {
if tagSize, err = util.ReadByteToUint32(file, true); err == nil { if tagSize, err = util.ReadByteToUint32(file, true); err == nil {
_, err = file.Seek(-int64(tagSize)-4, io.SeekEnd) _, err = file.Seek(-int64(tagSize)-4, io.SeekEnd)
if err == nil { if err == nil {
var tag *pool.AVPacket var tag *avformat.AVPacket
tag, err = avformat.ReadFLVTag(file) tag, err = avformat.ReadFLVTag(file)
if err == nil { if err == nil {
return tag.Timestamp return tag.Timestamp
@@ -40,7 +39,7 @@ func SaveFlv(streamPath string, append bool) error {
if err != nil { if err != nil {
return err return err
} }
p := OutputStream{SendHandler: func(packet *pool.SendPacket) error { p := OutputStream{SendHandler: func(packet *avformat.SendPacket) error {
return avformat.WriteFLVTag(file, packet) return avformat.WriteFLVTag(file, packet)
}} }}
p.ID = filePath p.ID = filePath

View File

@@ -3,6 +3,7 @@ package rtmp
import ( import (
"bufio" "bufio"
"errors" "errors"
"github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool" "github.com/langhuihui/monibuca/monica/pool"
"github.com/langhuihui/monibuca/monica/util" "github.com/langhuihui/monibuca/monica/util"
"io" "io"
@@ -312,21 +313,21 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m) return conn.writeMessage(RTMP_MSG_AMF0_COMMAND, m)
case SEND_UNPUBLISH_RESPONSE_MESSAGE: case SEND_UNPUBLISH_RESPONSE_MESSAGE:
case SEND_FULL_AUDIO_MESSAGE: case SEND_FULL_AUDIO_MESSAGE:
audio, ok := args.(*pool.SendPacket) audio, ok := args.(*avformat.SendPacket)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(audio, true, true) return conn.sendAVMessage(audio, true, true)
case SEND_AUDIO_MESSAGE: case SEND_AUDIO_MESSAGE:
audio, ok := args.(*pool.SendPacket) audio, ok := args.(*avformat.SendPacket)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
return conn.sendAVMessage(audio, true, false) return conn.sendAVMessage(audio, true, false)
case SEND_FULL_VDIEO_MESSAGE: case SEND_FULL_VDIEO_MESSAGE:
video, ok := args.(*pool.SendPacket) video, ok := args.(*avformat.SendPacket)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
@@ -334,7 +335,7 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
return conn.sendAVMessage(video, false, true) return conn.sendAVMessage(video, false, true)
case SEND_VIDEO_MESSAGE: case SEND_VIDEO_MESSAGE:
{ {
video, ok := args.(*pool.SendPacket) video, ok := args.(*avformat.SendPacket)
if !ok { if !ok {
errors.New(message + ", The parameter is AVPacket") errors.New(message + ", The parameter is AVPacket")
} }
@@ -349,7 +350,7 @@ func (conn *NetConnection) SendMessage(message string, args interface{}) error {
// 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间 // 当发送音视频数据的时候,当块类型为12的时候,Chunk Message Header有一个字段TimeStamp,指明一个时间
// 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值 // 当块类型为4,8的时候,Chunk Message Header有一个字段TimeStamp Delta,记录与上一个Chunk的时间差值
// 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同 // 当块类型为0的时候,Chunk Message Header没有时间字段,与上一个Chunk时间值相同
func (conn *NetConnection) sendAVMessage(av *pool.SendPacket, isAudio bool, isFirst bool) error { func (conn *NetConnection) sendAVMessage(av *avformat.SendPacket, isAudio bool, isFirst bool) error {
if conn.writeSeqNum > conn.bandwidth { if conn.writeSeqNum > conn.bandwidth {
conn.totalWrite += conn.writeSeqNum conn.totalWrite += conn.writeSeqNum
conn.writeSeqNum = 0 conn.writeSeqNum = 0

View File

@@ -5,7 +5,6 @@ import (
"fmt" "fmt"
. "github.com/langhuihui/monibuca/monica" . "github.com/langhuihui/monibuca/monica"
"github.com/langhuihui/monibuca/monica/avformat" "github.com/langhuihui/monibuca/monica/avformat"
"github.com/langhuihui/monibuca/monica/pool"
"log" "log"
"net" "net"
"strings" "strings"
@@ -103,7 +102,7 @@ func processRtmp(conn net.Conn) {
streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0] streamPath := nc.appName + "/" + strings.Split(pm.PublishingName, "?")[0]
pub := new(RTMP) pub := new(RTMP)
if pub.Publish(streamPath, pub) { if pub.Publish(streamPath, pub) {
pub.FirstScreen = make([]*pool.AVPacket, 0) pub.FirstScreen = make([]*avformat.AVPacket, 0)
room = pub.Room room = pub.Room
err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil) err = nc.SendMessage(SEND_STREAM_BEGIN_MESSAGE, nil)
err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status)) err = nc.SendMessage(SEND_PUBLISH_START_MESSAGE, newPublishResponseMessageData(nc.streamID, NetStream_Publish_Start, Level_Status))
@@ -114,15 +113,15 @@ func processRtmp(conn net.Conn) {
pm := msg.MsgData.(*PlayMessage) pm := msg.MsgData.(*PlayMessage)
streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0] streamPath := nc.appName + "/" + strings.Split(pm.StreamName, "?")[0]
nc.writeChunkSize = 512 nc.writeChunkSize = 512
stream := &OutputStream{SendHandler: func(packet *pool.SendPacket) (err error) { stream := &OutputStream{SendHandler: func(packet *avformat.SendPacket) (err error) {
switch true { switch true {
case packet.Packet.IsADTS: case packet.Packet.IsADTS:
tagPacket := pool.NewAVPacket(RTMP_MSG_AUDIO) tagPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
tagPacket.Payload = avformat.ADTSToAudioSpecificConfig(packet.Packet.Payload) tagPacket.Payload = avformat.ADTSToAudioSpecificConfig(packet.Packet.Payload)
err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, tagPacket) err = nc.SendMessage(SEND_FULL_AUDIO_MESSAGE, tagPacket)
ADTSLength := 7 + (int(packet.Packet.Payload[1]&1) << 1) ADTSLength := 7 + (int(packet.Packet.Payload[1]&1) << 1)
if len(packet.Packet.Payload) > ADTSLength { if len(packet.Packet.Payload) > ADTSLength {
contentPacket := pool.NewAVPacket(RTMP_MSG_AUDIO) contentPacket := avformat.NewAVPacket(RTMP_MSG_AUDIO)
contentPacket.Timestamp = packet.Timestamp contentPacket.Timestamp = packet.Timestamp
contentPacket.Payload = make([]byte, len(packet.Packet.Payload)-ADTSLength+2) contentPacket.Payload = make([]byte, len(packet.Packet.Payload)-ADTSLength+2)
contentPacket.Payload[0] = 0xAF contentPacket.Payload[0] = 0xAF
@@ -162,7 +161,7 @@ func processRtmp(conn net.Conn) {
} }
} }
case RTMP_MSG_AUDIO: case RTMP_MSG_AUDIO:
pkt := pool.NewAVPacket(RTMP_MSG_AUDIO) pkt := avformat.NewAVPacket(RTMP_MSG_AUDIO)
if msg.Timestamp == 0xffffff { if msg.Timestamp == 0xffffff {
totalDuration += msg.ExtendTimestamp totalDuration += msg.ExtendTimestamp
} else { } else {
@@ -172,7 +171,7 @@ func processRtmp(conn net.Conn) {
pkt.Payload = msg.Body pkt.Payload = msg.Body
room.PushAudio(pkt) room.PushAudio(pkt)
case RTMP_MSG_VIDEO: case RTMP_MSG_VIDEO:
pkt := pool.NewAVPacket(RTMP_MSG_VIDEO) pkt := avformat.NewAVPacket(RTMP_MSG_VIDEO)
if msg.Timestamp == 0xffffff { if msg.Timestamp == 0xffffff {
totalDuration += msg.ExtendTimestamp totalDuration += msg.ExtendTimestamp
} else { } else {

21
pm/.gitignore vendored Normal file
View File

@@ -0,0 +1,21 @@
.DS_Store
node_modules
# local env files
.env.local
.env.*.local
# Log files
npm-debug.log*
yarn-debug.log*
yarn-error.log*
# Editor directories and files
.idea
.vscode
*.suo
*.ntvs*
*.njsproj
*.sln
*.sw?

24
pm/README.md Normal file
View File

@@ -0,0 +1,24 @@
# pm
## Project setup
```
npm install
```
### Compiles and hot-reloads for development
```
npm run serve
```
### Compiles and minifies for production
```
npm run build
```
### Lints and fixes files
```
npm run lint
```
### Customize configuration
See [Configuration Reference](https://cli.vuejs.org/config/).

5
pm/babel.config.js Normal file
View File

@@ -0,0 +1,5 @@
module.exports = {
presets: [
'@vue/cli-plugin-babel/preset'
]
}

535
pm/dist/ajax.js vendored Normal file
View File

@@ -0,0 +1,535 @@
// a simple ajax
!(function () {
var jsonType = 'application/json';
var htmlType = 'text/html';
var xmlTypeRE = /^(?:text|application)\/xml/i;
var blankRE = /^\s*$/; // \s
/*
* default setting
* */
var _settings = {
type: "GET",
beforeSend: noop,
success: noop,
error: noop,
complete: noop,
context: null,
xhr: function () {
return new window.XMLHttpRequest();
},
accepts: {
json: jsonType,
xml: 'application/xml, text/xml',
html: htmlType,
text: 'text/plain'
},
crossDomain: false,
timeout: 0,
username: null,
password: null,
processData: true,
promise: noop
};
function noop() {
}
var ajax = function (options) {
//
var settings = extend({}, options || {});
//
for (var key in _settings) {
if (settings[key] === undefined) {
settings[key] = _settings[key];
}
}
//
try {
var q = {};
var promise = new Promise(function (resolve, reject) {
q.resolve = resolve;
q.reject = reject;
});
promise.resolve = q.resolve;
promise.reject = q.reject;
settings.promise = promise;
}
catch (e) {
//
settings.promise = {
resolve: noop,
reject: noop
};
}
//
if (!settings.crossDomain) {
settings.crossDomain = /^([\w-]+:)?\/\/([^\/]+)/.test(settings.url) && RegExp.$2 !== window.location.href;
}
var dataType = settings.dataType;
// jsonp
if (dataType === 'jsonp') {
//
var hasPlaceholder = /=\?/.test(settings.url);
if (!hasPlaceholder) {
var jsonpCallback = (settings.jsonp || 'callback') + '=?';
settings.url = appendQuery(settings.url, jsonpCallback)
}
return JSONP(settings);
}
// url
if (!settings.url) {
settings.url = window.location.toString();
}
//
serializeData(settings);
var mime = settings.accepts[dataType]; // mime
var baseHeader = {}; // header
var protocol = /^([\w-]+:)\/\//.test(settings.url) ? RegExp.$1 : window.location.protocol; // protocol
var xhr = _settings.xhr();
var abortTimeout;
// X-Requested-With header
// For cross-domain requests, seeing as conditions for a preflight are
// akin to a jigsaw puzzle, we simply never set it to be sure.
// (it can always be set on a per-request basis or even using ajaxSetup)
// For same-domain requests, won't change header if already provided.
if (!settings.crossDomain && !baseHeader['X-Requested-With']) {
baseHeader['X-Requested-With'] = 'XMLHttpRequest';
}
// mime
if (mime) {
//
baseHeader['Accept'] = mime;
if (mime.indexOf(',') > -1) {
mime = mime.split(',', 2)[0]
}
//
xhr.overrideMimeType && xhr.overrideMimeType(mime);
}
// contentType
if (settings.contentType || (settings.data && settings.type.toUpperCase() !== 'GET')) {
baseHeader['Content-Type'] = (settings.contentType || 'application/x-www-form-urlencoded; charset=UTF-8');
}
// headers
settings.headers = extend(baseHeader, settings.headers || {});
// on ready state change
xhr.onreadystatechange = function () {
// readystate
if (xhr.readyState === 4) {
clearTimeout(abortTimeout);
var result;
var error = false;
//
if ((xhr.status >= 200 && xhr.status < 300) || xhr.status === 304) {
dataType = dataType || mimeToDataType(xhr.getResponseHeader('content-type'));
result = xhr.responseText;
try {
// xml
if (dataType === 'xml') {
result = xhr.responseXML;
}
// json
else if (dataType === 'json') {
result = blankRE.test(result) ? null : JSON.parse(result);
}
}
catch (e) {
error = e;
}
if (error) {
ajaxError(error, 'parseerror', xhr, settings);
}
else {
ajaxSuccess(result, xhr, settings);
}
}
else {
ajaxError(null, 'error', xhr, settings);
}
}
};
// async
var async = 'async' in settings ? settings.async : true;
// open
xhr.open(settings.type, settings.url, async, settings.username, settings.password);
// xhrFields
if (settings.xhrFields) {
for (var name in settings.xhrFields) {
xhr[name] = settings.xhrFields[name];
}
}
// Override mime type if needed
if (settings.mimeType && xhr.overrideMimeType) {
xhr.overrideMimeType(settings.mimeType);
}
// set request header
for (var name in settings.headers) {
// Support: IE<9
// IE's ActiveXObject throws a 'Type Mismatch' exception when setting
// request header to a null-value.
//
// To keep consistent with other XHR implementations, cast the value
// to string and ignore `undefined`.
if (settings.headers[name] !== undefined) {
xhr.setRequestHeader(name, settings.headers[name] + "");
}
}
// before send
if (ajaxBeforeSend(xhr, settings) === false) {
xhr.abort();
return false;
}
// timeout
if (settings.timeout > 0) {
abortTimeout = window.setTimeout(function () {
xhr.onreadystatechange = noop;
xhr.abort();
ajaxError(null, 'timeout', xhr, settings);
}, settings.timeout);
}
// send
xhr.send(settings.data ? settings.data : null);
return settings.promise;
};
/*
* method get
* */
ajax.get = function (url, data, success, dataType) {
if (isFunction(data)) {
dataType = dataType || success;
success = data;
data = undefined;
}
return ajax({
url: url,
data: data,
success: success,
dataType: dataType
});
};
/*
* method post
*
* dataType:
* */
ajax.post = function (url, data, success, dataType) {
if (isFunction(data)) {
dataType = dataType || success;
success = data;
data = undefined;
}
return ajax({
type: 'POST',
url: url,
data: data,
success: success,
dataType: dataType
})
};
/*
* method getJSON
* */
ajax.getJSON = function (url, data, success) {
if (isFunction(data)) {
success = data;
data = undefined;
}
return ajax({
url: url,
data: data,
success: success,
dataType: 'json'
})
};
/*
* method ajaxSetup
* */
ajax.ajaxSetup = function (target, settings) {
return settings ? extend(extend(target, _settings), settings) : extend(_settings, target);
};
/*
* utils
*
* */
// triggers and extra global event ajaxBeforeSend that's like ajaxSend but cancelable
function ajaxBeforeSend(xhr, settings) {
var context = settings.context;
//
if (settings.beforeSend.call(context, xhr, settings) === false) {
return false;
}
}
// ajax success
function ajaxSuccess(data, xhr, settings) {
var context = settings.context;
var status = 'success';
settings.success.call(context, data, status, xhr);
settings.promise.resolve(data, status, xhr);
ajaxComplete(status, xhr, settings);
}
// status: "success", "notmodified", "error", "timeout", "abort", "parsererror"
function ajaxComplete(status, xhr, settings) {
var context = settings.context;
settings.complete.call(context, xhr, status);
}
// type: "timeout", "error", "abort", "parsererror"
function ajaxError(error, type, xhr, settings) {
var context = settings.context;
settings.error.call(context, xhr, type, error);
settings.promise.reject(xhr, type, error);
ajaxComplete(type, xhr, settings);
}
// jsonp
/*
* tks: https://www.cnblogs.com/rubylouvre/archive/2011/02/13/1953087.html
* */
function JSONP(options) {
//
var callbackName = options.jsonpCallback || 'jsonp' + (new Date().getTime());
var script = window.document.createElement('script');
var abort = function () {
// 设置 window.xxx = noop
if (callbackName in window) {
window[callbackName] = noop;
}
};
var xhr = {abort: abort};
var abortTimeout;
var head = window.document.getElementsByTagName('head')[0] || window.document.documentElement;
// ie8+
script.onerror = function (error) {
_error(error);
};
function _error(error) {
window.clearTimeout(abortTimeout);
xhr.abort();
ajaxError(error.type, xhr, error.type, options);
_removeScript();
}
window[callbackName] = function (data) {
window.clearTimeout(abortTimeout);
ajaxSuccess(data, xhr, options);
_removeScript();
};
//
serializeData(options);
script.src = options.url.replace(/=\?/, '=' + callbackName);
//
script.src = appendQuery(script.src, '_=' + (new Date()).getTime());
//
script.async = true;
// script charset
if (options.scriptCharset) {
script.charset = options.scriptCharset;
}
//
head.insertBefore(script, head.firstChild);
//
if (options.timeout > 0) {
abortTimeout = window.setTimeout(function () {
xhr.abort();
ajaxError('timeout', xhr, 'timeout', options);
_removeScript();
}, options.timeout);
}
// remove script
function _removeScript() {
if (script.clearAttributes) {
script.clearAttributes();
} else {
script.onload = script.onreadystatechange = script.onerror = null;
}
if (script.parentNode) {
script.parentNode.removeChild(script);
}
//
script = null;
delete window[callbackName];
}
return options.promise;
}
// mime to data type
function mimeToDataType(mime) {
return mime && (mime === htmlType ? 'html' : mime === jsonType ? 'json' : xmlTypeRE.test(mime) && 'xml') || 'text'
}
// append query
function appendQuery(url, query) {
return (url + '&' + query).replace(/[&?]{1,2}/, '?');
}
// serialize data
function serializeData(options) {
// formData
if (isObject(options) && !isFormData(options.data) && options.processData) {
options.data = param(options.data);
}
if (options.data && (!options.type || options.type.toUpperCase() === 'GET')) {
options.url = appendQuery(options.url, options.data);
}
}
// serialize
function serialize(params, obj, traditional, scope) {
var _isArray = isArray(obj);
for (var key in obj) {
var value = obj[key];
if (scope) {
key = traditional ? scope : scope + '[' + (_isArray ? '' : key) + ']';
}
// handle data in serializeArray format
if (!scope && _isArray) {
params.add(value.name, value.value);
}
else if (traditional ? _isArray(value) : isObject(value)) {
serialize(params, value, traditional, key);
}
else {
params.add(key, value);
}
}
}
// param
function param(obj, traditional) {
var params = [];
//
params.add = function (k, v) {
this.push(encodeURIComponent(k) + '=' + encodeURIComponent(v));
};
serialize(params, obj, traditional);
return params.join('&').replace('%20', '+');
}
// extend
function extend(target) {
var slice = Array.prototype.slice;
var args = slice.call(arguments, 1);
//
for (var i = 0, length = args.length; i < length; i++) {
var source = args[i] || {};
for (var key in source) {
if (source.hasOwnProperty(key) && source[key] !== undefined) {
target[key] = source[key];
}
}
}
return target;
}
// is object
function isObject(obj) {
var type = typeof obj;
return type === 'function' || type === 'object' && !!obj;
}
// is formData
function isFormData(obj) {
return obj instanceof FormData;
}
// is array
function isArray(value) {
return Object.prototype.toString.call(value) === "[object Array]";
}
// is function
function isFunction(value) {
return typeof value === "function";
}
// browser
window.ajax = ajax;
})();

1
pm/dist/css/app.200d2f8f.css vendored Normal file
View File

@@ -0,0 +1 @@
.content{background:#fff}pre{white-space:pre-wrap;word-wrap:break-word}.ivu-tabs .ivu-tabs-tabpane{padding:20px}

File diff suppressed because one or more lines are too long

BIN
pm/dist/favicon.ico vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

BIN
pm/dist/fonts/ionicons.143146fa.woff2 vendored Normal file

Binary file not shown.

BIN
pm/dist/fonts/ionicons.99ac3308.woff vendored Normal file

Binary file not shown.

BIN
pm/dist/fonts/ionicons.d535a25a.ttf vendored Normal file

Binary file not shown.

870
pm/dist/img/ionicons.a2c4a261.svg vendored Normal file

File diff suppressed because one or more lines are too long

After

Width:  |  Height:  |  Size: 542 KiB

1
pm/dist/index.html vendored Normal file
View File

@@ -0,0 +1 @@
<!DOCTYPE html><html lang=en><head><meta charset=utf-8><meta http-equiv=X-UA-Compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><link rel=icon href=/favicon.ico><title>Monibuca Instance Manager</title><script src=ajax.js></script><link href=/css/app.200d2f8f.css rel=preload as=style><link href=/css/chunk-vendors.22ebf426.css rel=preload as=style><link href=/js/app.9b5890f5.js rel=preload as=script><link href=/js/chunk-vendors.f701a5a3.js rel=preload as=script><link href=/css/chunk-vendors.22ebf426.css rel=stylesheet><link href=/css/app.200d2f8f.css rel=stylesheet></head><body><noscript><strong>We're sorry but pm doesn't work properly without JavaScript enabled. Please enable it to continue.</strong></noscript><div id=app></div><script src=/js/chunk-vendors.f701a5a3.js></script><script src=/js/app.9b5890f5.js></script></body></html>

2
pm/dist/js/app.9b5890f5.js vendored Normal file

File diff suppressed because one or more lines are too long

1
pm/dist/js/app.9b5890f5.js.map vendored Normal file

File diff suppressed because one or more lines are too long

51
pm/dist/js/chunk-vendors.f701a5a3.js vendored Normal file

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

11593
pm/package-lock.json generated Normal file

File diff suppressed because it is too large Load Diff

50
pm/package.json Normal file
View File

@@ -0,0 +1,50 @@
{
"name": "pm",
"version": "0.1.0",
"private": true,
"scripts": {
"serve": "vue-cli-service serve",
"build": "vue-cli-service build",
"lint": "vue-cli-service lint"
},
"dependencies": {
"@iarna/toml": "^2.2.3",
"core-js": "^3.4.4",
"view-design": "^4.0.0",
"vue": "^2.6.10",
"vue-router": "^3.1.3",
"vuex": "^3.1.2"
},
"devDependencies": {
"@vue/cli-plugin-babel": "^4.1.0",
"@vue/cli-plugin-eslint": "^4.1.0",
"@vue/cli-plugin-router": "^4.1.0",
"@vue/cli-plugin-vuex": "^4.1.0",
"@vue/cli-service": "^4.1.0",
"babel-eslint": "^10.0.3",
"eslint": "^5.16.0",
"eslint-plugin-vue": "^5.0.0",
"less": "^3.0.4",
"less-loader": "^5.0.0",
"vue-cli-plugin-iview": "^2.0.0",
"vue-template-compiler": "^2.6.10"
},
"eslintConfig": {
"root": true,
"env": {
"node": true
},
"extends": [
"plugin:vue/essential",
"eslint:recommended"
],
"rules": {},
"parserOptions": {
"parser": "babel-eslint"
}
},
"browserslist": [
"> 1%",
"last 2 versions"
]
}

535
pm/public/ajax.js Normal file
View File

@@ -0,0 +1,535 @@
// a simple ajax
!(function () {
var jsonType = 'application/json';
var htmlType = 'text/html';
var xmlTypeRE = /^(?:text|application)\/xml/i;
var blankRE = /^\s*$/; // \s
/*
* default setting
* */
var _settings = {
type: "GET",
beforeSend: noop,
success: noop,
error: noop,
complete: noop,
context: null,
xhr: function () {
return new window.XMLHttpRequest();
},
accepts: {
json: jsonType,
xml: 'application/xml, text/xml',
html: htmlType,
text: 'text/plain'
},
crossDomain: false,
timeout: 0,
username: null,
password: null,
processData: true,
promise: noop
};
function noop() {
}
var ajax = function (options) {
//
var settings = extend({}, options || {});
//
for (var key in _settings) {
if (settings[key] === undefined) {
settings[key] = _settings[key];
}
}
//
try {
var q = {};
var promise = new Promise(function (resolve, reject) {
q.resolve = resolve;
q.reject = reject;
});
promise.resolve = q.resolve;
promise.reject = q.reject;
settings.promise = promise;
}
catch (e) {
//
settings.promise = {
resolve: noop,
reject: noop
};
}
//
if (!settings.crossDomain) {
settings.crossDomain = /^([\w-]+:)?\/\/([^\/]+)/.test(settings.url) && RegExp.$2 !== window.location.href;
}
var dataType = settings.dataType;
// jsonp
if (dataType === 'jsonp') {
//
var hasPlaceholder = /=\?/.test(settings.url);
if (!hasPlaceholder) {
var jsonpCallback = (settings.jsonp || 'callback') + '=?';
settings.url = appendQuery(settings.url, jsonpCallback)
}
return JSONP(settings);
}
// url
if (!settings.url) {
settings.url = window.location.toString();
}
//
serializeData(settings);
var mime = settings.accepts[dataType]; // mime
var baseHeader = {}; // header
var protocol = /^([\w-]+:)\/\//.test(settings.url) ? RegExp.$1 : window.location.protocol; // protocol
var xhr = _settings.xhr();
var abortTimeout;
// X-Requested-With header
// For cross-domain requests, seeing as conditions for a preflight are
// akin to a jigsaw puzzle, we simply never set it to be sure.
// (it can always be set on a per-request basis or even using ajaxSetup)
// For same-domain requests, won't change header if already provided.
if (!settings.crossDomain && !baseHeader['X-Requested-With']) {
baseHeader['X-Requested-With'] = 'XMLHttpRequest';
}
// mime
if (mime) {
//
baseHeader['Accept'] = mime;
if (mime.indexOf(',') > -1) {
mime = mime.split(',', 2)[0]
}
//
xhr.overrideMimeType && xhr.overrideMimeType(mime);
}
// contentType
if (settings.contentType || (settings.data && settings.type.toUpperCase() !== 'GET')) {
baseHeader['Content-Type'] = (settings.contentType || 'application/x-www-form-urlencoded; charset=UTF-8');
}
// headers
settings.headers = extend(baseHeader, settings.headers || {});
// on ready state change
xhr.onreadystatechange = function () {
// readystate
if (xhr.readyState === 4) {
clearTimeout(abortTimeout);
var result;
var error = false;
//
if ((xhr.status >= 200 && xhr.status < 300) || xhr.status === 304) {
dataType = dataType || mimeToDataType(xhr.getResponseHeader('content-type'));
result = xhr.responseText;
try {
// xml
if (dataType === 'xml') {
result = xhr.responseXML;
}
// json
else if (dataType === 'json') {
result = blankRE.test(result) ? null : JSON.parse(result);
}
}
catch (e) {
error = e;
}
if (error) {
ajaxError(error, 'parseerror', xhr, settings);
}
else {
ajaxSuccess(result, xhr, settings);
}
}
else {
ajaxError(null, 'error', xhr, settings);
}
}
};
// async
var async = 'async' in settings ? settings.async : true;
// open
xhr.open(settings.type, settings.url, async, settings.username, settings.password);
// xhrFields
if (settings.xhrFields) {
for (var name in settings.xhrFields) {
xhr[name] = settings.xhrFields[name];
}
}
// Override mime type if needed
if (settings.mimeType && xhr.overrideMimeType) {
xhr.overrideMimeType(settings.mimeType);
}
// set request header
for (var name in settings.headers) {
// Support: IE<9
// IE's ActiveXObject throws a 'Type Mismatch' exception when setting
// request header to a null-value.
//
// To keep consistent with other XHR implementations, cast the value
// to string and ignore `undefined`.
if (settings.headers[name] !== undefined) {
xhr.setRequestHeader(name, settings.headers[name] + "");
}
}
// before send
if (ajaxBeforeSend(xhr, settings) === false) {
xhr.abort();
return false;
}
// timeout
if (settings.timeout > 0) {
abortTimeout = window.setTimeout(function () {
xhr.onreadystatechange = noop;
xhr.abort();
ajaxError(null, 'timeout', xhr, settings);
}, settings.timeout);
}
// send
xhr.send(settings.data ? settings.data : null);
return settings.promise;
};
/*
* method get
* */
ajax.get = function (url, data, success, dataType) {
if (isFunction(data)) {
dataType = dataType || success;
success = data;
data = undefined;
}
return ajax({
url: url,
data: data,
success: success,
dataType: dataType
});
};
/*
* method post
*
* dataType:
* */
ajax.post = function (url, data, success, dataType) {
if (isFunction(data)) {
dataType = dataType || success;
success = data;
data = undefined;
}
return ajax({
type: 'POST',
url: url,
data: data,
success: success,
dataType: dataType
})
};
/*
* method getJSON
* */
ajax.getJSON = function (url, data, success) {
if (isFunction(data)) {
success = data;
data = undefined;
}
return ajax({
url: url,
data: data,
success: success,
dataType: 'json'
})
};
/*
* method ajaxSetup
* */
ajax.ajaxSetup = function (target, settings) {
return settings ? extend(extend(target, _settings), settings) : extend(_settings, target);
};
/*
* utils
*
* */
// triggers and extra global event ajaxBeforeSend that's like ajaxSend but cancelable
function ajaxBeforeSend(xhr, settings) {
var context = settings.context;
//
if (settings.beforeSend.call(context, xhr, settings) === false) {
return false;
}
}
// ajax success
function ajaxSuccess(data, xhr, settings) {
var context = settings.context;
var status = 'success';
settings.success.call(context, data, status, xhr);
settings.promise.resolve(data, status, xhr);
ajaxComplete(status, xhr, settings);
}
// status: "success", "notmodified", "error", "timeout", "abort", "parsererror"
function ajaxComplete(status, xhr, settings) {
var context = settings.context;
settings.complete.call(context, xhr, status);
}
// type: "timeout", "error", "abort", "parsererror"
function ajaxError(error, type, xhr, settings) {
var context = settings.context;
settings.error.call(context, xhr, type, error);
settings.promise.reject(xhr, type, error);
ajaxComplete(type, xhr, settings);
}
// jsonp
/*
* tks: https://www.cnblogs.com/rubylouvre/archive/2011/02/13/1953087.html
* */
function JSONP(options) {
//
var callbackName = options.jsonpCallback || 'jsonp' + (new Date().getTime());
var script = window.document.createElement('script');
var abort = function () {
// 设置 window.xxx = noop
if (callbackName in window) {
window[callbackName] = noop;
}
};
var xhr = {abort: abort};
var abortTimeout;
var head = window.document.getElementsByTagName('head')[0] || window.document.documentElement;
// ie8+
script.onerror = function (error) {
_error(error);
};
function _error(error) {
window.clearTimeout(abortTimeout);
xhr.abort();
ajaxError(error.type, xhr, error.type, options);
_removeScript();
}
window[callbackName] = function (data) {
window.clearTimeout(abortTimeout);
ajaxSuccess(data, xhr, options);
_removeScript();
};
//
serializeData(options);
script.src = options.url.replace(/=\?/, '=' + callbackName);
//
script.src = appendQuery(script.src, '_=' + (new Date()).getTime());
//
script.async = true;
// script charset
if (options.scriptCharset) {
script.charset = options.scriptCharset;
}
//
head.insertBefore(script, head.firstChild);
//
if (options.timeout > 0) {
abortTimeout = window.setTimeout(function () {
xhr.abort();
ajaxError('timeout', xhr, 'timeout', options);
_removeScript();
}, options.timeout);
}
// remove script
function _removeScript() {
if (script.clearAttributes) {
script.clearAttributes();
} else {
script.onload = script.onreadystatechange = script.onerror = null;
}
if (script.parentNode) {
script.parentNode.removeChild(script);
}
//
script = null;
delete window[callbackName];
}
return options.promise;
}
// mime to data type
function mimeToDataType(mime) {
return mime && (mime === htmlType ? 'html' : mime === jsonType ? 'json' : xmlTypeRE.test(mime) && 'xml') || 'text'
}
// append query
function appendQuery(url, query) {
return (url + '&' + query).replace(/[&?]{1,2}/, '?');
}
// serialize data
function serializeData(options) {
// formData
if (isObject(options) && !isFormData(options.data) && options.processData) {
options.data = param(options.data);
}
if (options.data && (!options.type || options.type.toUpperCase() === 'GET')) {
options.url = appendQuery(options.url, options.data);
}
}
// serialize
function serialize(params, obj, traditional, scope) {
var _isArray = isArray(obj);
for (var key in obj) {
var value = obj[key];
if (scope) {
key = traditional ? scope : scope + '[' + (_isArray ? '' : key) + ']';
}
// handle data in serializeArray format
if (!scope && _isArray) {
params.add(value.name, value.value);
}
else if (traditional ? _isArray(value) : isObject(value)) {
serialize(params, value, traditional, key);
}
else {
params.add(key, value);
}
}
}
// param
function param(obj, traditional) {
var params = [];
//
params.add = function (k, v) {
this.push(encodeURIComponent(k) + '=' + encodeURIComponent(v));
};
serialize(params, obj, traditional);
return params.join('&').replace('%20', '+');
}
// extend
function extend(target) {
var slice = Array.prototype.slice;
var args = slice.call(arguments, 1);
//
for (var i = 0, length = args.length; i < length; i++) {
var source = args[i] || {};
for (var key in source) {
if (source.hasOwnProperty(key) && source[key] !== undefined) {
target[key] = source[key];
}
}
}
return target;
}
// is object
function isObject(obj) {
var type = typeof obj;
return type === 'function' || type === 'object' && !!obj;
}
// is formData
function isFormData(obj) {
return obj instanceof FormData;
}
// is array
function isArray(value) {
return Object.prototype.toString.call(value) === "[object Array]";
}
// is function
function isFunction(value) {
return typeof value === "function";
}
// browser
window.ajax = ajax;
})();

BIN
pm/public/favicon.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 4.2 KiB

18
pm/public/index.html Normal file
View File

@@ -0,0 +1,18 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width,initial-scale=1.0">
<link rel="icon" href="<%= BASE_URL %>favicon.ico">
<title>Monibuca Instance Manager</title>
<script src="ajax.js"></script>
</head>
<body>
<noscript>
<strong>We're sorry but pm doesn't work properly without JavaScript enabled. Please enable it to continue.</strong>
</noscript>
<div id="app"></div>
<!-- built files will be auto injected -->
</body>
</html>

17
pm/src/App.vue Normal file
View File

@@ -0,0 +1,17 @@
<template>
<div id="app">
<router-view/>
</div>
</template>
<script>
export default {
name: 'app',
components: {
}
}
</script>
<style>
</style>

BIN
pm/src/assets/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 6.7 KiB

View File

@@ -0,0 +1,65 @@
<template>
<Modal v-bind="$attrs" v-on="$listeners" :title="info.Path">
<Steps :current="currentStep" size="small" :status="status">
<Step title="解析请求"></Step>
<Step title="创建目录"></Step>
<Step title="写入文件"></Step>
<Step title="执行go mod init"></Step>
<Step title="执行go build"></Step>
<Step title="启动实例"></Step>
<Step title="完成"></Step>
</Steps>
<div>
<pre>{{log}}</pre>
</div>
<div slot="footer">
<Checkbox v-model="clearDir">安装前清空目录</Checkbox>
<Button type="primary" @click="start" :loading="status=='process'">开始</Button>
</div>
</Modal>
</template>
<script>
let eventSource = null;
export default {
name: "CreateInstance",
props: {
info: Object
},
methods: {
start() {
this.status = "process";
eventSource = new EventSource(
"/instance/create?info=" +
JSON.stringify(this.info) +
(this.clearDir ? "&clear=true" : "")
);
eventSource.onopen = () => (this.log = "");
eventSource.onmessage = evt => {
this.log += evt.data + "\n";
if (evt.data == "success") {
this.status = "finish";
eventSource.close();
}
};
eventSource.addEventListener("exception", evt => {
this.log += evt.data + "\n";
this.status = "error";
eventSource.close();
});
eventSource.addEventListener("step", evt => {
let [step, msg] = evt.data.split(":");
this.currentStep = step | 0;
this.log += msg + "\n";
});
},
},
data() {
return { clearDir: true, currentStep: 0, log: "", status: "wait" };
}
};
</script>
<style scoped>
</style>

View File

@@ -0,0 +1,108 @@
<template>
<List border>
<ListItem v-for="item in instances" :key="item.Name">
<ListItemMeta :title="item.Name" :description="item.Path"></ListItemMeta>
<template v-if="hasGateway(item)">
{{item.Info}}
</template>
<template slot="action">
<li v-if="hasGateway(item)" @click="window.open(gateWayHref(item),'_blank')">
<Icon type="md-browsers"/>
管理界面
</li>
<li @click="restart(item)">
<Icon type="ios-refresh"/>
重启
</li>
<li @click="shutdown(item)">
<Icon type="ios-power"/>
关闭
</li>
</template>
</ListItem>
<Modal v-model="showRestart">
<Checkbox v-model="update">go get -u</Checkbox>
<Checkbox v-model="build">go build</Checkbox>
</Modal>
</List>
</template>
<script>
import toml from "@iarna/toml"
export default {
name: "InstanceList",
data() {
return {instances: {}, showRestart: false, update: false, build: false}
},
mounted() {
window.ajax.getJSON("/instance/list").then(x => {
for (let name in x) {
let instance = x[name]
instance.Config = toml.parse(instance.Config)
if (this.hasGateway(instance)) {
window.ajax.getJSON("//" + this.gateWayHref(instance) + "/api/sysInfo").then(x => {
instance.Info = "引擎版本:" + x.Version + "启动时间:" + x.StartTime
}).catch(() => {
instance.Info = "无法访问实例"
})
} else {
instance.Info = "实例未配置网关插件"
}
}
this.instances = x;
});
}, methods: {
hasGateway(item) {
return item.Config.Plugins.hasOwnProperty("GateWay")
},
gateWayHref(item) {
return location.hostname + ":" + item.Config.Plugins.GateWay.split(":").pop()
},
restart(item) {
const msg = this.$Message.loading({
content: 'restart ' + item.Name + '...',
duration: 0
});
let arg = item.Name
if (this.update) {
arg += "&update=true"
}
if (this.build) {
arg += "&build=true"
}
const es = new EventSource("/instance/restart?instance=" + arg)
es.onmessage = evt => {
if (evt.data == "success") {
this.$Message.success("重启成功!")
msg()
} else {
this.$Message.info(evt.data)
}
}
es.addEventListener("failed", evt => {
this.$Message.error(evt.data)
msg()
})
es.onerror = e => {
if (e) this.$Message.error(e);
msg()
es.close()
}
},
shutdown(item) {
window.ajax.get("/instance/shutdown?instance=" + item.Name).then(x => {
if (x == "success") {
this.$Message.success("已关闭实例")
} else {
this.$Message.error(x)
}
})
},
}
}
</script>
<style scoped>
</style>

13
pm/src/main.js Normal file
View File

@@ -0,0 +1,13 @@
import Vue from 'vue'
import App from './App.vue'
import router from './router'
import store from './store'
import './plugins/iview.js'
Vue.config.productionTip = false
new Vue({
router,
store,
render: h => h(App)
}).$mount('#app')

6
pm/src/plugins/iview.js Normal file
View File

@@ -0,0 +1,6 @@
import Vue from 'vue'
import ViewUI from 'view-design'
Vue.use(ViewUI)
import 'view-design/dist/styles/iview.css'

20
pm/src/router/index.js Normal file
View File

@@ -0,0 +1,20 @@
import Vue from 'vue'
import VueRouter from 'vue-router'
import Instances from "../views/Instances"
Vue.use(VueRouter)
const routes = [
{
path: '/',
name: 'instances',
component: Instances
}
]
const router = new VueRouter({
mode: 'history',
base: process.env.BASE_URL,
routes
})
export default router

15
pm/src/store/index.js Normal file
View File

@@ -0,0 +1,15 @@
import Vue from 'vue'
import Vuex from 'vuex'
Vue.use(Vuex)
export default new Vuex.Store({
state: {
},
mutations: {
},
actions: {
},
modules: {
}
})

235
pm/src/views/Instances.vue Normal file
View File

@@ -0,0 +1,235 @@
<template>
<Layout class="layout">
<Header style=" background:unset;text-align: center;">Monibuca 实例管理器</Header>
<Content class="content">
<Tabs value="name1">
<TabPane label="实例" name="name1">
<InstanceList></InstanceList>
</TabPane>
<TabPane label="创建" name="name2">
<Steps :current="createStep">
<Step title="选择目录" content="选择创建实例的目录"></Step>
<Step title="选插件" content="选择要启用的插件"></Step>
<Step title="完成" content="完成实例创建"></Step>
</Steps>
<div style="margin:50px;width:auto">
<i-input v-model="createPath" v-if="createStep==0">
<Button slot="prepend" icon="md-arrow-round-up" @click="goUp"></Button>
</i-input>
<List v-else-if="createStep==1" border>
<ListItem v-for="(item,name) in plugins" :key="name">
<ListItemMeta :title="name" :description="item.Path"></ListItemMeta>
{{item.Config}}
<template slot="action">
<li @click="removePlugin(name)">
<Icon type="ios-trash"/>
移除
</li>
</template>
</ListItem>
</List>
<div v-else>
<h3>实例名称</h3>
<i-input
v-model="instanceName"
:placeholder="createPath.split('/').pop()"
></i-input>
<h4>安装路径</h4>
<div>
<pre>{{createPath}}</pre>
</div>
<h4>启用的插件</h4>
<div>
<pre>{{pluginStr}}</pre>
</div>
<h4>配置文件</h4>
<div>
<pre>{{configStr}}</pre>
</div>
</div>
<ButtonGroup style="display:table;margin:50px auto;">
<Button
size="large"
type="primary"
@click="createStep--"
v-if="createStep!=0"
>
<Icon type="ios-arrow-back"></Icon>
上一步
</Button>
<Button
size="large"
type="success"
@click="showAddPlugin=true"
v-if="createStep==1"
>
+
添加插件
</Button>
<Button
size="large"
type="primary"
@click="createStep++"
v-if="createStep!=2"
>
下一步
<Icon type="ios-arrow-forward"></Icon>
</Button>
<Button size="large" type="success" @click="createInstance" v-else>开始创建</Button>
</ButtonGroup>
</div>
</TabPane>
<TabPane label="导入" name="name3"></TabPane>
</Tabs>
</Content>
<Modal v-model="showAddPlugin" title="添加Plugin" @on-ok="addPlugin">
<Form :model="formPlugin" label-position="top">
<FormItem label="插件名称">
<i-input v-model="formPlugin.Name" placeholder="插件名称必须和插件注册时的名称一致"></i-input>
</FormItem>
<FormItem label="插件包地址">
<i-input v-model="formPlugin.Path">
<Button slot="append" @click="showBuiltinPlugin=true">内置插件</Button>
</i-input>
</FormItem>
<Alert
type="show-icon"
v-if="!Object.values(builtinPlugins).includes(formPlugin.Path)"
>
如果该插件是私有仓库请到服务器上输入echo "machine {{privateHost}} login 用户名 password 密码" >> ~/.netrc
并且添加环境变量GOPRIVATE={{privateHost}}
</Alert>
<FormItem label="插件配置信息">
<i-input type="textarea" v-model="formPlugin.Config" placeholder="请输入toml格式"></i-input>
</FormItem>
</Form>
</Modal>
<Modal v-model="showBuiltinPlugin">
<List>
<ListItem v-for="(item,name) in builtinPlugins" :key="name">
<ListItemMeta :title="name" :description="item"></ListItemMeta>
<template slot="action">
<li @click="addBuiltin(name,item)">
<Icon type="ios-add"/>
添加
</li>
</template>
</ListItem>
</List>
</Modal>
<CreateInstance v-model="showCreate" :info="createInfo"></CreateInstance>
</Layout>
</template>
<script>
import CreateInstance from "../components/CreateInstance";
import InstanceList from "../components/InstanceList";
export default {
components: {
CreateInstance,InstanceList
},
data() {
return {
instanceName: "",
createStep: 0,
showCreate: false,
createInfo: null,
createPath: "/opt/monibuca",
plugins: {},
showAddPlugin: false,
formPlugin: {},
showBuiltinPlugin: false,
builtinPlugins: {
Auth: "github.com/langhuihui/monibuca/plugins/auth",
Cluster: "github.com/langhuihui/monibuca/plugins/cluster",
GateWay: "github.com/langhuihui/monibuca/plugins/gateway",
HDL: "github.com/langhuihui/monibuca/plugins/HDL",
Jessica: "github.com/langhuihui/monibuca/plugins/jessica",
QoS: "github.com/langhuihui/monibuca/plugins/QoS",
RecordFlv: "github.com/langhuihui/monibuca/plugins/record",
RTMP: "github.com/langhuihui/monibuca/plugins/rtmp"
},
defaultConfig: {
Auth: 'Key = "www.monibuca.com"',
RecordFlv: 'Path="./resource"',
QoS: 'Suffix = ["high","medium","low"]',
Cluster: 'Master = "localhost:2019"\nListenAddr = ":2019"',
GateWay: 'ListenAddr = ":8081"',
RTMP: 'ListenAddr = ":1935"',
Jessica: 'ListenAddr = ":8080"',
HDL: 'ListenAddr = ":2020"'
}
};
},
computed: {
pluginStr() {
return Object.values(this.plugins)
.map(x => x.Path)
.join("\n");
},
configStr() {
return Object.values(this.plugins)
.map(
x => `[Plugins.${x.Name}]
${x.Config || ""}`
)
.join("\n");
},
privateHost() {
return (
(this.formPlugin.Path && this.formPlugin.Path.split("/")[0]) ||
"仓库域名"
);
}
},
methods: {
goUp() {
let paths = this.createPath.split("/");
paths.pop();
this.createPath = paths.join("/");
},
createInstance() {
this.showCreate = true;
this.createInfo = {
Name: this.instanceName || this.createPath.split("/").pop(),
Path: this.createPath,
Plugins: Object.values(this.plugins).map(x => x.Path),
Config: this.configStr
};
},
addPlugin() {
this.plugins[this.formPlugin.Name] = this.formPlugin;
this.formPlugin = {};
},
removePlugin(name) {
delete this.plugins[name];
this.$forceUpdate();
},
addBuiltin(name, item) {
this.formPlugin.Name = name;
this.formPlugin.Path = item;
this.formPlugin.Config = this.defaultConfig[name];
this.showBuiltinPlugin = false;
},
}
};
</script>
<style>
.content {
background: white;
}
pre {
white-space: pre-wrap;
word-wrap: break-word;
}
.ivu-tabs .ivu-tabs-tabpane {
padding: 20px;
}
</style>

18
slave.toml Normal file
View File

@@ -0,0 +1,18 @@
# [Plugins.HDL]
# ListenAddr = ":2020"
[Plugins.Jessica]
ListenAddr = ":8082"
[Plugins.RTMP]
ListenAddr = ":1936"
[Plugins.GateWay]
ListenAddr = ":8083"
[Plugins.Cluster]
Master = "localhost:2019"
#ListenAddr = ":2019"
#
#[Plugins.Auth]
#Key="www.monibuca.com"
# [Plugins.RecordFlv]
# Path="./resource"
# [Plugins.QoS]
# Suffix = ["high","medium","low"]