配置合并和覆盖

This commit is contained in:
dexter
2022-02-08 19:31:50 +08:00
parent 1fd7b14bd7
commit f9d0567f45
20 changed files with 530 additions and 350 deletions

View File

@@ -2,6 +2,7 @@ package codec
import (
"errors"
)
const (
@@ -134,14 +135,7 @@ type ADTSVariableHeader struct {
// NumberOfRawDataBlockInFrame, 表示ADTS帧中有number_of_raw_data_blocks_in_frame + 1个AAC原始帧
// 所以说number_of_raw_data_blocks_in_frame == 0 表示说ADTS帧中有一个AAC数据块并不是说没有。(一个AAC原始帧包含一段时间内1024个采样及相关数据)
func ADTSToAudioSpecificConfig(data []byte) []byte {
profile := ((data[2] & 0xc0) >> 6) + 1
sampleRate := (data[2] & 0x3c) >> 2
channel := ((data[2] & 0x1) << 2) | ((data[3] & 0xc0) >> 6)
config1 := (profile << 3) | ((sampleRate & 0xe) >> 1)
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
return []byte{0xAF, 0x00, config1, config2}
}
func AudioSpecificConfigToADTS(asc AudioSpecificConfig, rawDataLength int) (adts ADTS, adtsByte []byte, err error) {
if asc.ChannelConfiguration > 8 || asc.FrameLengthFlag > 13 {
err = errors.New("Reserved field.")
@@ -218,3 +212,5 @@ func ParseRTPAAC(payload []byte) (result [][]byte) {
}
return
}

View File

@@ -108,3 +108,14 @@ func ReadFLVTag(r io.Reader) (t byte, timestamp uint32, payload []byte, err erro
}
return
}
func VideoAVCC2FLV(avcc net.Buffers, ts uint32) (flv net.Buffers) {
b := util.Buffer(make([]byte, 0, 15))
b.WriteByte(FLV_TAG_TYPE_VIDEO)
dataSize := util.SizeOfBuffers(avcc)
b.WriteUint24(uint32(dataSize))
b.WriteUint24(ts)
b.WriteByte(byte(ts >> 24))
b.WriteUint24(0)
return append(append(append(flv, b), avcc...), util.PutBE(b.Malloc(4), dataSize+11))
}

View File

@@ -4,11 +4,12 @@ import (
"net"
"time"
"github.com/Monibuca/engine/v4/util"
"github.com/Monibuca/engine/v4/codec"
"github.com/pion/rtp"
)
type NALUSlice net.Buffers
// type H264Slice NALUSlice
// type H265Slice NALUSlice
@@ -82,14 +83,8 @@ func (av *AVFrame[T]) AppendRaw(raw ...T) {
av.Raw = append(av.Raw, raw...)
}
func (av *AVFrame[T]) FillFLV(t byte, ts uint32) {
b := util.Buffer(make([]byte, 0, 15))
b.WriteByte(t)
dataSize := util.SizeOfBuffers(av.AVCC)
b.WriteUint24(uint32(dataSize))
b.WriteUint24(ts)
b.WriteByte(byte(ts >> 24))
b.WriteUint24(0)
av.FLV = append(append(append(av.FLV, b), av.AVCC...), util.PutBE(b.Malloc(4), dataSize+11))
av.FLV = codec.VideoAVCC2FLV(av.AVCC, ts)
av.FLV[0][0] = t
}
func (av *AVFrame[T]) AppendAVCC(avcc ...[]byte) {
av.AVCC = append(av.AVCC, avcc...)
@@ -151,3 +146,8 @@ func (avcc AVCCFrame) AudioCodecID() byte {
// }
// return
// }
type DecoderConfiguration[T RawSlice] struct {
AVCC T
Raw T
FLV net.Buffers
}

207
config.go
View File

@@ -1,207 +0,0 @@
package engine
import (
"context"
"log"
"net"
"net/http"
"reflect"
"runtime"
"strings"
"time"
"golang.org/x/sync/errgroup"
)
type Second int
func (s Second) Duration() time.Duration {
return time.Duration(s) * time.Second
}
type PluginConfig interface {
Update(Config)
}
type TCPPluginConfig interface {
PluginConfig
context.Context
ServeTCP(*net.TCPConn)
}
type HTTPPluginConfig interface {
PluginConfig
context.Context
http.Handler
}
type Config map[string]any
func (config Config) Unmarshal(s any) {
var el reflect.Value
if v, ok := s.(reflect.Value); ok {
el = v
} else {
el = reflect.ValueOf(s).Elem()
}
t := el.Type()
for k, v := range config {
var fv reflect.Value
value := reflect.ValueOf(v)
if f, ok := t.FieldByName(strings.ToUpper(k[:1]) + k[1:]); ok {
// 兼容首字母大写的属性
fv = el.FieldByName(f.Name)
} else if f, ok := t.FieldByName(strings.ToUpper(k)); ok {
// 兼容全部大写的属性
fv = el.FieldByName(f.Name)
} else {
continue
}
if t.Kind() == reflect.Slice {
l := value.Len()
s := reflect.MakeSlice(t.Elem(), l, value.Cap())
for i := 0; i < l; i++ {
fv := value.Field(i)
if fv.Type() == reflect.TypeOf(config) {
fv.FieldByName("Unmarshal").Call([]reflect.Value{s.Field(i)})
} else {
s.Field(i).Set(fv)
}
}
fv.Set(s)
} else if child, ok := v.(Config); ok {
child.Unmarshal(fv)
} else {
fv.Set(value)
}
}
}
func (config Config) Assign(source Config) {
for k, v := range source {
m, isMap := v.(map[string]any)
if _, ok := config[k]; !ok || !isMap {
config[k] = v
} else {
Config(config[k].(map[string]any)).Assign(m)
}
}
}
func (config Config) Has(key string) (ok bool) {
if config == nil {
return
}
_, ok = config[key]
return
}
type TCPConfig struct {
ListenAddr string
ListenNum int //同时并行监听数量0为CPU核心数量
}
func (tcp *TCPConfig) listen(l net.Listener, handler func(*net.TCPConn)) {
var tempDelay time.Duration
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Printf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay)
time.Sleep(tempDelay)
continue
}
return
}
conn.(*net.TCPConn).SetNoDelay(false)
tempDelay = 0
go handler(conn.(*net.TCPConn))
}
}
func (tcp *TCPConfig) Listen(plugin TCPPluginConfig) error {
l, err := net.Listen("tcp", tcp.ListenAddr)
if err != nil {
return err
}
count := tcp.ListenNum
if count == 0 {
count = runtime.NumCPU()
}
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
<-plugin.Done()
return l.Close()
}
type HTTPConfig struct {
ListenAddr string
ListenAddrTLS string
CertFile string
KeyFile string
CORS bool //是否自动添加CORS头
}
// ListenAddrs Listen http and https
func (config *HTTPConfig) Listen(plugin HTTPPluginConfig) error {
var g errgroup.Group
if config.ListenAddrTLS != "" {
g.Go(func() error {
return http.ListenAndServeTLS(config.ListenAddrTLS, config.CertFile, config.KeyFile, plugin)
})
}
if config.ListenAddr != "" {
g.Go(func() error { return http.ListenAndServe(config.ListenAddr, plugin) })
}
g.Go(func() error {
<-plugin.Done()
return plugin.Err()
})
return g.Wait()
}
type PublishConfig struct {
PubAudio bool
PubVideo bool
KillExit bool // 是否踢掉已经存在的发布者
PublishTimeout Second // 发布无数据超时
WaitCloseTimeout Second // 延迟自动关闭(无订阅时)
}
type SubscribeConfig struct {
SubAudio bool
SubVideo bool
IFrameOnly bool // 只要关键帧
WaitTimeout Second // 等待流超时
}
type PullConfig struct {
AutoReconnect bool // 自动重连
PullOnStart bool // 启动时拉流
PullOnSubscribe bool // 订阅时自动拉流
AutoPullList map[string]string // 自动拉流列表
}
type PushConfig struct {
AutoPushList map[string]string // 自动推流列表
}
type EngineConfig struct {
*http.ServeMux
context.Context
Publish PublishConfig
Subscribe SubscribeConfig
HTTP HTTPConfig
RTPReorder bool
EnableAVCC bool //启用AVCC格式rtmp协议使用
EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用
EnableFLV bool //开启FLV格式hdl协议使用
}

151
config/config.go Normal file
View File

@@ -0,0 +1,151 @@
package config
import (
"net"
"net/http"
"reflect"
"strings"
"time"
)
type Config map[string]any
type Second int
func (s Second) Duration() time.Duration {
return time.Duration(s) * time.Second
}
type Plugin interface {
Update(Config)
}
type TCPPlugin interface {
Plugin
ServeTCP(*net.TCPConn)
}
type HTTPPlugin interface {
Plugin
http.Handler
}
func (config Config) Unmarshal(s any) {
if s == nil {
return
}
var el reflect.Value
if v, ok := s.(reflect.Value); ok {
el = v
} else {
el = reflect.ValueOf(s)
}
if el.Kind() == reflect.Pointer {
el = el.Elem()
}
t := el.Type()
//字段映射,小写对应的大写
nameMap := make(map[string]string)
for i, j := 0, t.NumField(); i < j; i++ {
name := t.Field(i).Name
nameMap[strings.ToLower(name)] = name
}
for k, v := range config {
value := reflect.ValueOf(v)
// 需要被写入的字段
fv := el.FieldByName(nameMap[k])
if t.Kind() == reflect.Slice {
l := value.Len()
s := reflect.MakeSlice(t.Elem(), l, value.Cap())
for i := 0; i < l; i++ {
fv := value.Field(i)
if fv.Type() == reflect.TypeOf(config) {
fv.FieldByName("Unmarshal").Call([]reflect.Value{s.Field(i)})
} else {
s.Field(i).Set(fv)
}
}
fv.Set(s)
} else if child, ok := v.(Config); ok {
child.Unmarshal(fv)
} else {
fv.Set(value)
}
}
}
// 覆盖配置
func (config Config) Assign(source Config) {
for k, v := range source {
switch m := config[k].(type) {
case Config:
m.Assign(v.(Config))
default:
config[k] = v
}
}
}
// 合并配置,不覆盖
func (config Config) Merge(source Config) {
for k, v := range source {
if _, ok := config[k]; !ok {
switch m := config[k].(type) {
case Config:
m.Merge(v.(Config))
default:
config[k] = v
}
}
}
}
func (config Config) Set(key string, value any) {
config[strings.ToLower(key)] = value
}
func (config Config) Has(key string) (ok bool) {
_, ok = config[strings.ToLower(key)]
return
}
func (config Config) HasChild(key string) (ok bool) {
_, ok = config[strings.ToLower(key)].(Config)
return ok
}
func (config Config) GetChild(key string) Config {
return config[strings.ToLower(key)].(Config)
}
func Struct2Config(s any) (config Config) {
var t reflect.Type
var v reflect.Value
if vv, ok := s.(reflect.Value); ok {
v = vv
t = vv.Type()
} else {
t = reflect.TypeOf(s)
v = reflect.ValueOf(s)
if t.Kind() == reflect.Pointer {
v = v.Elem()
t = t.Elem()
}
}
for i, j := 0, t.NumField(); i < j; i++ {
ft := t.Field(i)
switch ft.Type.Kind() {
case reflect.Struct:
config[ft.Name] = Struct2Config(v.Field(i))
case reflect.Slice:
fallthrough
default:
if config == nil {
config = make(Config)
}
reflect.ValueOf(config).SetMapIndex(reflect.ValueOf(strings.ToLower(ft.Name)), v.Field(i))
}
}
return
}

34
config/http.go Normal file
View File

@@ -0,0 +1,34 @@
package config
import (
"context"
"net/http"
"golang.org/x/sync/errgroup"
)
type HTTP struct {
ListenAddr string
ListenAddrTLS string
CertFile string
KeyFile string
CORS bool //是否自动添加CORS头
}
// ListenAddrs Listen http and https
func (config *HTTP) Listen(ctx context.Context, plugin HTTPPlugin) error {
var g errgroup.Group
if config.ListenAddrTLS != "" {
g.Go(func() error {
return http.ListenAndServeTLS(config.ListenAddrTLS, config.CertFile, config.KeyFile, plugin)
})
}
if config.ListenAddr != "" {
g.Go(func() error { return http.ListenAndServe(config.ListenAddr, plugin) })
}
g.Go(func() error {
<-ctx.Done()
return ctx.Err()
})
return g.Wait()
}

55
config/tcp.go Normal file
View File

@@ -0,0 +1,55 @@
package config
import (
"context"
"log"
"net"
"runtime"
"time"
)
type TCP struct {
ListenAddr string
ListenNum int //同时并行监听数量0为CPU核心数量
}
func (tcp *TCP) listen(l net.Listener, handler func(*net.TCPConn)) {
var tempDelay time.Duration
for {
conn, err := l.Accept()
if err != nil {
if ne, ok := err.(net.Error); ok && ne.Temporary() {
if tempDelay == 0 {
tempDelay = 5 * time.Millisecond
} else {
tempDelay *= 2
}
if max := 1 * time.Second; tempDelay > max {
tempDelay = max
}
log.Printf("%s: Accept error: %v; retrying in %v", tcp.ListenAddr, err, tempDelay)
time.Sleep(tempDelay)
continue
}
return
}
conn.(*net.TCPConn).SetNoDelay(false)
tempDelay = 0
go handler(conn.(*net.TCPConn))
}
}
func (tcp *TCP) Listen(ctx context.Context, plugin TCPPlugin) error {
l, err := net.Listen("tcp", tcp.ListenAddr)
if err != nil {
return err
}
count := tcp.ListenNum
if count == 0 {
count = runtime.NumCPU()
}
for i := 0; i < count; i++ {
go tcp.listen(l, plugin.ServeTCP)
}
<-ctx.Done()
return l.Close()
}

48
config/types.go Normal file
View File

@@ -0,0 +1,48 @@
package config
type Publish struct {
PubAudio bool
PubVideo bool
KillExit bool // 是否踢掉已经存在的发布者
PublishTimeout Second // 发布无数据超时
WaitCloseTimeout Second // 延迟自动关闭(无订阅时)
}
type Subscribe struct {
SubAudio bool
SubVideo bool
IFrameOnly bool // 只要关键帧
WaitTimeout Second // 等待流超时
}
type Pull struct {
AutoReconnect bool // 自动重连
PullOnStart bool // 启动时拉流
PullOnSubscribe bool // 订阅时自动拉流
AutoPullList map[string]string // 自动拉流列表
}
type Push struct {
AutoPushList map[string]string // 自动推流列表
}
type Engine struct {
Publish
Subscribe
HTTP
RTPReorder bool
EnableAVCC bool //启用AVCC格式rtmp协议使用
EnableRTP bool //启用RTP格式rtsp、gb18181等协议使用
EnableFLV bool //开启FLV格式hdl协议使用
}
func (g *Engine) Update(override Config) {
override.Unmarshal(g)
}
var Global = &Engine{
Publish{true, true, false, 10, 10},
Subscribe{true, true, false, 10},
HTTP{ListenAddr: ":8080", CORS: true},
false, true, true, true,
}

40
http.go
View File

@@ -4,31 +4,31 @@ import (
"encoding/json"
"net/http"
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/util"
. "github.com/logrusorgru/aurora"
)
func (config *EngineConfig) Update(override Config) {
override.Unmarshal(config)
if config.Context == nil {
config.Context = Ctx
handleFunc("/sysInfo", sysInfo)
handleFunc("/closeStream", closeStream)
util.Print(Green("api server start at "), BrightBlue(config.HTTP.ListenAddr), BrightBlue(config.HTTP.ListenAddrTLS))
config.HTTP.Listen(config)
}
type GlobalConfig struct {
*http.ServeMux
*config.Engine
}
func handleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
config.HandleFunc("/api"+pattern, func(rw http.ResponseWriter, r *http.Request) {
if config.HTTP.CORS {
util.CORS(rw, r)
}
handler(rw, r)
})
func (cfg *GlobalConfig) Update(override config.Config) {
cfg.Engine.Update(override)
Engine.RawConfig = config.Struct2Config(cfg.Engine)
util.Print(Green("api server start at "), BrightBlue(cfg.ListenAddr), BrightBlue(cfg.ListenAddrTLS))
cfg.Listen(Engine, cfg)
}
func closeStream(w http.ResponseWriter, r *http.Request) {
func (config *GlobalConfig) API_sysInfo(rw http.ResponseWriter, r *http.Request) {
json.NewEncoder(rw).Encode(&struct {
Version string
StartTime string
}{Engine.Version, StartTime.Format("2006-01-02 15:04:05")})
}
func (config *GlobalConfig) API_closeStream(w http.ResponseWriter, r *http.Request) {
if streamPath := r.URL.Query().Get("stream"); streamPath != "" {
if s := Streams.Get(streamPath); s != nil {
s.Close()
@@ -40,9 +40,3 @@ func closeStream(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("no query stream"))
}
}
func sysInfo(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(&struct {
Version string
StartTime string
}{Version, StartTime.Format("2006-01-02 15:04:05")})
}

124
main.go
View File

@@ -13,6 +13,7 @@ import (
"strings"
"time" // colorable
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/util"
"github.com/google/uuid"
@@ -20,37 +21,25 @@ import (
"gopkg.in/yaml.v3"
)
var Version = "4.0.0"
var (
DefaultPublishConfig = PublishConfig{
true, true, false, 10, 10,
}
DefaultSubscribeConfig = SubscribeConfig{
true, true, false, 10,
}
config = &EngineConfig{
http.NewServeMux(),
Ctx,
DefaultPublishConfig,
DefaultSubscribeConfig,
HTTPConfig{ListenAddr: ":8080", CORS: true},
false, true, true, true,
}
// ConfigRaw 配置信息的原始数据
ConfigRaw []byte
StartTime time.Time //启动时间
Plugins = make(map[string]*Plugin) // Plugins 所有的插件配置
Ctx context.Context
settingDir string
EngineConfig = &GlobalConfig{
Engine: config.Global,
ServeMux: http.NewServeMux(),
}
Engine = InstallPlugin(EngineConfig)
)
func InstallPlugin(config PluginConfig) *Plugin {
name := strings.TrimSuffix(reflect.TypeOf(config).Elem().Name(), "Config")
func InstallPlugin[T config.Plugin](config T) *Plugin {
t := reflect.TypeOf(config).Elem()
name := strings.TrimSuffix(t.Name(), "Config")
plugin := &Plugin{
Name: name,
Config: config,
Modified: make(Config),
}
_, pluginFilePath, _, _ := runtime.Caller(1)
configDir := filepath.Dir(pluginFilePath)
@@ -65,24 +54,20 @@ func InstallPlugin(config PluginConfig) *Plugin {
return plugin
}
// Plugin 插件配置定义
// Plugin 插件信息
type Plugin struct {
context.Context `json:"-"`
context.CancelFunc `json:"-"`
Name string //插件名称
Config PluginConfig //插件配置
Config config.Plugin //插件配置
Version string //插件版本
RawConfig Config //配置的map形式方便查询
Modified Config //修改过的配置项
}
func init() {
if parts := strings.Split(util.CurrentDir(), "@"); len(parts) > 1 {
Version = parts[len(parts)-1]
}
RawConfig config.Config //配置的map形式方便查询
Modified config.Config //修改过的配置项
}
// Run 启动Monibuca引擎
func Run(ctx context.Context, configFile string) (err error) {
Ctx = ctx
Engine.Context = ctx
if err := util.CreateShutdownScript(); err != nil {
log.Print(Red("create shutdown script error:"), err)
}
@@ -95,29 +80,24 @@ func Run(ctx context.Context, configFile string) (err error) {
log.Print(Red("create dir .m7s error:"), err)
return
}
util.Print(BgGreen(Black("Ⓜ starting m7s ")), BrightBlue(Version))
var cg Config
var engineCg Config
util.Print(BgGreen(White("Ⓜ starting m7s ")))
var cg config.Config
if ConfigRaw != nil {
if err = yaml.Unmarshal(ConfigRaw, &cg); err == nil {
if cfg, ok := cg["engine"]; ok {
engineCg = cfg.(Config)
Engine.RawConfig = cg.GetChild("global")
}
}
}
go config.Update(engineCg)
Engine.registerHandler()
go EngineConfig.Update(Engine.RawConfig)
for name, config := range Plugins {
if v, ok := cg[strings.ToLower(name)]; ok {
config.RawConfig = v.(Config)
config.RawConfig = cg.GetChild(name)
config.assign()
}
config.merge()
}
UUID := uuid.NewString()
reportTimer := time.NewTimer(time.Minute)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, "http://monibuca.com:2022/report/engine", nil)
req.Header.Set("os", runtime.GOOS)
req.Header.Set("version", Version)
req.Header.Set("version", Engine.Version)
req.Header.Set("uuid", UUID)
var c http.Client
for {
@@ -132,13 +112,19 @@ func Run(ctx context.Context, configFile string) (err error) {
}
func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
if opt == nil {
return
}
var cors bool
if v, ok := opt.RawConfig["cors"]; ok {
cors = v.(bool)
} else if config.HTTP.CORS {
} else if EngineConfig.CORS {
cors = true
}
config.HandleFunc("/"+strings.ToLower(opt.Name)+pattern, func(rw http.ResponseWriter, r *http.Request) {
if opt != Engine {
pattern = "/" + strings.ToLower(opt.Name) + pattern
}
Engine.HandleFunc(pattern, func(rw http.ResponseWriter, r *http.Request) {
if cors {
util.CORS(rw, r)
}
@@ -147,11 +133,16 @@ func (opt *Plugin) HandleFunc(pattern string, handler func(http.ResponseWriter,
}
func (opt *Plugin) HandleApi(pattern string, handler func(http.ResponseWriter, *http.Request)) {
opt.HandleFunc("/api"+pattern, handler)
if opt == nil {
return
}
pattern = "/api" + pattern
util.Println("http handle added:", pattern)
opt.HandleFunc(pattern, handler)
}
// 读取独立配置合并入总配置中
func (opt *Plugin) merge() {
func (opt *Plugin) assign() {
f, err := os.Open(opt.settingPath())
if err == nil {
if err = yaml.NewDecoder(f).Decode(&opt.Modified); err == nil {
@@ -162,8 +153,45 @@ func (opt *Plugin) merge() {
}
}
}
t := reflect.TypeOf(opt.Config).Elem()
// 用全局配置覆盖没有设置的配置
for i, j := 0, t.NumField(); i < j; i++ {
fname := t.Field(i).Name
if Engine.RawConfig.Has(fname) {
if !opt.RawConfig.Has(fname) {
opt.RawConfig.Set(fname, Engine.RawConfig[fname])
} else if opt.RawConfig.HasChild(fname) {
opt.RawConfig.GetChild(fname).Merge(Engine.RawConfig.GetChild(fname))
}
}
}
opt.registerHandler()
opt.Update()
}
func (opt *Plugin) Update() {
if opt.CancelFunc != nil {
opt.CancelFunc()
}
opt.Context, opt.CancelFunc = context.WithCancel(Engine)
go opt.Config.Update(opt.RawConfig)
}
func (opt *Plugin) registerHandler() {
t := reflect.TypeOf(opt.Config).Elem()
v := reflect.ValueOf(opt.Config).Elem()
// 注册http响应
for i, j := 0, t.NumMethod(); i < j; i++ {
mt := t.Method(i)
if strings.HasPrefix(mt.Name, "API") {
parts := strings.Split(mt.Name, "_")
parts[0] = ""
patten := reflect.ValueOf(strings.Join(parts, "/"))
reflect.ValueOf(opt.HandleApi).Call([]reflect.Value{patten, v.Method(i)})
}
}
}
func (opt *Plugin) settingPath() string {
return filepath.Join(settingDir, strings.ToLower(opt.Name)+".yaml")
}

View File

@@ -1,8 +1,11 @@
package engine
import (
"io"
"net/url"
"time"
"github.com/Monibuca/engine/v4/config"
)
type IPublisher interface {
@@ -12,26 +15,28 @@ type IPublisher interface {
type Publisher struct {
Type string
PullURL *url.URL
*Stream `json:"-"`
Config PublishConfig
}
func (pub *Publisher) Publish(streamPath string, realPub IPublisher) bool {
func (pub *Publisher) Publish(streamPath string, realPub IPublisher, config config.Publish) bool {
Streams.Lock()
defer Streams.Unlock()
s, created := findOrCreateStream(streamPath, time.Second)
if s.IsClosed() {
return false
}
if s.Publisher != nil && pub.Config.KillExit {
if s.Publisher != nil {
if config.KillExit {
s.Publisher.Close()
} else {
return false
}
}
pub.Stream = s
s.Publisher = realPub
if created {
s.PublishTimeout = pub.Config.PublishTimeout.Duration()
s.WaitCloseTimeout = pub.Config.WaitCloseTimeout.Duration()
s.PublishTimeout = config.PublishTimeout.Duration()
s.WaitCloseTimeout = config.WaitCloseTimeout.Duration()
go s.run()
}
s.actionChan <- PublishAction{}
@@ -41,3 +46,14 @@ func (pub *Publisher) Publish(streamPath string, realPub IPublisher) bool {
func (pub *Publisher) OnStateChange(oldState StreamState, newState StreamState) bool {
return true
}
// 用于远程拉流的发布者
type Puller struct {
Publisher
RemoteURL *url.URL
io.ReadCloser
}
func (puller *Puller) Close() {
puller.ReadCloser.Close()
}

View File

@@ -65,6 +65,17 @@ var StreamFSM = [STATE_DESTROYED + 1]map[StreamAction]StreamState{
// Streams 所有的流集合
var Streams = util.Map[string, *Stream]{Map: make(map[string]*Stream)}
func FilterStreams[T IPublisher]() (ss []*Stream) {
Streams.RLock()
defer Streams.RUnlock()
for _, s := range Streams.Map {
if _, ok := s.Publisher.(T); ok {
ss = append(ss, s)
}
}
return
}
type UnSubscibeAction *Subscriber
type PublishAction struct{}
type UnPublishAction struct{}
@@ -125,7 +136,7 @@ func findOrCreateStream(streamPath string, waitTimeout time.Duration) (s *Stream
s.actionChan = make(chan any, 1)
s.StartTime = time.Now()
s.timeout = time.NewTimer(waitTimeout)
s.Context, s.cancel = context.WithCancel(Ctx)
s.Context, s.cancel = context.WithCancel(Engine)
s.Init(s)
return s, true
}

View File

@@ -6,6 +6,7 @@ import (
"time"
. "github.com/Monibuca/engine/v4/common"
"github.com/Monibuca/engine/v4/config"
"github.com/Monibuca/engine/v4/track"
)
@@ -16,7 +17,7 @@ type VideoFrame AVFrame[NALUSlice]
type Subscriber struct {
context.Context `json:"-"`
cancel context.CancelFunc
Config SubscribeConfig
Config config.Subscribe
Stream *Stream `json:"-"`
ID string
TotalDrop int //总丢帧
@@ -39,7 +40,7 @@ func (s *Subscriber) Close() {
}
//Subscribe 开始订阅 将Subscriber与Stream关联
func (sub *Subscriber) Subscribe(streamPath string, config SubscribeConfig) bool {
func (sub *Subscriber) Subscribe(streamPath string, config config.Subscribe) bool {
Streams.Lock()
defer Streams.Unlock()
s, created := findOrCreateStream(streamPath, config.WaitTimeout.Duration())

View File

@@ -1,6 +1,7 @@
package track
import (
"net"
"time"
"github.com/Monibuca/engine/v4/codec"
@@ -21,8 +22,7 @@ type AAC Audio
func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
if frame.IsSequence() {
aac.DecoderConfiguration.Reset()
aac.DecoderConfiguration.AppendAVCC(frame)
aac.DecoderConfiguration.AVCC = AudioSlice(frame)
config1, config2 := frame[2], frame[3]
//audioObjectType = (config1 & 0xF8) >> 3
// 1 AAC MAIN ISO/IEC 14496-3 subpart 4
@@ -31,8 +31,8 @@ func (aac *AAC) WriteAVCC(ts uint32, frame AVCCFrame) {
// 4 AAC LTP ISO/IEC 14496-3 subpart 4
aac.Channels = ((config2 >> 3) & 0x0F) //声道
aac.SampleRate = uint32(codec.SamplingFrequencies[((config1&0x7)<<1)|(config2>>7)])
aac.DecoderConfiguration.AppendRaw(AudioSlice(frame[2:]))
aac.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_AUDIO, 0)
aac.DecoderConfiguration.Raw = AudioSlice(frame[2:])
aac.DecoderConfiguration.FLV = net.Buffers{adcflv1, frame, adcflv2}
} else {
(*Audio)(aac).WriteAVCC(ts, frame)
}

View File

@@ -1,6 +1,7 @@
package track
import (
"net"
"strings"
"github.com/Monibuca/engine/v4/codec"
@@ -8,6 +9,9 @@ import (
"github.com/Monibuca/engine/v4/util"
)
var adcflv1 = []byte{codec.FLV_TAG_TYPE_AUDIO, 0, 0, 4, 0, 0, 0, 0, 0, 0, 0}
var adcflv2 = []byte{0, 0, 0, 15}
type Audio struct {
Media[AudioSlice]
Channels byte
@@ -36,12 +40,18 @@ func (at *Audio) Play(onAudio func(*AVFrame[AudioSlice]) error) {
}
}
func (at *Audio) WriteADTS(adts []byte) {
at.SampleRate = uint32(codec.SamplingFrequencies[(adts[2]&0x3c)>>2])
at.Channels = ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
at.DecoderConfiguration.AppendAVCC(codec.ADTSToAudioSpecificConfig(adts))
at.DecoderConfiguration.AppendRaw(at.DecoderConfiguration.AVCC[0][2:])
at.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_AUDIO, 0)
profile := ((adts[2] & 0xc0) >> 6) + 1
sampleRate := (adts[2] & 0x3c) >> 2
channel := ((adts[2] & 0x1) << 2) | ((adts[3] & 0xc0) >> 6)
config1 := (profile << 3) | ((sampleRate & 0xe) >> 1)
config2 := ((sampleRate & 0x1) << 7) | (channel << 3)
at.SampleRate = uint32(codec.SamplingFrequencies[sampleRate])
at.Channels = channel
at.DecoderConfiguration.AVCC = []byte{0xAF, 0x00, config1, config2}
at.DecoderConfiguration.Raw = at.DecoderConfiguration.AVCC[:2]
at.DecoderConfiguration.FLV = net.Buffers{adcflv1, at.DecoderConfiguration.AVCC, adcflv2}
}
func (at *Audio) WriteAVCC(ts uint32, frame AVCCFrame) {
at.Media.WriteAVCC(ts, frame)
at.Flush()

View File

@@ -29,7 +29,7 @@ type Media[T RawSlice] struct {
CodecID byte
SampleRate uint32
SampleSize byte
DecoderConfiguration AVFrame[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
DecoderConfiguration DecoderConfiguration[T] `json:"-"` //H264(SPS、PPS) H265(VPS、SPS、PPS) AAC(config)
util.BytesPool //无锁内存池,用于发布者(在同一个协程中)复用小块的内存,通常是解包时需要临时使用
lastAvccTS uint32 //上一个avcc帧的时间戳
}

View File

@@ -28,20 +28,29 @@ func (vt *H264) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
func (vt *H264) WriteSlice(slice NALUSlice) {
switch slice.H264Type() {
case codec.NALU_SPS:
vt.DecoderConfiguration.Reset()
vt.DecoderConfiguration.AppendRaw(slice)
if len(vt.DecoderConfiguration.Raw) > 0 {
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
}
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
case codec.NALU_PPS:
vt.DecoderConfiguration.AppendRaw(slice)
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
vt.SPSInfo, _ = codec.ParseSPS(slice[0])
lenSPS := util.SizeOfBuffers(net.Buffers(vt.DecoderConfiguration.Raw[0]))
lenPPS := util.SizeOfBuffers(net.Buffers(vt.DecoderConfiguration.Raw[1]))
if len(vt.DecoderConfiguration.Raw) > 0 {
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
}
lenSPS := len(vt.DecoderConfiguration.Raw[0])
lenPPS := len(vt.DecoderConfiguration.Raw[1])
if len(vt.DecoderConfiguration.AVCC) > 0 {
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
}
if lenSPS > 3 {
vt.DecoderConfiguration.AppendAVCC(codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][0][1:4])
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD[:6], vt.DecoderConfiguration.Raw[0][1:4])
} else {
vt.DecoderConfiguration.AppendAVCC(codec.RTMP_AVC_HEAD)
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, codec.RTMP_AVC_HEAD)
}
tmp := []byte{0xE1, 0, 0, 0x01, 0, 0}
vt.DecoderConfiguration.AppendAVCC(tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0][0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1][0])
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, tmp[:1], util.PutBE(tmp[1:3], lenSPS), vt.DecoderConfiguration.Raw[0], tmp[3:4], util.PutBE(tmp[3:6], lenPPS), vt.DecoderConfiguration.Raw[1])
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
case codec.NALU_IDR_Picture:
vt.Value.IFrame = true
fallthrough
@@ -53,16 +62,17 @@ func (vt *H264) WriteSlice(slice NALUSlice) {
func (vt *H264) WriteAVCC(ts uint32, frame AVCCFrame) {
if frame.IsSequence() {
vt.DecoderConfiguration.Reset()
vt.DecoderConfiguration.SeqInTrack = vt.Value.SeqInTrack
vt.DecoderConfiguration.AppendAVCC(frame)
if len(vt.DecoderConfiguration.AVCC) > 0 {
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
}
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame)
var info codec.AVCDecoderConfigurationRecord
if _, err := info.Unmarshal(frame[5:]); err == nil {
vt.SPSInfo, _ = codec.ParseSPS(info.SequenceParameterSetNALUnit)
vt.nalulenSize = int(info.LengthSizeMinusOne&3 + 1)
vt.DecoderConfiguration.AppendRaw(NALUSlice{info.SequenceParameterSetNALUnit}, NALUSlice{info.PictureParameterSetNALUnit})
vt.DecoderConfiguration.Raw = NALUSlice{info.SequenceParameterSetNALUnit, info.PictureParameterSetNALUnit}
}
vt.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_VIDEO, 0)
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
} else {
(*Video)(vt).WriteAVCC(ts, frame)
vt.Value.IFrame = frame.IsIDR()

View File

@@ -1,6 +1,7 @@
package track
import (
"net"
"time"
"github.com/Monibuca/engine/v4/codec"
@@ -26,17 +27,23 @@ func (vt *H265) WriteAnnexB(pts uint32, dts uint32, frame AnnexBFrame) {
func (vt *H265) WriteSlice(slice NALUSlice) {
switch slice.H265Type() {
case codec.NAL_UNIT_VPS:
vt.DecoderConfiguration.Reset()
vt.DecoderConfiguration.AppendRaw(slice)
if len(vt.DecoderConfiguration.Raw) > 0 {
vt.DecoderConfiguration.Raw = vt.DecoderConfiguration.Raw[:0]
}
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
case codec.NAL_UNIT_SPS:
vt.DecoderConfiguration.AppendRaw(slice)
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
vt.SPSInfo, _ = codec.ParseHevcSPS(slice[0])
case codec.NAL_UNIT_PPS:
vt.DecoderConfiguration.AppendRaw(slice)
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0][0], vt.DecoderConfiguration.Raw[1][0], vt.DecoderConfiguration.Raw[2][0])
vt.DecoderConfiguration.Raw = append(vt.DecoderConfiguration.Raw, slice[0])
extraData, err := codec.BuildH265SeqHeaderFromVpsSpsPps(vt.DecoderConfiguration.Raw[0], vt.DecoderConfiguration.Raw[1], vt.DecoderConfiguration.Raw[2])
if err == nil {
vt.DecoderConfiguration.AppendAVCC(extraData)
if len(vt.DecoderConfiguration.AVCC) > 0 {
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
}
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, extraData)
}
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
case
codec.NAL_UNIT_CODED_SLICE_BLA,
codec.NAL_UNIT_CODED_SLICE_BLANT,
@@ -52,15 +59,16 @@ func (vt *H265) WriteSlice(slice NALUSlice) {
}
func (vt *H265) WriteAVCC(ts uint32, frame AVCCFrame) {
if frame.IsSequence() {
vt.DecoderConfiguration.Reset()
vt.DecoderConfiguration.SeqInTrack = vt.Value.SeqInTrack
vt.DecoderConfiguration.AppendAVCC(frame)
if len(vt.DecoderConfiguration.AVCC) > 0 {
vt.DecoderConfiguration.AVCC = vt.DecoderConfiguration.AVCC[:0]
}
vt.DecoderConfiguration.AVCC = append(vt.DecoderConfiguration.AVCC, frame)
if vps, sps, pps, err := codec.ParseVpsSpsPpsFromSeqHeaderWithoutMalloc(frame); err == nil {
vt.SPSInfo, _ = codec.ParseHevcSPS(frame)
vt.nalulenSize = int(frame[26]) & 0x03
vt.DecoderConfiguration.AppendRaw(NALUSlice{vps}, NALUSlice{sps}, NALUSlice{pps})
vt.DecoderConfiguration.Raw = NALUSlice{vps, sps, pps}
}
vt.DecoderConfiguration.FillFLV(codec.FLV_TAG_TYPE_VIDEO, 0)
vt.DecoderConfiguration.FLV = codec.VideoAVCC2FLV(net.Buffers(vt.DecoderConfiguration.AVCC), 0)
} else {
(*Video)(vt).WriteAVCC(ts, frame)
vt.Value.IFrame = frame.IsIDR()

View File

@@ -95,6 +95,7 @@ func SizeOfBuffers(buf net.Buffers) (size int) {
func CutBuffers(buf net.Buffers, size int) {
}
// SplitBuffers 按照一定大小分割 Buffers
func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) {
for total := SizeOfBuffers(buf); total > 0; {
@@ -122,3 +123,5 @@ func SplitBuffers(buf net.Buffers, size int) (result []net.Buffers) {
}
return
}

View File

@@ -19,3 +19,14 @@ func (s *Slice[T]) Delete(v T) bool {
}
return false
}
func (s *Slice[T]) Reset() {
if len(*s) > 0 {
*s = (*s)[:0]
}
}
func (s *Slice[T]) ResetAppend(first T) {
s.Reset()
s.Add(first)
}