18 Commits

Author SHA1 Message Date
Ingo Oppermann
4a12b0293f Use configured logging target 2023-01-03 11:54:48 +01:00
Ingo Oppermann
f472fe150f Merge branch 'dev' into logging 2023-01-03 11:45:50 +01:00
Ingo Oppermann
1bbb7a9c1f Use config locations for import and ffmigrage 2023-01-03 11:45:10 +01:00
Ingo Oppermann
37e00407cc Allow to define a logging target 2023-01-03 11:28:57 +01:00
Ingo Oppermann
17c9f6ef13 Test different standard location for config file
If no path is given in the environment variable CORE_CONFIGFILE, different
standard locations will be probed:
- os.UserConfigDir() + /datarhei-core/config.js
- os.UserHomeDir() + /.config/datarhei-core/config.js
- ./config/config.js
If the config.js doesn't exist in any of these locations, it will be
assumed at ./config/config.js
2023-01-03 07:55:55 +01:00
Ingo Oppermann
ff6b0d9584 Require go1.19 for tests 2023-01-03 07:05:00 +01:00
Ingo Oppermann
378a3cd9cf Allow to set a soft memory limit for the binary itself
The setting debug.memory_limit_mbytes should not be used in conjuction
with debug.force_gc because the memory limit influences the garbage
collector.
2023-01-02 11:58:54 +01:00
Ingo Oppermann
992b04d180 Allow alternative syntax for auth0 tenants as environment variable 2023-01-02 11:39:58 +01:00
Ingo Oppermann
391681447e Fix MustDir config type to create directory 2023-01-02 10:54:29 +01:00
Ingo Oppermann
59aa6af767 Allow partial process config updates 2023-01-02 07:20:39 +01:00
Ingo Oppermann
c44fb30a84 Fix check for at least one process input and output 2023-01-02 06:57:02 +01:00
Ingo Oppermann
0cd8be130c Remove letsdebug module
This module has a dependency of a modules that requires cgo, that's a no-go.
2022-12-31 17:46:46 +01:00
Ingo Oppermann
65a617c2af Fix modifying DTS in RTMP packets (datarhei/restreamer#487, datarhei/restreamer#367) 2022-12-29 10:43:15 +01:00
Ingo Oppermann
8a1dc59a81 Set a default of 20ms for internal SRT latency 2022-12-27 13:46:02 +01:00
Ingo Oppermann
ee2a188be8 Allow defaults for template parameter 2022-12-27 13:41:07 +01:00
Ingo Oppermann
1a9ef8b7c9 Add Let's Debug auto TLS error diagnostic 2022-12-27 10:26:49 +01:00
Ingo Oppermann
d0262cc887 Add logging for service 2022-12-27 09:47:59 +01:00
Ingo Oppermann
18be75d013 Use new streamid format for {srt} placeholder 2022-11-22 21:25:54 +01:00
26 changed files with 702 additions and 224 deletions

View File

@@ -3,20 +3,20 @@ name: tests
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- uses: actions/setup-go@v2
with:
go-version: '1.18'
- name: Run coverage
run: go test -coverprofile=coverage.out -covermode=atomic -v ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unit-linux
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- uses: actions/setup-go@v2
with:
go-version: "1.19"
- name: Run coverage
run: go test -coverprofile=coverage.out -covermode=atomic -v ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unit-linux

View File

@@ -6,9 +6,11 @@ import (
"fmt"
"io"
golog "log"
"math"
gonet "net"
gohttp "net/http"
"net/url"
"os"
"path/filepath"
"runtime/debug"
"sync"
@@ -39,6 +41,7 @@ import (
"github.com/datarhei/core/v16/update"
"github.com/caddyserver/certmagic"
"go.uber.org/zap"
)
// The API interface is the implementation for the restreamer API.
@@ -145,7 +148,12 @@ func (a *api) Reload() error {
a.errorChan = make(chan error, 1)
}
logger := log.New("Core").WithOutput(log.NewConsoleWriter(a.log.writer, log.Lwarn, true))
logger := log.New("Core").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(a.log.writer, true),
log.Lwarn,
),
)
store, err := configstore.NewJSON(a.config.path, func() {
a.errorChan <- ErrConfigReload
@@ -181,31 +189,54 @@ func (a *api) Reload() error {
break
}
buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines)
buffer := log.NewBufferWriter(cfg.Log.MaxLines)
var writer log.Writer
logger = logger.WithOutput(log.NewLevelRewriter(
log.NewMultiWriter(
log.NewTopicWriter(
log.NewConsoleWriter(a.log.writer, loglevel, true),
cfg.Log.Topics,
),
buffer,
),
[]log.LevelRewriteRule{
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
{
Level: log.Ldebug,
Component: "HTTP",
Match: map[string]string{
"client": "^(::1|127.0.0.1)$",
"method": "^(PUT|POST|DELETE)$",
"status_text": "^Unauthorized$",
"user_agent": "^Lavf/",
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(
log.NewLevelWriter(
log.NewLevelRewriter(
log.NewMultiWriter(
log.NewTopicWriter(
writer,
cfg.Log.Topics,
),
buffer,
),
[]log.LevelRewriteRule{
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
{
Level: log.Ldebug,
Component: "HTTP",
Match: map[string]string{
"client": "^(::1|127.0.0.1)$",
"method": "^(PUT|POST|DELETE)$",
"status_text": "^Unauthorized$",
"user_agent": "^Lavf/",
},
},
},
},
},
))
),
loglevel,
),
)
logfields := log.Fields{
"application": app.Name,
@@ -227,6 +258,8 @@ func (a *api) Reload() error {
logger.Info().WithFields(logfields).Log("")
logger.Info().WithField("path", a.config.path).Log("Read config file")
configlogger := logger.WithComponent("Config")
cfg.Messages(func(level string, v configvars.Variable, message string) {
configlogger = configlogger.WithFields(log.Fields{
@@ -443,8 +476,8 @@ func (a *api) start() error {
a.replacer = replace.New()
{
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base())
a.replacer.RegisterTemplate("memfs", a.memfs.Base())
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base(), nil)
a.replacer.RegisterTemplate("memfs", a.memfs.Base(), nil)
host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address)
if len(host) == 0 {
@@ -461,21 +494,23 @@ func (a *api) start() error {
template += "?token=" + cfg.RTMP.Token
}
a.replacer.RegisterTemplate("rtmp", template)
a.replacer.RegisterTemplate("rtmp", template, nil)
host, port, _ = gonet.SplitHostPort(cfg.SRT.Address)
if len(host) == 0 {
host = "localhost"
}
template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&streamid=#!:m={mode},r={name}"
template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}"
if len(cfg.SRT.Token) != 0 {
template += ",token=" + cfg.SRT.Token
template += ",token:" + cfg.SRT.Token
}
if len(cfg.SRT.Passphrase) != 0 {
template += "&passphrase=" + cfg.SRT.Passphrase
}
a.replacer.RegisterTemplate("srt", template)
a.replacer.RegisterTemplate("srt", template, map[string]string{
"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
})
}
store := store.NewJSONStore(store.JSONConfig{
@@ -655,26 +690,28 @@ func (a *api) start() error {
if cfg.TLS.Enable {
if cfg.TLS.Auto {
if len(cfg.Host.Name) == 0 {
return fmt.Errorf("at least one host must be provided in host.name or RS_HOST_NAME")
return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
}
certmagic.Default.Storage = &certmagic.FileStorage{
Path: cfg.DB.Dir + "/cert",
}
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
certmagic.Default.Logger = zap.NewNop()
certmagic.DefaultACME.Agreed = true
certmagic.DefaultACME.Email = cfg.TLS.Email
certmagic.DefaultACME.CA = certmagic.LetsEncryptProductionCA
certmagic.DefaultACME.DisableHTTPChallenge = false
certmagic.DefaultACME.DisableTLSALPNChallenge = true
certmagic.DefaultACME.Logger = nil
certmagic.Default.Storage = &certmagic.FileStorage{
Path: cfg.DB.Dir + "/cert",
}
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
certmagic.Default.Logger = nil
certmagic.DefaultACME.Logger = zap.NewNop()
magic := certmagic.NewDefault()
acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
acme.Logger = zap.NewNop()
magic.Issuers = []certmagic.Issuer{acme}
magic.Logger = zap.NewNop()
autocertManager = magic
@@ -713,6 +750,19 @@ func (a *api) start() error {
if err != nil {
logger.Error().WithField("error", err).Log("Failed to acquire certificate")
certerror = true
/*
problems, err := letsdebug.Check(host, letsdebug.HTTP01)
if err != nil {
logger.Error().WithField("error", err).Log("Failed to debug certificate acquisition")
}
for _, p := range problems {
logger.Error().WithFields(log.Fields{
"name": p.Name,
"detail": p.Detail,
}).Log(p.Explanation)
}
*/
break
}
@@ -1101,6 +1151,12 @@ func (a *api) start() error {
}(ctx)
}
if cfg.Debug.MemoryLimit > 0 {
debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
} else {
debug.SetMemoryLimit(math.MaxInt64)
}
// Start the restream processes
restream.Start()
@@ -1270,4 +1326,6 @@ func (a *api) Destroy() {
a.memfs.DeleteAll()
a.memfs = nil
}
a.log.logger.core.Close()
}

View File

@@ -17,12 +17,19 @@ import (
)
func main() {
logger := log.New("Migration").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithFields(log.Fields{
logger := log.New("Migration").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(os.Stderr, true),
log.Linfo,
),
).WithFields(log.Fields{
"from": "ffmpeg4",
"to": "ffmpeg5",
})
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
@@ -63,6 +70,27 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
return fmt.Errorf("the configuration contains errors: %v", messages)
}
var writer log.Writer
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(writer)
ff, err := ffmpeg.New(ffmpeg.Config{
Binary: cfg.FFmpeg.Binary,
})

View File

@@ -13,9 +13,16 @@ import (
)
func main() {
logger := log.New("Import").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithField("version", "v1")
logger := log.New("Import").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(os.Stderr, true),
log.Linfo,
),
).WithField("version", "v1")
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
@@ -31,8 +38,6 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
logger = log.New("")
}
logger.Info().Log("Database import")
cfg := configstore.Get()
// Merging the persisted config with the environment variables
@@ -58,6 +63,27 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
return fmt.Errorf("the configuration contains errors: %v", messages)
}
var writer log.Writer
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(writer)
logger.Info().Log("Checking for database ...")
// Check if there's a v1.json from the old Restreamer

View File

@@ -141,6 +141,8 @@ func (d *Config) init() {
d.vars.Register(value.NewString(&d.Log.Level, "info"), "log.level", "CORE_LOG_LEVEL", nil, "Loglevel: silent, error, warn, info, debug", false, false)
d.vars.Register(value.NewStringList(&d.Log.Topics, []string{}, ","), "log.topics", "CORE_LOG_TOPICS", nil, "Show only selected log topics", false, false)
d.vars.Register(value.NewInt(&d.Log.MaxLines, 1000), "log.max_lines", "CORE_LOG_MAXLINES", nil, "Number of latest log lines to keep in memory", false, false)
d.vars.Register(value.NewString(&d.Log.Target.Output, "stderr"), "log.target.output", "CORE_LOG_TARGET_OUTPUT", nil, "Where to write the logs to: stdout, stderr, file", false, false)
d.vars.Register(value.NewString(&d.Log.Target.Path, ""), "log.target.path", "CORE_LOG_TARGET_PATH", nil, "Path to log file if output is 'file'", false, false)
// DB
d.vars.Register(value.NewMustDir(&d.DB.Dir, "./config"), "db.dir", "CORE_DB_DIR", nil, "Directory for holding the operational data", false, false)
@@ -232,6 +234,7 @@ func (d *Config) init() {
// Debug
d.vars.Register(value.NewBool(&d.Debug.Profiling, false), "debug.profiling", "CORE_DEBUG_PROFILING", nil, "Enable profiling endpoint on /profiling", false, false)
d.vars.Register(value.NewInt(&d.Debug.ForceGC, 0), "debug.force_gc", "CORE_DEBUG_FORCEGC", nil, "Number of seconds between forcing GC to return memory to the OS", false, false)
d.vars.Register(value.NewInt64(&d.Debug.MemoryLimit, 0), "debug.memory_limit_mbytes", "CORE_DEBUG_MEMORY_LIMIT_MBYTES", nil, "Impose a soft memory limit for the core, in megabytes", false, false)
// Metrics
d.vars.Register(value.NewBool(&d.Metrics.Enable, false), "metrics.enable", "CORE_METRICS_ENABLE", nil, "Enable collecting historic metrics data", false, false)

View File

@@ -22,6 +22,10 @@ type Data struct {
Level string `json:"level" enums:"debug,info,warn,error,silent" jsonschema:"enum=debug,enum=info,enum=warn,enum=error,enum=silent"`
Topics []string `json:"topics"`
MaxLines int `json:"max_lines"`
Target struct {
Output string `json:"name"`
Path string `json:"path"`
} `json:"target"` // discard, stderr, stdout, file:/path/to/file.log
} `json:"log"`
DB struct {
Dir string `json:"dir"`
@@ -135,8 +139,9 @@ type Data struct {
MaxPort int `json:"max_port"`
} `json:"playout"`
Debug struct {
Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc"`
Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc"`
MemoryLimit int64 `json:"memory_limit_mbytes"`
} `json:"debug"`
Metrics struct {
Enable bool `json:"enable"`
@@ -181,7 +186,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Address = d.Address
data.CheckForUpdates = d.CheckForUpdates
data.Log = d.Log
data.DB = d.DB
data.Host = d.Host
data.API = d.API
@@ -189,14 +193,11 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Debug = d.Debug
data.Metrics = d.Metrics
data.Sessions = d.Sessions
data.Service = d.Service
data.Router = d.Router
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Host.Name = copy.Slice(d.Host.Name)
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
@@ -228,6 +229,16 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Storage.Memory = d.Storage.Memory
// Actual changes
data.Log.Level = d.Log.Level
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Log.MaxLines = d.Log.MaxLines
data.Log.Target.Output = "stderr"
data.Log.Target.Path = ""
data.Debug.Profiling = d.Debug.Profiling
data.Debug.ForceGC = d.Debug.ForceGC
data.Debug.MemoryLimit = 0
data.TLS.Enable = d.TLS.Enable
data.TLS.Address = d.TLS.Address
data.TLS.Auto = d.TLS.Auto
@@ -259,7 +270,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Address = d.Address
data.CheckForUpdates = d.CheckForUpdates
data.Log = d.Log
data.DB = d.DB
data.Host = d.Host
data.API = d.API
@@ -267,14 +277,11 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Debug = d.Debug
data.Metrics = d.Metrics
data.Sessions = d.Sessions
data.Service = d.Service
data.Router = d.Router
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Host.Name = copy.Slice(d.Host.Name)
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
@@ -299,6 +306,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Router.Routes = copy.StringMap(d.Router.Routes)
// Actual changes
data.Log.Level = d.Log.Level
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Log.MaxLines = d.Log.MaxLines
data.Debug.Profiling = d.Debug.Profiling
data.Debug.ForceGC = d.Debug.ForceGC
data.TLS.Enable = d.TLS.Enable
data.TLS.Address = d.TLS.Address
data.TLS.Auto = d.TLS.Auto

View File

@@ -118,6 +118,10 @@ func (c *jsonStore) load(cfg *config.Config) error {
return err
}
if len(jsondata) == 0 {
return nil
}
data, err := migrate(jsondata)
if err != nil {
return err

53
config/store/location.go Normal file
View File

@@ -0,0 +1,53 @@
package store
import (
"os"
"path"
)
// Location returns the path to the config file. If no path is provided,
// different standard location will be probed:
// - os.UserConfigDir() + /datarhei-core/config.js
// - os.UserHomeDir() + /.config/datarhei-core/config.js
// - ./config/config.js
// If the config doesn't exist in none of these locations, it will be assumed
// at ./config/config.js
func Location(filepath string) string {
configfile := filepath
if len(configfile) != 0 {
return configfile
}
locations := []string{}
if dir, err := os.UserConfigDir(); err == nil {
locations = append(locations, dir+"/datarhei-core/config.js")
}
if dir, err := os.UserHomeDir(); err == nil {
locations = append(locations, dir+"/.config/datarhei-core/config.js")
}
locations = append(locations, "./config/config.js")
for _, path := range locations {
info, err := os.Stat(path)
if err != nil {
continue
}
if info.IsDir() {
continue
}
configfile = path
}
if len(configfile) == 0 {
configfile = "./config/config.js"
}
os.MkdirAll(path.Dir(configfile), 0740)
return configfile
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"strings"
)
@@ -16,6 +17,28 @@ type Auth0Tenant struct {
Users []string `json:"users"`
}
func (a *Auth0Tenant) String() string {
u := url.URL{
Scheme: "auth0",
Host: a.Domain,
}
if len(a.ClientID) != 0 {
u.User = url.User(a.ClientID)
}
q := url.Values{}
q.Set("aud", a.Audience)
for _, user := range a.Users {
q.Add("user", user)
}
u.RawQuery = q.Encode()
return u.String()
}
type TenantList struct {
p *[]Auth0Tenant
separator string
@@ -32,18 +55,34 @@ func NewTenantList(p *[]Auth0Tenant, val []Auth0Tenant, separator string) *Tenan
return v
}
// Set allows to set a tenant list in two formats:
// - a separator separated list of bas64 encoded Auth0Tenant JSON objects
// - a separator separated list of Auth0Tenant in URL representation: auth0://[clientid]@[domain]?aud=[audience]&user=...&user=...
func (s *TenantList) Set(val string) error {
list := []Auth0Tenant{}
for i, elm := range strings.Split(val, s.separator) {
data, err := base64.StdEncoding.DecodeString(elm)
if err != nil {
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
}
t := Auth0Tenant{}
if err := json.Unmarshal(data, &t); err != nil {
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
if strings.HasPrefix(elm, "auth0://") {
data, err := url.Parse(elm)
if err != nil {
return fmt.Errorf("invalid url encoding of tenant %d: %w", i, err)
}
t.Domain = data.Host
t.ClientID = data.User.Username()
t.Audience = data.Query().Get("aud")
t.Users = data.Query()["user"]
} else {
data, err := base64.StdEncoding.DecodeString(elm)
if err != nil {
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
}
if err := json.Unmarshal(data, &t); err != nil {
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
}
}
list = append(list, t)
@@ -62,10 +101,10 @@ func (s *TenantList) String() string {
list := []string{}
for _, t := range *s.p {
list = append(list, fmt.Sprintf("%s (%d users)", t.Domain, len(t.Users)))
list = append(list, t.String())
}
return strings.Join(list, ",")
return strings.Join(list, s.separator)
}
func (s *TenantList) Validate() error {

View File

@@ -0,0 +1,43 @@
package value
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAuth0Value(t *testing.T) {
tenants := []Auth0Tenant{}
v := NewTenantList(&tenants, nil, " ")
require.Equal(t, "(empty)", v.String())
v.Set("auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3")
require.Equal(t, []Auth0Tenant{
{
Domain: "domain",
ClientID: "clientid",
Audience: "audience",
Users: []string{"user1", "user2"},
},
{
Domain: "domain2",
Audience: "audience2",
Users: []string{"user3"},
},
}, tenants)
require.Equal(t, "auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3", v.String())
require.NoError(t, v.Validate())
v.Set("eyJkb21haW4iOiJkYXRhcmhlaS5ldS5hdXRoMC5jb20iLCJhdWRpZW5jZSI6Imh0dHBzOi8vZGF0YXJoZWkuY29tL2NvcmUiLCJ1c2VycyI6WyJhdXRoMHx4eHgiXX0=")
require.Equal(t, []Auth0Tenant{
{
Domain: "datarhei.eu.auth0.com",
ClientID: "",
Audience: "https://datarhei.com/core",
Users: []string{"auth0|xxx"},
},
}, tenants)
require.Equal(t, "auth0://datarhei.eu.auth0.com?aud=https%3A%2F%2Fdatarhei.com%2Fcore&user=auth0%7Cxxx", v.String())
require.NoError(t, v.Validate())
}

View File

@@ -34,6 +34,10 @@ func (u *MustDir) Validate() error {
return fmt.Errorf("path name must not be empty")
}
if err := os.MkdirAll(val, 0750); err != nil {
return fmt.Errorf("%s can't be created (%w)", val, err)
}
finfo, err := os.Stat(val)
if err != nil {
return fmt.Errorf("%s does not exist", val)

View File

@@ -3,7 +3,7 @@ package value
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestIntValue(t *testing.T) {
@@ -11,19 +11,19 @@ func TestIntValue(t *testing.T) {
ivar := NewInt(&i, 11)
assert.Equal(t, "11", ivar.String())
assert.Equal(t, nil, ivar.Validate())
assert.Equal(t, false, ivar.IsEmpty())
require.Equal(t, "11", ivar.String())
require.Equal(t, nil, ivar.Validate())
require.Equal(t, false, ivar.IsEmpty())
i = 42
assert.Equal(t, "42", ivar.String())
assert.Equal(t, nil, ivar.Validate())
assert.Equal(t, false, ivar.IsEmpty())
require.Equal(t, "42", ivar.String())
require.Equal(t, nil, ivar.Validate())
require.Equal(t, false, ivar.IsEmpty())
ivar.Set("77")
assert.Equal(t, int(77), i)
require.Equal(t, int(77), i)
}
type testdata struct {
@@ -37,22 +37,22 @@ func TestCopyStruct(t *testing.T) {
NewInt(&data1.value1, 1)
NewInt(&data1.value2, 2)
assert.Equal(t, int(1), data1.value1)
assert.Equal(t, int(2), data1.value2)
require.Equal(t, int(1), data1.value1)
require.Equal(t, int(2), data1.value2)
data2 := testdata{}
val21 := NewInt(&data2.value1, 3)
val22 := NewInt(&data2.value2, 4)
assert.Equal(t, int(3), data2.value1)
assert.Equal(t, int(4), data2.value2)
require.Equal(t, int(3), data2.value1)
require.Equal(t, int(4), data2.value2)
data2 = data1
assert.Equal(t, int(1), data2.value1)
assert.Equal(t, int(2), data2.value2)
require.Equal(t, int(1), data2.value1)
require.Equal(t, int(2), data2.value2)
assert.Equal(t, "1", val21.String())
assert.Equal(t, "2", val22.String())
require.Equal(t, "1", val21.String())
require.Equal(t, "2", val22.String())
}

2
go.mod
View File

@@ -26,6 +26,7 @@ require (
github.com/swaggo/swag v1.8.7
github.com/vektah/gqlparser/v2 v2.5.1
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.23.0
golang.org/x/mod v0.6.0
)
@@ -79,7 +80,6 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect

View File

@@ -12,7 +12,7 @@ import (
func (r *queryResolver) Log(ctx context.Context) ([]string, error) {
if r.LogBuffer == nil {
r.LogBuffer = log.NewBufferWriter(log.Lsilent, 1)
r.LogBuffer = log.NewBufferWriter(1)
}
events := r.LogBuffer.Events()

View File

@@ -22,7 +22,7 @@ func NewLog(buffer log.BufferWriter) *LogHandler {
}
if l.buffer == nil {
l.buffer = log.NewBufferWriter(log.Lsilent, 1)
l.buffer = log.NewBufferWriter(1)
}
return l

View File

@@ -51,7 +51,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg")
}
if len(process.Input) == 0 && len(process.Output) == 0 {
if len(process.Input) == 0 || len(process.Output) == 0 {
return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined")
}
@@ -189,6 +189,14 @@ func (h *RestreamHandler) Update(c echo.Context) error {
Autostart: true,
}
current, err := h.restream.GetProcess(id)
if err != nil {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
// Prefill the config with the current values
process.Unmarshal(current.Config)
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}

View File

@@ -14,28 +14,29 @@ import (
type Level uint
const (
Lsilent Level = 0
Lerror Level = 1
Lwarn Level = 2
Linfo Level = 3
Ldebug Level = 4
Lsilent Level = 0b0000
Lerror Level = 0b0001
Lwarn Level = 0b0010
Linfo Level = 0b0100
Ldebug Level = 0b1000
)
// String returns a string representing the log level.
func (level Level) String() string {
names := []string{
"SILENT",
"ERROR",
"WARN",
"INFO",
"DEBUG",
}
if level > Ldebug {
switch level {
case Lsilent:
return "SILENT"
case Lerror:
return "ERROR"
case Lwarn:
return "WARN"
case Linfo:
return "INFO"
case Ldebug:
return "DEBUG"
default:
return `¯\_(ツ)_/¯`
}
return names[level]
}
func (level *Level) MarshalJSON() ([]byte, error) {
@@ -97,6 +98,9 @@ type Logger interface {
// Write implements the io.Writer interface such that it can be used in e.g. the
// the log/Logger facility. Messages will be printed with debug level.
Write(p []byte) (int, error)
// Close closes the underlying writer.
Close()
}
// logger is an implementation of the Logger interface.
@@ -184,6 +188,10 @@ func (l *logger) Write(p []byte) (int, error) {
return newEvent(l).Write(p)
}
func (l *logger) Close() {
l.output.Close()
}
type Event struct {
logger *logger
@@ -352,12 +360,6 @@ func (l *Event) Write(p []byte) (int, error) {
return len(p), nil
}
type Eventx struct {
Time time.Time `json:"ts"`
Level Level `json:"level"`
Component string `json:"component"`
Reference string `json:"ref"`
Message string `json:"message"`
Caller string `json:"caller"`
Detail interface{} `json:"detail"`
func (l *Event) Close() {
l.logger.Close()
}

View File

@@ -5,25 +5,25 @@ import (
"bytes"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLoglevelNames(t *testing.T) {
assert.Equal(t, "DEBUG", Ldebug.String())
assert.Equal(t, "ERROR", Lerror.String())
assert.Equal(t, "WARN", Lwarn.String())
assert.Equal(t, "INFO", Linfo.String())
assert.Equal(t, `SILENT`, Lsilent.String())
require.Equal(t, "DEBUG", Ldebug.String())
require.Equal(t, "ERROR", Lerror.String())
require.Equal(t, "WARN", Lwarn.String())
require.Equal(t, "INFO", Linfo.String())
require.Equal(t, `SILENT`, Lsilent.String())
}
func TestLogColorToNotTTY(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
w := NewConsoleWriter(writer, Linfo, true).(*syncWriter)
w := NewLevelWriter(NewConsoleWriter(writer, true), Linfo).(*levelWriter).writer.(*syncWriter)
formatter := w.writer.(*consoleWriter).formatter.(*consoleFormatter)
assert.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
require.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
}
func TestLogContext(t *testing.T) {
@@ -31,7 +31,7 @@ func TestLogContext(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("component").WithOutput(NewConsoleWriter(writer, Ldebug, false))
logger := New("component").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
logger.Debug().Log("debug")
logger.Info().Log("info")
@@ -53,19 +53,19 @@ func TestLogContext(t *testing.T) {
lenWithoutCtx := buffer.Len()
buffer.Reset()
assert.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
require.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
}
func TestLogClone(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
logger.Info().Log("info")
writer.Flush()
assert.Contains(t, buffer.String(), `component="test"`)
require.Contains(t, buffer.String(), `component="test"`)
buffer.Reset()
@@ -74,33 +74,33 @@ func TestLogClone(t *testing.T) {
logger2.Info().Log("info")
writer.Flush()
assert.Contains(t, buffer.String(), `component="tset"`)
require.Contains(t, buffer.String(), `component="tset"`)
}
func TestLogSilent(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Lsilent, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lsilent))
logger.Debug().Log("debug")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
}
@@ -108,26 +108,26 @@ func TestLogDebug(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Ldebug, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
logger.Debug().Log("debug")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -135,26 +135,26 @@ func TestLogInfo(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
logger.Debug().Log("debug")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -162,26 +162,26 @@ func TestLogWarn(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Lwarn, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lwarn))
logger.Debug().Log("debug")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -189,25 +189,25 @@ func TestLogError(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewConsoleWriter(writer, Lerror, false))
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lerror))
logger.Debug().Log("debug")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}

43
log/output.go Normal file
View File

@@ -0,0 +1,43 @@
package log
import (
"io"
"os"
"github.com/mattn/go-isatty"
)
type consoleOutput struct {
writer io.Writer
formatter Formatter
}
func NewConsoleOutput(w io.Writer, useColor bool) Writer {
writer := &consoleOutput{
writer: w,
}
color := useColor
if color {
if w, ok := w.(*os.File); ok {
if !isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd()) {
color = false
}
} else {
color = false
}
}
writer.formatter = NewConsoleFormatter(color)
return NewSyncWriter(writer)
}
func (w *consoleOutput) Write(e *Event) error {
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *consoleOutput) Close() {}

View File

@@ -13,18 +13,50 @@ import (
type Writer interface {
Write(e *Event) error
Close()
}
type discardWriter struct{}
func NewDiscardWriter() Writer {
return &discardWriter{}
}
func (w *discardWriter) Write(e *Event) error { return nil }
func (w *discardWriter) Close() {}
type levelWriter struct {
writer Writer
level Level
}
func NewLevelWriter(w Writer, level Level) Writer {
return &levelWriter{
writer: w,
level: level,
}
}
func (w *levelWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
return w.writer.Write(e)
}
func (w *levelWriter) Close() {
w.writer.Close()
}
type jsonWriter struct {
writer io.Writer
level Level
formatter Formatter
}
func NewJSONWriter(w io.Writer, level Level) Writer {
func NewJSONWriter(w io.Writer) Writer {
writer := &jsonWriter{
writer: w,
level: level,
formatter: NewJSONFormatter(),
}
@@ -32,25 +64,21 @@ func NewJSONWriter(w io.Writer, level Level) Writer {
}
func (w *jsonWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *jsonWriter) Close() {}
type consoleWriter struct {
writer io.Writer
level Level
formatter Formatter
}
func NewConsoleWriter(w io.Writer, level Level, useColor bool) Writer {
func NewConsoleWriter(w io.Writer, useColor bool) Writer {
writer := &consoleWriter{
writer: w,
level: level,
}
color := useColor
@@ -71,15 +99,13 @@ func NewConsoleWriter(w io.Writer, level Level, useColor bool) Writer {
}
func (w *consoleWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *consoleWriter) Close() {}
type topicWriter struct {
writer Writer
topics map[string]struct{}
@@ -112,6 +138,10 @@ func (w *topicWriter) Write(e *Event) error {
return err
}
func (w *topicWriter) Close() {
w.writer.Close()
}
type levelRewriter struct {
writer Writer
rules []levelRewriteRule
@@ -182,6 +212,10 @@ rules:
return w.writer.Write(e)
}
func (w *levelRewriter) Close() {
w.writer.Close()
}
type syncWriter struct {
mu sync.Mutex
writer Writer
@@ -193,11 +227,15 @@ func NewSyncWriter(writer Writer) Writer {
}
}
func (s *syncWriter) Write(e *Event) error {
s.mu.Lock()
defer s.mu.Unlock()
func (w *syncWriter) Write(e *Event) error {
w.mu.Lock()
defer w.mu.Unlock()
return s.writer.Write(e)
return w.writer.Write(e)
}
func (w *syncWriter) Close() {
w.writer.Close()
}
type multiWriter struct {
@@ -212,8 +250,8 @@ func NewMultiWriter(writer ...Writer) Writer {
return mw
}
func (m *multiWriter) Write(e *Event) error {
for _, w := range m.writer {
func (w *multiWriter) Write(e *Event) error {
for _, w := range w.writer {
if err := w.Write(e); err != nil {
return err
}
@@ -222,6 +260,12 @@ func (m *multiWriter) Write(e *Event) error {
return nil
}
func (w *multiWriter) Close() {
for _, w := range w.writer {
w.Close()
}
}
type BufferWriter interface {
Writer
Events() []*Event
@@ -230,13 +274,10 @@ type BufferWriter interface {
type bufferWriter struct {
lines *ring.Ring
lock sync.RWMutex
level Level
}
func NewBufferWriter(level Level, lines int) BufferWriter {
b := &bufferWriter{
level: level,
}
func NewBufferWriter(lines int) BufferWriter {
b := &bufferWriter{}
if lines > 0 {
b.lines = ring.New(lines)
@@ -245,33 +286,31 @@ func NewBufferWriter(level Level, lines int) BufferWriter {
return b
}
func (b *bufferWriter) Write(e *Event) error {
if b.level < e.Level || e.Level == Lsilent {
return nil
}
func (w *bufferWriter) Write(e *Event) error {
w.lock.Lock()
defer w.lock.Unlock()
b.lock.Lock()
defer b.lock.Unlock()
if b.lines != nil {
b.lines.Value = e.clone()
b.lines = b.lines.Next()
if w.lines != nil {
w.lines.Value = e.clone()
w.lines = w.lines.Next()
}
return nil
}
func (b *bufferWriter) Events() []*Event {
func (w *bufferWriter) Close() {}
func (w *bufferWriter) Events() []*Event {
var lines = []*Event{}
if b.lines == nil {
if w.lines == nil {
return lines
}
b.lock.RLock()
defer b.lock.RUnlock()
w.lock.RLock()
defer w.lock.RUnlock()
b.lines.Do(func(l interface{}) {
w.lines.Do(func(l interface{}) {
if l == nil {
return
}
@@ -281,3 +320,32 @@ func (b *bufferWriter) Events() []*Event {
return lines
}
type fileWriter struct {
writer *os.File
formatter Formatter
}
func NewFileWriter(path string, formatter Formatter) Writer {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR|os.O_SYNC, 0600)
if err != nil {
return NewDiscardWriter()
}
writer := &fileWriter{
writer: file,
formatter: formatter,
}
return NewSyncWriter(writer)
}
func (w *fileWriter) Write(e *Event) error {
_, err := w.writer.Write(append(w.formatter.Bytes(e), '\n'))
return err
}
func (w *fileWriter) Close() {
w.writer.Close()
}

17
main.go
View File

@@ -5,15 +5,26 @@ import (
"os/signal"
"github.com/datarhei/core/v16/app/api"
"github.com/datarhei/core/v16/config/store"
"github.com/datarhei/core/v16/log"
_ "github.com/joho/godotenv/autoload"
)
func main() {
logger := log.New("Core").WithOutput(log.NewConsoleWriter(os.Stderr, log.Lwarn, true))
logger := log.New("Core").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(
os.Stderr,
true,
),
log.Lwarn,
),
)
app, err := api.New(os.Getenv("CORE_CONFIGFILE"), os.Stderr)
configfile := store.Location(os.Getenv("CORE_CONFIGFILE"))
app, err := api.New(configfile, os.Stderr)
if err != nil {
logger.Error().WithError(err).Log("Failed to create new API")
os.Exit(1)
@@ -51,6 +62,8 @@ func main() {
signal.Notify(quit, os.Interrupt)
<-quit
logger.Close()
// Stop the app
app.Destroy()
}

View File

@@ -9,12 +9,13 @@ import (
type Replacer interface {
// RegisterTemplate registers a template for a specific placeholder. Template
// may contain placeholders as well of the form {name}. They will be replaced
// by the parameters of the placeholder (see Replace).
RegisterTemplate(placeholder, template string)
// by the parameters of the placeholder (see Replace). If a parameter is not of
// a template is not present, default values can be provided.
RegisterTemplate(placeholder, template string, defaults map[string]string)
// RegisterTemplateFunc does the same as RegisterTemplate, but the template
// is returned by the template function.
RegisterTemplateFunc(placeholder string, template func() string)
RegisterTemplateFunc(placeholder string, template func() string, defaults map[string]string)
// Replace replaces all occurences of placeholder in str with value. The placeholder is of the
// form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^
@@ -28,8 +29,13 @@ type Replacer interface {
Replace(str, placeholder, value string) string
}
type template struct {
fn func() string
defaults map[string]string
}
type replacer struct {
templates map[string]func() string
templates map[string]template
re *regexp.Regexp
templateRe *regexp.Regexp
@@ -38,7 +44,7 @@ type replacer struct {
// New returns a Replacer
func New() Replacer {
r := &replacer{
templates: make(map[string]func() string),
templates: make(map[string]template),
re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`),
templateRe: regexp.MustCompile(`{([a-z]+)}`),
}
@@ -46,12 +52,18 @@ func New() Replacer {
return r
}
func (r *replacer) RegisterTemplate(placeholder, template string) {
r.templates[placeholder] = func() string { return template }
func (r *replacer) RegisterTemplate(placeholder, tmpl string, defaults map[string]string) {
r.templates[placeholder] = template{
fn: func() string { return tmpl },
defaults: defaults,
}
}
func (r *replacer) RegisterTemplateFunc(placeholder string, template func() string) {
r.templates[placeholder] = template
func (r *replacer) RegisterTemplateFunc(placeholder string, tmplFn func() string, defaults map[string]string) {
r.templates[placeholder] = template{
fn: tmplFn,
defaults: defaults,
}
}
func (r *replacer) Replace(str, placeholder, value string) string {
@@ -63,16 +75,20 @@ func (r *replacer) Replace(str, placeholder, value string) string {
// We need a copy from the value
v := value
var tmpl template = template{
fn: func() string { return v },
}
// Check for a registered template
if len(v) == 0 {
tmplFunc, ok := r.templates[placeholder]
t, ok := r.templates[placeholder]
if ok {
v = tmplFunc()
tmpl = t
}
}
v = r.compileTemplate(v, matches[3])
v = tmpl.fn()
v = r.compileTemplate(v, matches[3], tmpl.defaults)
if len(matches[2]) != 0 {
// If there's a character to escape, we also have to escape the
@@ -97,13 +113,18 @@ func (r *replacer) Replace(str, placeholder, value string) string {
// placeholder name and will be replaced with the value. The resulting string is "Hello World!".
// If a placeholder name is not present in the params string, it will not be replaced. The key
// and values can be escaped as in net/url.QueryEscape.
func (r *replacer) compileTemplate(str, params string) string {
if len(params) == 0 {
func (r *replacer) compileTemplate(str, params string, defaults map[string]string) string {
if len(params) == 0 && len(defaults) == 0 {
return str
}
p := make(map[string]string)
// Copy the defaults
for key, value := range defaults {
p[key] = value
}
// taken from net/url.ParseQuery
for params != "" {
var key string

View File

@@ -34,7 +34,7 @@ func TestReplace(t *testing.T) {
func TestReplaceTemplate(t *testing.T) {
r := New()
r.RegisterTemplate("foobar", "Hello {who}! {what}?")
r.RegisterTemplate("foobar", "Hello {who}! {what}?", nil)
replaced := r.Replace("{foobar,who=World}", "foobar", "")
require.Equal(t, "Hello World! {what}?", replaced)
@@ -46,6 +46,20 @@ func TestReplaceTemplate(t *testing.T) {
require.Equal(t, "Hello World! E=mc\\\\:2?", replaced)
}
func TestReplaceTemplateDefaults(t *testing.T) {
r := New()
r.RegisterTemplate("foobar", "Hello {who}! {what}?", map[string]string{
"who": "someone",
"what": "something",
})
replaced := r.Replace("{foobar}", "foobar", "")
require.Equal(t, "Hello someone! something?", replaced)
replaced = r.Replace("{foobar,who=World}", "foobar", "")
require.Equal(t, "Hello World! something?", replaced)
}
func TestReplaceCompileTemplate(t *testing.T) {
samples := [][3]string{
{"Hello {who}!", "who=World", "Hello World!"},
@@ -58,7 +72,27 @@ func TestReplaceCompileTemplate(t *testing.T) {
r := New().(*replacer)
for _, e := range samples {
replaced := r.compileTemplate(e[0], e[1])
replaced := r.compileTemplate(e[0], e[1], nil)
require.Equal(t, e[2], replaced, e[0])
}
}
func TestReplaceCompileTemplateDefaults(t *testing.T) {
samples := [][3]string{
{"Hello {who}!", "", "Hello someone!"},
{"Hello {who}!", "who=World", "Hello World!"},
{"Hello {who}! {what}?", "who=World", "Hello World! something?"},
{"Hello {who}! {what}?", "who=World,what=Yeah", "Hello World! Yeah?"},
{"Hello {who}! {what}?", "who=World,what=", "Hello World! ?"},
}
r := New().(*replacer)
for _, e := range samples {
replaced := r.compileTemplate(e[0], e[1], map[string]string{
"who": "someone",
"what": "something",
})
require.Equal(t, e[2], replaced, e[0])
}
}

View File

@@ -381,7 +381,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
demuxer := &pktque.FilterDemuxer{
Filter: filters,

View File

@@ -9,6 +9,8 @@ import (
"net/http"
"strings"
"time"
"github.com/datarhei/core/v16/log"
)
type API interface {
@@ -19,6 +21,7 @@ type Config struct {
URL string
Token string
Client *http.Client
Logger log.Logger
}
type api struct {
@@ -29,6 +32,8 @@ type api struct {
accessTokenType string
client *http.Client
logger log.Logger
}
func New(config Config) (API, error) {
@@ -36,6 +41,11 @@ func New(config Config) (API, error) {
url: config.URL,
token: config.Token,
client: config.Client,
logger: config.Logger,
}
if a.logger == nil {
a.logger = log.New("")
}
if !strings.HasSuffix(a.url, "/") {
@@ -95,7 +105,7 @@ func (c *copyReader) Read(p []byte) (int, error) {
if err == io.EOF {
c.reader = c.copy
c.copy = new(bytes.Buffer)
c.copy = &bytes.Buffer{}
}
return i, err

View File

@@ -55,7 +55,7 @@ func New(config Config) (Service, error) {
}
if s.logger == nil {
s.logger = log.New("Service")
s.logger = log.New("")
}
s.logger = s.logger.WithField("url", config.URL)
@@ -214,7 +214,10 @@ func (s *service) collect() (time.Duration, error) {
return 15 * time.Minute, fmt.Errorf("failed to send monitor data to service: %w", err)
}
s.logger.Debug().WithField("next", r.Next).Log("Sent monitor data")
s.logger.Debug().WithFields(log.Fields{
"next": r.Next,
"data": data,
}).Log("Sent monitor data")
if r.Next == 0 {
r.Next = 5 * 60
@@ -230,6 +233,8 @@ func (s *service) Start() {
go s.tick(ctx, time.Second)
s.stopOnce = sync.Once{}
s.logger.Info().Log("Connected")
})
}
@@ -237,6 +242,8 @@ func (s *service) Stop() {
s.stopOnce.Do(func() {
s.stopTicker()
s.startOnce = sync.Once{}
s.logger.Info().Log("Disconnected")
})
}