4 Commits

Author SHA1 Message Date
Ingo Oppermann
1b80c4718e Merge branch 'dev' 2022-11-11 12:38:07 +01:00
Ingo Oppermann
8ba1c8c0ac Merge branch 'dev' 2022-11-09 15:17:16 +01:00
Ingo Oppermann
bae68f8d31 Merge branch 'dev' 2022-09-30 15:05:20 +02:00
Jan Stabenow
da833a3602 Mod updates build env. 2022-09-30 10:14:03 +02:00
26 changed files with 223 additions and 701 deletions

View File

@@ -3,20 +3,20 @@ name: tests
on: [push, pull_request]
jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- uses: actions/setup-go@v2
with:
go-version: "1.19"
- name: Run coverage
run: go test -coverprofile=coverage.out -covermode=atomic -v ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unit-linux
build:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 2
- uses: actions/setup-go@v2
with:
go-version: '1.18'
- name: Run coverage
run: go test -coverprofile=coverage.out -covermode=atomic -v ./...
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
token: ${{ secrets.CODECOV_TOKEN }}
files: coverage.out
flags: unit-linux

View File

@@ -6,11 +6,9 @@ import (
"fmt"
"io"
golog "log"
"math"
gonet "net"
gohttp "net/http"
"net/url"
"os"
"path/filepath"
"runtime/debug"
"sync"
@@ -41,7 +39,6 @@ import (
"github.com/datarhei/core/v16/update"
"github.com/caddyserver/certmagic"
"go.uber.org/zap"
)
// The API interface is the implementation for the restreamer API.
@@ -148,12 +145,7 @@ func (a *api) Reload() error {
a.errorChan = make(chan error, 1)
}
logger := log.New("Core").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(a.log.writer, true),
log.Lwarn,
),
)
logger := log.New("Core").WithOutput(log.NewConsoleWriter(a.log.writer, log.Lwarn, true))
store, err := configstore.NewJSON(a.config.path, func() {
a.errorChan <- ErrConfigReload
@@ -189,54 +181,31 @@ func (a *api) Reload() error {
break
}
buffer := log.NewBufferWriter(cfg.Log.MaxLines)
var writer log.Writer
buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines)
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(
log.NewLevelWriter(
log.NewLevelRewriter(
log.NewMultiWriter(
log.NewTopicWriter(
writer,
cfg.Log.Topics,
),
buffer,
),
[]log.LevelRewriteRule{
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
{
Level: log.Ldebug,
Component: "HTTP",
Match: map[string]string{
"client": "^(::1|127.0.0.1)$",
"method": "^(PUT|POST|DELETE)$",
"status_text": "^Unauthorized$",
"user_agent": "^Lavf/",
},
},
},
logger = logger.WithOutput(log.NewLevelRewriter(
log.NewMultiWriter(
log.NewTopicWriter(
log.NewConsoleWriter(a.log.writer, loglevel, true),
cfg.Log.Topics,
),
loglevel,
buffer,
),
)
[]log.LevelRewriteRule{
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
{
Level: log.Ldebug,
Component: "HTTP",
Match: map[string]string{
"client": "^(::1|127.0.0.1)$",
"method": "^(PUT|POST|DELETE)$",
"status_text": "^Unauthorized$",
"user_agent": "^Lavf/",
},
},
},
))
logfields := log.Fields{
"application": app.Name,
@@ -258,8 +227,6 @@ func (a *api) Reload() error {
logger.Info().WithFields(logfields).Log("")
logger.Info().WithField("path", a.config.path).Log("Read config file")
configlogger := logger.WithComponent("Config")
cfg.Messages(func(level string, v configvars.Variable, message string) {
configlogger = configlogger.WithFields(log.Fields{
@@ -476,8 +443,8 @@ func (a *api) start() error {
a.replacer = replace.New()
{
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base(), nil)
a.replacer.RegisterTemplate("memfs", a.memfs.Base(), nil)
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base())
a.replacer.RegisterTemplate("memfs", a.memfs.Base())
host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address)
if len(host) == 0 {
@@ -494,23 +461,21 @@ func (a *api) start() error {
template += "?token=" + cfg.RTMP.Token
}
a.replacer.RegisterTemplate("rtmp", template, nil)
a.replacer.RegisterTemplate("rtmp", template)
host, port, _ = gonet.SplitHostPort(cfg.SRT.Address)
if len(host) == 0 {
host = "localhost"
}
template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}"
template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&streamid=#!:m={mode},r={name}"
if len(cfg.SRT.Token) != 0 {
template += ",token:" + cfg.SRT.Token
template += ",token=" + cfg.SRT.Token
}
if len(cfg.SRT.Passphrase) != 0 {
template += "&passphrase=" + cfg.SRT.Passphrase
}
a.replacer.RegisterTemplate("srt", template, map[string]string{
"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
})
a.replacer.RegisterTemplate("srt", template)
}
store := store.NewJSONStore(store.JSONConfig{
@@ -690,28 +655,26 @@ func (a *api) start() error {
if cfg.TLS.Enable {
if cfg.TLS.Auto {
if len(cfg.Host.Name) == 0 {
return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
return fmt.Errorf("at least one host must be provided in host.name or RS_HOST_NAME")
}
certmagic.Default.Storage = &certmagic.FileStorage{
Path: cfg.DB.Dir + "/cert",
}
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
certmagic.Default.Logger = zap.NewNop()
certmagic.DefaultACME.Agreed = true
certmagic.DefaultACME.Email = cfg.TLS.Email
certmagic.DefaultACME.CA = certmagic.LetsEncryptProductionCA
certmagic.DefaultACME.DisableHTTPChallenge = false
certmagic.DefaultACME.DisableTLSALPNChallenge = true
certmagic.DefaultACME.Logger = zap.NewNop()
certmagic.DefaultACME.Logger = nil
certmagic.Default.Storage = &certmagic.FileStorage{
Path: cfg.DB.Dir + "/cert",
}
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
certmagic.Default.Logger = nil
magic := certmagic.NewDefault()
acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
acme.Logger = zap.NewNop()
magic.Issuers = []certmagic.Issuer{acme}
magic.Logger = zap.NewNop()
autocertManager = magic
@@ -750,19 +713,6 @@ func (a *api) start() error {
if err != nil {
logger.Error().WithField("error", err).Log("Failed to acquire certificate")
certerror = true
/*
problems, err := letsdebug.Check(host, letsdebug.HTTP01)
if err != nil {
logger.Error().WithField("error", err).Log("Failed to debug certificate acquisition")
}
for _, p := range problems {
logger.Error().WithFields(log.Fields{
"name": p.Name,
"detail": p.Detail,
}).Log(p.Explanation)
}
*/
break
}
@@ -1151,12 +1101,6 @@ func (a *api) start() error {
}(ctx)
}
if cfg.Debug.MemoryLimit > 0 {
debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
} else {
debug.SetMemoryLimit(math.MaxInt64)
}
// Start the restream processes
restream.Start()
@@ -1326,6 +1270,4 @@ func (a *api) Destroy() {
a.memfs.DeleteAll()
a.memfs = nil
}
a.log.logger.core.Close()
}

View File

@@ -17,19 +17,12 @@ import (
)
func main() {
logger := log.New("Migration").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(os.Stderr, true),
log.Linfo,
),
).WithFields(log.Fields{
logger := log.New("Migration").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithFields(log.Fields{
"from": "ffmpeg4",
"to": "ffmpeg5",
})
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
@@ -70,27 +63,6 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
return fmt.Errorf("the configuration contains errors: %v", messages)
}
var writer log.Writer
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(writer)
ff, err := ffmpeg.New(ffmpeg.Config{
Binary: cfg.FFmpeg.Binary,
})

View File

@@ -13,16 +13,9 @@ import (
)
func main() {
logger := log.New("Import").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(os.Stderr, true),
log.Linfo,
),
).WithField("version", "v1")
logger := log.New("Import").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithField("version", "v1")
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
configstore, err := cfgstore.NewJSON(configfile, nil)
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
if err != nil {
logger.Error().WithError(err).Log("Loading configuration failed")
os.Exit(1)
@@ -38,6 +31,8 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
logger = log.New("")
}
logger.Info().Log("Database import")
cfg := configstore.Get()
// Merging the persisted config with the environment variables
@@ -63,27 +58,6 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
return fmt.Errorf("the configuration contains errors: %v", messages)
}
var writer log.Writer
if cfg.Log.Target.Output == "stdout" {
writer = log.NewConsoleWriter(
os.Stdout,
true,
)
} else if cfg.Log.Target.Output == "file" {
writer = log.NewFileWriter(
cfg.Log.Target.Path,
log.NewJSONFormatter(),
)
} else {
writer = log.NewConsoleWriter(
os.Stderr,
true,
)
}
logger = logger.WithOutput(writer)
logger.Info().Log("Checking for database ...")
// Check if there's a v1.json from the old Restreamer

View File

@@ -141,8 +141,6 @@ func (d *Config) init() {
d.vars.Register(value.NewString(&d.Log.Level, "info"), "log.level", "CORE_LOG_LEVEL", nil, "Loglevel: silent, error, warn, info, debug", false, false)
d.vars.Register(value.NewStringList(&d.Log.Topics, []string{}, ","), "log.topics", "CORE_LOG_TOPICS", nil, "Show only selected log topics", false, false)
d.vars.Register(value.NewInt(&d.Log.MaxLines, 1000), "log.max_lines", "CORE_LOG_MAXLINES", nil, "Number of latest log lines to keep in memory", false, false)
d.vars.Register(value.NewString(&d.Log.Target.Output, "stderr"), "log.target.output", "CORE_LOG_TARGET_OUTPUT", nil, "Where to write the logs to: stdout, stderr, file", false, false)
d.vars.Register(value.NewString(&d.Log.Target.Path, ""), "log.target.path", "CORE_LOG_TARGET_PATH", nil, "Path to log file if output is 'file'", false, false)
// DB
d.vars.Register(value.NewMustDir(&d.DB.Dir, "./config"), "db.dir", "CORE_DB_DIR", nil, "Directory for holding the operational data", false, false)
@@ -234,7 +232,6 @@ func (d *Config) init() {
// Debug
d.vars.Register(value.NewBool(&d.Debug.Profiling, false), "debug.profiling", "CORE_DEBUG_PROFILING", nil, "Enable profiling endpoint on /profiling", false, false)
d.vars.Register(value.NewInt(&d.Debug.ForceGC, 0), "debug.force_gc", "CORE_DEBUG_FORCEGC", nil, "Number of seconds between forcing GC to return memory to the OS", false, false)
d.vars.Register(value.NewInt64(&d.Debug.MemoryLimit, 0), "debug.memory_limit_mbytes", "CORE_DEBUG_MEMORY_LIMIT_MBYTES", nil, "Impose a soft memory limit for the core, in megabytes", false, false)
// Metrics
d.vars.Register(value.NewBool(&d.Metrics.Enable, false), "metrics.enable", "CORE_METRICS_ENABLE", nil, "Enable collecting historic metrics data", false, false)

View File

@@ -22,10 +22,6 @@ type Data struct {
Level string `json:"level" enums:"debug,info,warn,error,silent" jsonschema:"enum=debug,enum=info,enum=warn,enum=error,enum=silent"`
Topics []string `json:"topics"`
MaxLines int `json:"max_lines"`
Target struct {
Output string `json:"name"`
Path string `json:"path"`
} `json:"target"` // discard, stderr, stdout, file:/path/to/file.log
} `json:"log"`
DB struct {
Dir string `json:"dir"`
@@ -139,9 +135,8 @@ type Data struct {
MaxPort int `json:"max_port"`
} `json:"playout"`
Debug struct {
Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc"`
MemoryLimit int64 `json:"memory_limit_mbytes"`
Profiling bool `json:"profiling"`
ForceGC int `json:"force_gc"`
} `json:"debug"`
Metrics struct {
Enable bool `json:"enable"`
@@ -186,6 +181,7 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Address = d.Address
data.CheckForUpdates = d.CheckForUpdates
data.Log = d.Log
data.DB = d.DB
data.Host = d.Host
data.API = d.API
@@ -193,11 +189,14 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Debug = d.Debug
data.Metrics = d.Metrics
data.Sessions = d.Sessions
data.Service = d.Service
data.Router = d.Router
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Host.Name = copy.Slice(d.Host.Name)
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
@@ -229,16 +228,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
data.Storage.Memory = d.Storage.Memory
// Actual changes
data.Log.Level = d.Log.Level
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Log.MaxLines = d.Log.MaxLines
data.Log.Target.Output = "stderr"
data.Log.Target.Path = ""
data.Debug.Profiling = d.Debug.Profiling
data.Debug.ForceGC = d.Debug.ForceGC
data.Debug.MemoryLimit = 0
data.TLS.Enable = d.TLS.Enable
data.TLS.Address = d.TLS.Address
data.TLS.Auto = d.TLS.Auto
@@ -270,6 +259,7 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Address = d.Address
data.CheckForUpdates = d.CheckForUpdates
data.Log = d.Log
data.DB = d.DB
data.Host = d.Host
data.API = d.API
@@ -277,11 +267,14 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.SRT = d.SRT
data.FFmpeg = d.FFmpeg
data.Playout = d.Playout
data.Debug = d.Debug
data.Metrics = d.Metrics
data.Sessions = d.Sessions
data.Service = d.Service
data.Router = d.Router
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Host.Name = copy.Slice(d.Host.Name)
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
@@ -306,13 +299,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
data.Router.Routes = copy.StringMap(d.Router.Routes)
// Actual changes
data.Log.Level = d.Log.Level
data.Log.Topics = copy.Slice(d.Log.Topics)
data.Log.MaxLines = d.Log.MaxLines
data.Debug.Profiling = d.Debug.Profiling
data.Debug.ForceGC = d.Debug.ForceGC
data.TLS.Enable = d.TLS.Enable
data.TLS.Address = d.TLS.Address
data.TLS.Auto = d.TLS.Auto

View File

@@ -118,10 +118,6 @@ func (c *jsonStore) load(cfg *config.Config) error {
return err
}
if len(jsondata) == 0 {
return nil
}
data, err := migrate(jsondata)
if err != nil {
return err

View File

@@ -1,53 +0,0 @@
package store
import (
"os"
"path"
)
// Location returns the path to the config file. If no path is provided,
// different standard location will be probed:
// - os.UserConfigDir() + /datarhei-core/config.js
// - os.UserHomeDir() + /.config/datarhei-core/config.js
// - ./config/config.js
// If the config doesn't exist in none of these locations, it will be assumed
// at ./config/config.js
func Location(filepath string) string {
configfile := filepath
if len(configfile) != 0 {
return configfile
}
locations := []string{}
if dir, err := os.UserConfigDir(); err == nil {
locations = append(locations, dir+"/datarhei-core/config.js")
}
if dir, err := os.UserHomeDir(); err == nil {
locations = append(locations, dir+"/.config/datarhei-core/config.js")
}
locations = append(locations, "./config/config.js")
for _, path := range locations {
info, err := os.Stat(path)
if err != nil {
continue
}
if info.IsDir() {
continue
}
configfile = path
}
if len(configfile) == 0 {
configfile = "./config/config.js"
}
os.MkdirAll(path.Dir(configfile), 0740)
return configfile
}

View File

@@ -4,7 +4,6 @@ import (
"encoding/base64"
"encoding/json"
"fmt"
"net/url"
"strings"
)
@@ -17,28 +16,6 @@ type Auth0Tenant struct {
Users []string `json:"users"`
}
func (a *Auth0Tenant) String() string {
u := url.URL{
Scheme: "auth0",
Host: a.Domain,
}
if len(a.ClientID) != 0 {
u.User = url.User(a.ClientID)
}
q := url.Values{}
q.Set("aud", a.Audience)
for _, user := range a.Users {
q.Add("user", user)
}
u.RawQuery = q.Encode()
return u.String()
}
type TenantList struct {
p *[]Auth0Tenant
separator string
@@ -55,34 +32,18 @@ func NewTenantList(p *[]Auth0Tenant, val []Auth0Tenant, separator string) *Tenan
return v
}
// Set allows to set a tenant list in two formats:
// - a separator separated list of bas64 encoded Auth0Tenant JSON objects
// - a separator separated list of Auth0Tenant in URL representation: auth0://[clientid]@[domain]?aud=[audience]&user=...&user=...
func (s *TenantList) Set(val string) error {
list := []Auth0Tenant{}
for i, elm := range strings.Split(val, s.separator) {
data, err := base64.StdEncoding.DecodeString(elm)
if err != nil {
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
}
t := Auth0Tenant{}
if strings.HasPrefix(elm, "auth0://") {
data, err := url.Parse(elm)
if err != nil {
return fmt.Errorf("invalid url encoding of tenant %d: %w", i, err)
}
t.Domain = data.Host
t.ClientID = data.User.Username()
t.Audience = data.Query().Get("aud")
t.Users = data.Query()["user"]
} else {
data, err := base64.StdEncoding.DecodeString(elm)
if err != nil {
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
}
if err := json.Unmarshal(data, &t); err != nil {
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
}
if err := json.Unmarshal(data, &t); err != nil {
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
}
list = append(list, t)
@@ -101,10 +62,10 @@ func (s *TenantList) String() string {
list := []string{}
for _, t := range *s.p {
list = append(list, t.String())
list = append(list, fmt.Sprintf("%s (%d users)", t.Domain, len(t.Users)))
}
return strings.Join(list, s.separator)
return strings.Join(list, ",")
}
func (s *TenantList) Validate() error {

View File

@@ -1,43 +0,0 @@
package value
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAuth0Value(t *testing.T) {
tenants := []Auth0Tenant{}
v := NewTenantList(&tenants, nil, " ")
require.Equal(t, "(empty)", v.String())
v.Set("auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3")
require.Equal(t, []Auth0Tenant{
{
Domain: "domain",
ClientID: "clientid",
Audience: "audience",
Users: []string{"user1", "user2"},
},
{
Domain: "domain2",
Audience: "audience2",
Users: []string{"user3"},
},
}, tenants)
require.Equal(t, "auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3", v.String())
require.NoError(t, v.Validate())
v.Set("eyJkb21haW4iOiJkYXRhcmhlaS5ldS5hdXRoMC5jb20iLCJhdWRpZW5jZSI6Imh0dHBzOi8vZGF0YXJoZWkuY29tL2NvcmUiLCJ1c2VycyI6WyJhdXRoMHx4eHgiXX0=")
require.Equal(t, []Auth0Tenant{
{
Domain: "datarhei.eu.auth0.com",
ClientID: "",
Audience: "https://datarhei.com/core",
Users: []string{"auth0|xxx"},
},
}, tenants)
require.Equal(t, "auth0://datarhei.eu.auth0.com?aud=https%3A%2F%2Fdatarhei.com%2Fcore&user=auth0%7Cxxx", v.String())
require.NoError(t, v.Validate())
}

View File

@@ -34,10 +34,6 @@ func (u *MustDir) Validate() error {
return fmt.Errorf("path name must not be empty")
}
if err := os.MkdirAll(val, 0750); err != nil {
return fmt.Errorf("%s can't be created (%w)", val, err)
}
finfo, err := os.Stat(val)
if err != nil {
return fmt.Errorf("%s does not exist", val)

View File

@@ -3,7 +3,7 @@ package value
import (
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
)
func TestIntValue(t *testing.T) {
@@ -11,19 +11,19 @@ func TestIntValue(t *testing.T) {
ivar := NewInt(&i, 11)
require.Equal(t, "11", ivar.String())
require.Equal(t, nil, ivar.Validate())
require.Equal(t, false, ivar.IsEmpty())
assert.Equal(t, "11", ivar.String())
assert.Equal(t, nil, ivar.Validate())
assert.Equal(t, false, ivar.IsEmpty())
i = 42
require.Equal(t, "42", ivar.String())
require.Equal(t, nil, ivar.Validate())
require.Equal(t, false, ivar.IsEmpty())
assert.Equal(t, "42", ivar.String())
assert.Equal(t, nil, ivar.Validate())
assert.Equal(t, false, ivar.IsEmpty())
ivar.Set("77")
require.Equal(t, int(77), i)
assert.Equal(t, int(77), i)
}
type testdata struct {
@@ -37,22 +37,22 @@ func TestCopyStruct(t *testing.T) {
NewInt(&data1.value1, 1)
NewInt(&data1.value2, 2)
require.Equal(t, int(1), data1.value1)
require.Equal(t, int(2), data1.value2)
assert.Equal(t, int(1), data1.value1)
assert.Equal(t, int(2), data1.value2)
data2 := testdata{}
val21 := NewInt(&data2.value1, 3)
val22 := NewInt(&data2.value2, 4)
require.Equal(t, int(3), data2.value1)
require.Equal(t, int(4), data2.value2)
assert.Equal(t, int(3), data2.value1)
assert.Equal(t, int(4), data2.value2)
data2 = data1
require.Equal(t, int(1), data2.value1)
require.Equal(t, int(2), data2.value2)
assert.Equal(t, int(1), data2.value1)
assert.Equal(t, int(2), data2.value2)
require.Equal(t, "1", val21.String())
require.Equal(t, "2", val22.String())
assert.Equal(t, "1", val21.String())
assert.Equal(t, "2", val22.String())
}

2
go.mod
View File

@@ -26,7 +26,6 @@ require (
github.com/swaggo/swag v1.8.7
github.com/vektah/gqlparser/v2 v2.5.1
github.com/xeipuuv/gojsonschema v1.2.0
go.uber.org/zap v1.23.0
golang.org/x/mod v0.6.0
)
@@ -80,6 +79,7 @@ require (
github.com/yusufpapurcu/wmi v1.2.2 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.23.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sys v0.1.0 // indirect

View File

@@ -12,7 +12,7 @@ import (
func (r *queryResolver) Log(ctx context.Context) ([]string, error) {
if r.LogBuffer == nil {
r.LogBuffer = log.NewBufferWriter(1)
r.LogBuffer = log.NewBufferWriter(log.Lsilent, 1)
}
events := r.LogBuffer.Events()

View File

@@ -22,7 +22,7 @@ func NewLog(buffer log.BufferWriter) *LogHandler {
}
if l.buffer == nil {
l.buffer = log.NewBufferWriter(1)
l.buffer = log.NewBufferWriter(log.Lsilent, 1)
}
return l

View File

@@ -51,7 +51,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg")
}
if len(process.Input) == 0 || len(process.Output) == 0 {
if len(process.Input) == 0 && len(process.Output) == 0 {
return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined")
}
@@ -189,14 +189,6 @@ func (h *RestreamHandler) Update(c echo.Context) error {
Autostart: true,
}
current, err := h.restream.GetProcess(id)
if err != nil {
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
}
// Prefill the config with the current values
process.Unmarshal(current.Config)
if err := util.ShouldBindJSON(c, &process); err != nil {
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
}

View File

@@ -14,29 +14,28 @@ import (
type Level uint
const (
Lsilent Level = 0b0000
Lerror Level = 0b0001
Lwarn Level = 0b0010
Linfo Level = 0b0100
Ldebug Level = 0b1000
Lsilent Level = 0
Lerror Level = 1
Lwarn Level = 2
Linfo Level = 3
Ldebug Level = 4
)
// String returns a string representing the log level.
func (level Level) String() string {
switch level {
case Lsilent:
return "SILENT"
case Lerror:
return "ERROR"
case Lwarn:
return "WARN"
case Linfo:
return "INFO"
case Ldebug:
return "DEBUG"
default:
names := []string{
"SILENT",
"ERROR",
"WARN",
"INFO",
"DEBUG",
}
if level > Ldebug {
return `¯\_(ツ)_/¯`
}
return names[level]
}
func (level *Level) MarshalJSON() ([]byte, error) {
@@ -98,9 +97,6 @@ type Logger interface {
// Write implements the io.Writer interface such that it can be used in e.g. the
// the log/Logger facility. Messages will be printed with debug level.
Write(p []byte) (int, error)
// Close closes the underlying writer.
Close()
}
// logger is an implementation of the Logger interface.
@@ -188,10 +184,6 @@ func (l *logger) Write(p []byte) (int, error) {
return newEvent(l).Write(p)
}
func (l *logger) Close() {
l.output.Close()
}
type Event struct {
logger *logger
@@ -360,6 +352,12 @@ func (l *Event) Write(p []byte) (int, error) {
return len(p), nil
}
func (l *Event) Close() {
l.logger.Close()
type Eventx struct {
Time time.Time `json:"ts"`
Level Level `json:"level"`
Component string `json:"component"`
Reference string `json:"ref"`
Message string `json:"message"`
Caller string `json:"caller"`
Detail interface{} `json:"detail"`
}

View File

@@ -5,25 +5,25 @@ import (
"bytes"
"testing"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/assert"
)
func TestLoglevelNames(t *testing.T) {
require.Equal(t, "DEBUG", Ldebug.String())
require.Equal(t, "ERROR", Lerror.String())
require.Equal(t, "WARN", Lwarn.String())
require.Equal(t, "INFO", Linfo.String())
require.Equal(t, `SILENT`, Lsilent.String())
assert.Equal(t, "DEBUG", Ldebug.String())
assert.Equal(t, "ERROR", Lerror.String())
assert.Equal(t, "WARN", Lwarn.String())
assert.Equal(t, "INFO", Linfo.String())
assert.Equal(t, `SILENT`, Lsilent.String())
}
func TestLogColorToNotTTY(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
w := NewLevelWriter(NewConsoleWriter(writer, true), Linfo).(*levelWriter).writer.(*syncWriter)
w := NewConsoleWriter(writer, Linfo, true).(*syncWriter)
formatter := w.writer.(*consoleWriter).formatter.(*consoleFormatter)
require.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
assert.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
}
func TestLogContext(t *testing.T) {
@@ -31,7 +31,7 @@ func TestLogContext(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("component").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
logger := New("component").WithOutput(NewConsoleWriter(writer, Ldebug, false))
logger.Debug().Log("debug")
logger.Info().Log("info")
@@ -53,19 +53,19 @@ func TestLogContext(t *testing.T) {
lenWithoutCtx := buffer.Len()
buffer.Reset()
require.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
assert.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
}
func TestLogClone(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
logger.Info().Log("info")
writer.Flush()
require.Contains(t, buffer.String(), `component="test"`)
assert.Contains(t, buffer.String(), `component="test"`)
buffer.Reset()
@@ -74,33 +74,33 @@ func TestLogClone(t *testing.T) {
logger2.Info().Log("info")
writer.Flush()
require.Contains(t, buffer.String(), `component="tset"`)
assert.Contains(t, buffer.String(), `component="tset"`)
}
func TestLogSilent(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lsilent))
logger := New("test").WithOutput(NewConsoleWriter(writer, Lsilent, false))
logger.Debug().Log("debug")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
}
@@ -108,26 +108,26 @@ func TestLogDebug(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
logger := New("test").WithOutput(NewConsoleWriter(writer, Ldebug, false))
logger.Debug().Log("debug")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -135,26 +135,26 @@ func TestLogInfo(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
logger.Debug().Log("debug")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -162,26 +162,26 @@ func TestLogWarn(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lwarn))
logger := New("test").WithOutput(NewConsoleWriter(writer, Lwarn, false))
logger.Debug().Log("debug")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}
@@ -189,25 +189,25 @@ func TestLogError(t *testing.T) {
var buffer bytes.Buffer
writer := bufio.NewWriter(&buffer)
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lerror))
logger := New("test").WithOutput(NewConsoleWriter(writer, Lerror, false))
logger.Debug().Log("debug")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Info().Log("info")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Warn().Log("warn")
writer.Flush()
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
buffer.Reset()
logger.Error().Log("error")
writer.Flush()
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
buffer.Reset()
}

View File

@@ -1,43 +0,0 @@
package log
import (
"io"
"os"
"github.com/mattn/go-isatty"
)
type consoleOutput struct {
writer io.Writer
formatter Formatter
}
func NewConsoleOutput(w io.Writer, useColor bool) Writer {
writer := &consoleOutput{
writer: w,
}
color := useColor
if color {
if w, ok := w.(*os.File); ok {
if !isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd()) {
color = false
}
} else {
color = false
}
}
writer.formatter = NewConsoleFormatter(color)
return NewSyncWriter(writer)
}
func (w *consoleOutput) Write(e *Event) error {
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *consoleOutput) Close() {}

View File

@@ -13,50 +13,18 @@ import (
type Writer interface {
Write(e *Event) error
Close()
}
type discardWriter struct{}
func NewDiscardWriter() Writer {
return &discardWriter{}
}
func (w *discardWriter) Write(e *Event) error { return nil }
func (w *discardWriter) Close() {}
type levelWriter struct {
writer Writer
level Level
}
func NewLevelWriter(w Writer, level Level) Writer {
return &levelWriter{
writer: w,
level: level,
}
}
func (w *levelWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
return w.writer.Write(e)
}
func (w *levelWriter) Close() {
w.writer.Close()
}
type jsonWriter struct {
writer io.Writer
level Level
formatter Formatter
}
func NewJSONWriter(w io.Writer) Writer {
func NewJSONWriter(w io.Writer, level Level) Writer {
writer := &jsonWriter{
writer: w,
level: level,
formatter: NewJSONFormatter(),
}
@@ -64,21 +32,25 @@ func NewJSONWriter(w io.Writer) Writer {
}
func (w *jsonWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *jsonWriter) Close() {}
type consoleWriter struct {
writer io.Writer
level Level
formatter Formatter
}
func NewConsoleWriter(w io.Writer, useColor bool) Writer {
func NewConsoleWriter(w io.Writer, level Level, useColor bool) Writer {
writer := &consoleWriter{
writer: w,
level: level,
}
color := useColor
@@ -99,13 +71,15 @@ func NewConsoleWriter(w io.Writer, useColor bool) Writer {
}
func (w *consoleWriter) Write(e *Event) error {
if w.level < e.Level || e.Level == Lsilent {
return nil
}
_, err := w.writer.Write(w.formatter.Bytes(e))
return err
}
func (w *consoleWriter) Close() {}
type topicWriter struct {
writer Writer
topics map[string]struct{}
@@ -138,10 +112,6 @@ func (w *topicWriter) Write(e *Event) error {
return err
}
func (w *topicWriter) Close() {
w.writer.Close()
}
type levelRewriter struct {
writer Writer
rules []levelRewriteRule
@@ -212,10 +182,6 @@ rules:
return w.writer.Write(e)
}
func (w *levelRewriter) Close() {
w.writer.Close()
}
type syncWriter struct {
mu sync.Mutex
writer Writer
@@ -227,15 +193,11 @@ func NewSyncWriter(writer Writer) Writer {
}
}
func (w *syncWriter) Write(e *Event) error {
w.mu.Lock()
defer w.mu.Unlock()
func (s *syncWriter) Write(e *Event) error {
s.mu.Lock()
defer s.mu.Unlock()
return w.writer.Write(e)
}
func (w *syncWriter) Close() {
w.writer.Close()
return s.writer.Write(e)
}
type multiWriter struct {
@@ -250,8 +212,8 @@ func NewMultiWriter(writer ...Writer) Writer {
return mw
}
func (w *multiWriter) Write(e *Event) error {
for _, w := range w.writer {
func (m *multiWriter) Write(e *Event) error {
for _, w := range m.writer {
if err := w.Write(e); err != nil {
return err
}
@@ -260,12 +222,6 @@ func (w *multiWriter) Write(e *Event) error {
return nil
}
func (w *multiWriter) Close() {
for _, w := range w.writer {
w.Close()
}
}
type BufferWriter interface {
Writer
Events() []*Event
@@ -274,10 +230,13 @@ type BufferWriter interface {
type bufferWriter struct {
lines *ring.Ring
lock sync.RWMutex
level Level
}
func NewBufferWriter(lines int) BufferWriter {
b := &bufferWriter{}
func NewBufferWriter(level Level, lines int) BufferWriter {
b := &bufferWriter{
level: level,
}
if lines > 0 {
b.lines = ring.New(lines)
@@ -286,31 +245,33 @@ func NewBufferWriter(lines int) BufferWriter {
return b
}
func (w *bufferWriter) Write(e *Event) error {
w.lock.Lock()
defer w.lock.Unlock()
func (b *bufferWriter) Write(e *Event) error {
if b.level < e.Level || e.Level == Lsilent {
return nil
}
if w.lines != nil {
w.lines.Value = e.clone()
w.lines = w.lines.Next()
b.lock.Lock()
defer b.lock.Unlock()
if b.lines != nil {
b.lines.Value = e.clone()
b.lines = b.lines.Next()
}
return nil
}
func (w *bufferWriter) Close() {}
func (w *bufferWriter) Events() []*Event {
func (b *bufferWriter) Events() []*Event {
var lines = []*Event{}
if w.lines == nil {
if b.lines == nil {
return lines
}
w.lock.RLock()
defer w.lock.RUnlock()
b.lock.RLock()
defer b.lock.RUnlock()
w.lines.Do(func(l interface{}) {
b.lines.Do(func(l interface{}) {
if l == nil {
return
}
@@ -320,32 +281,3 @@ func (w *bufferWriter) Events() []*Event {
return lines
}
type fileWriter struct {
writer *os.File
formatter Formatter
}
func NewFileWriter(path string, formatter Formatter) Writer {
file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR|os.O_SYNC, 0600)
if err != nil {
return NewDiscardWriter()
}
writer := &fileWriter{
writer: file,
formatter: formatter,
}
return NewSyncWriter(writer)
}
func (w *fileWriter) Write(e *Event) error {
_, err := w.writer.Write(append(w.formatter.Bytes(e), '\n'))
return err
}
func (w *fileWriter) Close() {
w.writer.Close()
}

17
main.go
View File

@@ -5,26 +5,15 @@ import (
"os/signal"
"github.com/datarhei/core/v16/app/api"
"github.com/datarhei/core/v16/config/store"
"github.com/datarhei/core/v16/log"
_ "github.com/joho/godotenv/autoload"
)
func main() {
logger := log.New("Core").WithOutput(
log.NewLevelWriter(
log.NewConsoleWriter(
os.Stderr,
true,
),
log.Lwarn,
),
)
logger := log.New("Core").WithOutput(log.NewConsoleWriter(os.Stderr, log.Lwarn, true))
configfile := store.Location(os.Getenv("CORE_CONFIGFILE"))
app, err := api.New(configfile, os.Stderr)
app, err := api.New(os.Getenv("CORE_CONFIGFILE"), os.Stderr)
if err != nil {
logger.Error().WithError(err).Log("Failed to create new API")
os.Exit(1)
@@ -62,8 +51,6 @@ func main() {
signal.Notify(quit, os.Interrupt)
<-quit
logger.Close()
// Stop the app
app.Destroy()
}

View File

@@ -9,13 +9,12 @@ import (
type Replacer interface {
// RegisterTemplate registers a template for a specific placeholder. Template
// may contain placeholders as well of the form {name}. They will be replaced
// by the parameters of the placeholder (see Replace). If a parameter is not of
// a template is not present, default values can be provided.
RegisterTemplate(placeholder, template string, defaults map[string]string)
// by the parameters of the placeholder (see Replace).
RegisterTemplate(placeholder, template string)
// RegisterTemplateFunc does the same as RegisterTemplate, but the template
// is returned by the template function.
RegisterTemplateFunc(placeholder string, template func() string, defaults map[string]string)
RegisterTemplateFunc(placeholder string, template func() string)
// Replace replaces all occurences of placeholder in str with value. The placeholder is of the
// form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^
@@ -29,13 +28,8 @@ type Replacer interface {
Replace(str, placeholder, value string) string
}
type template struct {
fn func() string
defaults map[string]string
}
type replacer struct {
templates map[string]template
templates map[string]func() string
re *regexp.Regexp
templateRe *regexp.Regexp
@@ -44,7 +38,7 @@ type replacer struct {
// New returns a Replacer
func New() Replacer {
r := &replacer{
templates: make(map[string]template),
templates: make(map[string]func() string),
re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`),
templateRe: regexp.MustCompile(`{([a-z]+)}`),
}
@@ -52,18 +46,12 @@ func New() Replacer {
return r
}
func (r *replacer) RegisterTemplate(placeholder, tmpl string, defaults map[string]string) {
r.templates[placeholder] = template{
fn: func() string { return tmpl },
defaults: defaults,
}
func (r *replacer) RegisterTemplate(placeholder, template string) {
r.templates[placeholder] = func() string { return template }
}
func (r *replacer) RegisterTemplateFunc(placeholder string, tmplFn func() string, defaults map[string]string) {
r.templates[placeholder] = template{
fn: tmplFn,
defaults: defaults,
}
func (r *replacer) RegisterTemplateFunc(placeholder string, template func() string) {
r.templates[placeholder] = template
}
func (r *replacer) Replace(str, placeholder, value string) string {
@@ -75,20 +63,16 @@ func (r *replacer) Replace(str, placeholder, value string) string {
// We need a copy from the value
v := value
var tmpl template = template{
fn: func() string { return v },
}
// Check for a registered template
if len(v) == 0 {
t, ok := r.templates[placeholder]
tmplFunc, ok := r.templates[placeholder]
if ok {
tmpl = t
v = tmplFunc()
}
}
v = tmpl.fn()
v = r.compileTemplate(v, matches[3], tmpl.defaults)
v = r.compileTemplate(v, matches[3])
if len(matches[2]) != 0 {
// If there's a character to escape, we also have to escape the
@@ -113,18 +97,13 @@ func (r *replacer) Replace(str, placeholder, value string) string {
// placeholder name and will be replaced with the value. The resulting string is "Hello World!".
// If a placeholder name is not present in the params string, it will not be replaced. The key
// and values can be escaped as in net/url.QueryEscape.
func (r *replacer) compileTemplate(str, params string, defaults map[string]string) string {
if len(params) == 0 && len(defaults) == 0 {
func (r *replacer) compileTemplate(str, params string) string {
if len(params) == 0 {
return str
}
p := make(map[string]string)
// Copy the defaults
for key, value := range defaults {
p[key] = value
}
// taken from net/url.ParseQuery
for params != "" {
var key string

View File

@@ -34,7 +34,7 @@ func TestReplace(t *testing.T) {
func TestReplaceTemplate(t *testing.T) {
r := New()
r.RegisterTemplate("foobar", "Hello {who}! {what}?", nil)
r.RegisterTemplate("foobar", "Hello {who}! {what}?")
replaced := r.Replace("{foobar,who=World}", "foobar", "")
require.Equal(t, "Hello World! {what}?", replaced)
@@ -46,20 +46,6 @@ func TestReplaceTemplate(t *testing.T) {
require.Equal(t, "Hello World! E=mc\\\\:2?", replaced)
}
func TestReplaceTemplateDefaults(t *testing.T) {
r := New()
r.RegisterTemplate("foobar", "Hello {who}! {what}?", map[string]string{
"who": "someone",
"what": "something",
})
replaced := r.Replace("{foobar}", "foobar", "")
require.Equal(t, "Hello someone! something?", replaced)
replaced = r.Replace("{foobar,who=World}", "foobar", "")
require.Equal(t, "Hello World! something?", replaced)
}
func TestReplaceCompileTemplate(t *testing.T) {
samples := [][3]string{
{"Hello {who}!", "who=World", "Hello World!"},
@@ -72,27 +58,7 @@ func TestReplaceCompileTemplate(t *testing.T) {
r := New().(*replacer)
for _, e := range samples {
replaced := r.compileTemplate(e[0], e[1], nil)
require.Equal(t, e[2], replaced, e[0])
}
}
func TestReplaceCompileTemplateDefaults(t *testing.T) {
samples := [][3]string{
{"Hello {who}!", "", "Hello someone!"},
{"Hello {who}!", "who=World", "Hello World!"},
{"Hello {who}! {what}?", "who=World", "Hello World! something?"},
{"Hello {who}! {what}?", "who=World,what=Yeah", "Hello World! Yeah?"},
{"Hello {who}! {what}?", "who=World,what=", "Hello World! ?"},
}
r := New().(*replacer)
for _, e := range samples {
replaced := r.compileTemplate(e[0], e[1], map[string]string{
"who": "someone",
"what": "something",
})
replaced := r.compileTemplate(e[0], e[1])
require.Equal(t, e[2], replaced, e[0])
}
}

View File

@@ -381,7 +381,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
}
// Adjust the timestamp such that the stream starts from 0
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})
filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
demuxer := &pktque.FilterDemuxer{
Filter: filters,

View File

@@ -9,8 +9,6 @@ import (
"net/http"
"strings"
"time"
"github.com/datarhei/core/v16/log"
)
type API interface {
@@ -21,7 +19,6 @@ type Config struct {
URL string
Token string
Client *http.Client
Logger log.Logger
}
type api struct {
@@ -32,8 +29,6 @@ type api struct {
accessTokenType string
client *http.Client
logger log.Logger
}
func New(config Config) (API, error) {
@@ -41,11 +36,6 @@ func New(config Config) (API, error) {
url: config.URL,
token: config.Token,
client: config.Client,
logger: config.Logger,
}
if a.logger == nil {
a.logger = log.New("")
}
if !strings.HasSuffix(a.url, "/") {
@@ -105,7 +95,7 @@ func (c *copyReader) Read(p []byte) (int, error) {
if err == io.EOF {
c.reader = c.copy
c.copy = &bytes.Buffer{}
c.copy = new(bytes.Buffer)
}
return i, err

View File

@@ -55,7 +55,7 @@ func New(config Config) (Service, error) {
}
if s.logger == nil {
s.logger = log.New("")
s.logger = log.New("Service")
}
s.logger = s.logger.WithField("url", config.URL)
@@ -214,10 +214,7 @@ func (s *service) collect() (time.Duration, error) {
return 15 * time.Minute, fmt.Errorf("failed to send monitor data to service: %w", err)
}
s.logger.Debug().WithFields(log.Fields{
"next": r.Next,
"data": data,
}).Log("Sent monitor data")
s.logger.Debug().WithField("next", r.Next).Log("Sent monitor data")
if r.Next == 0 {
r.Next = 5 * 60
@@ -233,8 +230,6 @@ func (s *service) Start() {
go s.tick(ctx, time.Second)
s.stopOnce = sync.Once{}
s.logger.Info().Log("Connected")
})
}
@@ -242,8 +237,6 @@ func (s *service) Stop() {
s.stopOnce.Do(func() {
s.stopTicker()
s.startOnce = sync.Once{}
s.logger.Info().Log("Disconnected")
})
}