Mirror of https://github.com/datarhei/core.git (synced 2025-10-03 23:26:39 +08:00)

Compare commits (18 commits)
4a12b0293f
f472fe150f
1bbb7a9c1f
37e00407cc
17c9f6ef13
ff6b0d9584
378a3cd9cf
992b04d180
391681447e
59aa6af767
c44fb30a84
0cd8be130c
65a617c2af
8a1dc59a81
ee2a188be8
1a9ef8b7c9
d0262cc887
18be75d013
.github/workflows/go-tests.yml (vendored, 34 changed lines)
@@ -3,20 +3,20 @@ name: tests

 on: [push, pull_request]

 jobs:
   build:
     runs-on: ubuntu-latest
     steps:
       - uses: actions/checkout@v2
         with:
           fetch-depth: 2
       - uses: actions/setup-go@v2
         with:
-          go-version: '1.18'
+          go-version: "1.19"
       - name: Run coverage
         run: go test -coverprofile=coverage.out -covermode=atomic -v ./...
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v2
         with:
           token: ${{ secrets.CODECOV_TOKEN }}
           files: coverage.out
           flags: unit-linux
app/api/api.go (134 changed lines)
@@ -6,9 +6,11 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
golog "log"
|
||||
"math"
|
||||
gonet "net"
|
||||
gohttp "net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"runtime/debug"
|
||||
"sync"
|
||||
@@ -39,6 +41,7 @@ import (
|
||||
"github.com/datarhei/core/v16/update"
|
||||
|
||||
"github.com/caddyserver/certmagic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// The API interface is the implementation for the restreamer API.
|
||||
@@ -145,7 +148,12 @@ func (a *api) Reload() error {
 		a.errorChan = make(chan error, 1)
 	}

-	logger := log.New("Core").WithOutput(log.NewConsoleWriter(a.log.writer, log.Lwarn, true))
+	logger := log.New("Core").WithOutput(
+		log.NewLevelWriter(
+			log.NewConsoleWriter(a.log.writer, true),
+			log.Lwarn,
+		),
+	)

 	store, err := configstore.NewJSON(a.config.path, func() {
 		a.errorChan <- ErrConfigReload
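The hunk above splits level filtering out of the console writer: NewConsoleWriter only formats and writes, while NewLevelWriter is a decorator that drops events above the configured level. A minimal, self-contained sketch of that split (types are re-declared locally so the snippet runs on its own; it is an illustration, not the core/v16/log package itself):

package main

import "fmt"

type Level uint

const (
	Lsilent Level = 0b0000
	Lerror  Level = 0b0001
	Lwarn   Level = 0b0010
	Linfo   Level = 0b0100
	Ldebug  Level = 0b1000
)

type Event struct {
	Level   Level
	Message string
}

type Writer interface {
	Write(e *Event) error
	Close()
}

// consoleWriter just prints; it no longer filters by level.
type consoleWriter struct{}

func (w *consoleWriter) Write(e *Event) error { fmt.Println(e.Message); return nil }
func (w *consoleWriter) Close()               {}

// levelWriter is the decorator that drops events above the configured level.
type levelWriter struct {
	writer Writer
	level  Level
}

func (w *levelWriter) Write(e *Event) error {
	if w.level < e.Level || e.Level == Lsilent {
		return nil
	}
	return w.writer.Write(e)
}

func (w *levelWriter) Close() { w.writer.Close() }

func main() {
	out := &levelWriter{writer: &consoleWriter{}, level: Lwarn}
	out.Write(&Event{Level: Lerror, Message: "printed"}) // Lerror (1) passes a Lwarn (2) filter
	out.Write(&Event{Level: Ldebug, Message: "dropped"}) // Ldebug (8) is above Lwarn (2)
	out.Close()
}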
@@ -181,31 +189,54 @@ func (a *api) Reload() error {
|
||||
break
|
||||
}
|
||||
|
||||
buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines)
|
||||
buffer := log.NewBufferWriter(cfg.Log.MaxLines)
|
||||
var writer log.Writer
|
||||
|
||||
logger = logger.WithOutput(log.NewLevelRewriter(
|
||||
log.NewMultiWriter(
|
||||
log.NewTopicWriter(
|
||||
log.NewConsoleWriter(a.log.writer, loglevel, true),
|
||||
cfg.Log.Topics,
|
||||
),
|
||||
buffer,
|
||||
),
|
||||
[]log.LevelRewriteRule{
|
||||
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
|
||||
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
|
||||
{
|
||||
Level: log.Ldebug,
|
||||
Component: "HTTP",
|
||||
Match: map[string]string{
|
||||
"client": "^(::1|127.0.0.1)$",
|
||||
"method": "^(PUT|POST|DELETE)$",
|
||||
"status_text": "^Unauthorized$",
|
||||
"user_agent": "^Lavf/",
|
||||
if cfg.Log.Target.Output == "stdout" {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stdout,
|
||||
true,
|
||||
)
|
||||
} else if cfg.Log.Target.Output == "file" {
|
||||
writer = log.NewFileWriter(
|
||||
cfg.Log.Target.Path,
|
||||
log.NewJSONFormatter(),
|
||||
)
|
||||
} else {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stderr,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
logger = logger.WithOutput(
|
||||
log.NewLevelWriter(
|
||||
log.NewLevelRewriter(
|
||||
log.NewMultiWriter(
|
||||
log.NewTopicWriter(
|
||||
writer,
|
||||
cfg.Log.Topics,
|
||||
),
|
||||
buffer,
|
||||
),
|
||||
[]log.LevelRewriteRule{
|
||||
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
|
||||
// ts=2022-04-28T07:24:27Z level=WARN component="HTTP" address=":8080" client="::1" latency_ms=0 method="PUT" path="/memfs/00a10a69-416a-4cd5-9d4f-6d88ed3dd7f5_0917.ts" proto="HTTP/1.1" size_bytes=65 status=401 status_text="Unauthorized" user_agent="Lavf/58.76.100"
|
||||
{
|
||||
Level: log.Ldebug,
|
||||
Component: "HTTP",
|
||||
Match: map[string]string{
|
||||
"client": "^(::1|127.0.0.1)$",
|
||||
"method": "^(PUT|POST|DELETE)$",
|
||||
"status_text": "^Unauthorized$",
|
||||
"user_agent": "^Lavf/",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
))
|
||||
),
|
||||
loglevel,
|
||||
),
|
||||
)
|
||||
|
||||
logfields := log.Fields{
|
||||
"application": app.Name,
|
||||
@@ -227,6 +258,8 @@ func (a *api) Reload() error {
|
||||
|
||||
logger.Info().WithFields(logfields).Log("")
|
||||
|
||||
logger.Info().WithField("path", a.config.path).Log("Read config file")
|
||||
|
||||
configlogger := logger.WithComponent("Config")
|
||||
cfg.Messages(func(level string, v configvars.Variable, message string) {
|
||||
configlogger = configlogger.WithFields(log.Fields{
|
||||
@@ -443,8 +476,8 @@ func (a *api) start() error {
|
||||
a.replacer = replace.New()
|
||||
|
||||
{
|
||||
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base())
|
||||
a.replacer.RegisterTemplate("memfs", a.memfs.Base())
|
||||
a.replacer.RegisterTemplate("diskfs", a.diskfs.Base(), nil)
|
||||
a.replacer.RegisterTemplate("memfs", a.memfs.Base(), nil)
|
||||
|
||||
host, port, _ := gonet.SplitHostPort(cfg.RTMP.Address)
|
||||
if len(host) == 0 {
|
||||
@@ -461,21 +494,23 @@ func (a *api) start() error {
 			template += "?token=" + cfg.RTMP.Token
 		}

-		a.replacer.RegisterTemplate("rtmp", template)
+		a.replacer.RegisterTemplate("rtmp", template, nil)

 		host, port, _ = gonet.SplitHostPort(cfg.SRT.Address)
 		if len(host) == 0 {
 			host = "localhost"
 		}

-		template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&streamid=#!:m={mode},r={name}"
+		template = "srt://" + host + ":" + port + "?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}"
 		if len(cfg.SRT.Token) != 0 {
-			template += ",token=" + cfg.SRT.Token
+			template += ",token:" + cfg.SRT.Token
 		}
 		if len(cfg.SRT.Passphrase) != 0 {
 			template += "&passphrase=" + cfg.SRT.Passphrase
 		}
-		a.replacer.RegisterTemplate("srt", template)
+		a.replacer.RegisterTemplate("srt", template, map[string]string{
+			"latency": "20000", // 20 milliseconds, FFmpeg requires microseconds
+		})
 	}

 	store := store.NewJSONStore(store.JSONConfig{
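The new srt template leaves {latency} to the replacer: a default of 20000 microseconds (20 ms) is registered alongside the template and is used whenever a process does not pass its own latency parameter. A stand-alone sketch of that merge of defaults with per-placeholder parameters (illustration only, not the core/v16/restream/replace implementation):

package main

import (
	"fmt"
	"strings"
)

// fill replaces {key} occurrences in tmpl, taking values from params first
// and falling back to defaults.
func fill(tmpl string, params, defaults map[string]string) string {
	merged := map[string]string{}
	for k, v := range defaults {
		merged[k] = v
	}
	for k, v := range params {
		merged[k] = v
	}
	for k, v := range merged {
		tmpl = strings.ReplaceAll(tmpl, "{"+k+"}", v)
	}
	return tmpl
}

func main() {
	tmpl := "srt://localhost:6000?mode=caller&transtype=live&latency={latency}&streamid={name},mode:{mode}"
	defaults := map[string]string{"latency": "20000"} // microseconds, i.e. 20 ms

	// No latency given by the process config: the registered default is used.
	fmt.Println(fill(tmpl, map[string]string{"name": "mystream", "mode": "request"}, defaults))

	// An explicit latency overrides the default.
	fmt.Println(fill(tmpl, map[string]string{"name": "mystream", "mode": "request", "latency": "120000"}, defaults))
}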
@@ -655,26 +690,28 @@ func (a *api) start() error {
|
||||
if cfg.TLS.Enable {
|
||||
if cfg.TLS.Auto {
|
||||
if len(cfg.Host.Name) == 0 {
|
||||
return fmt.Errorf("at least one host must be provided in host.name or RS_HOST_NAME")
|
||||
return fmt.Errorf("at least one host must be provided in host.name or CORE_HOST_NAME")
|
||||
}
|
||||
|
||||
certmagic.Default.Storage = &certmagic.FileStorage{
|
||||
Path: cfg.DB.Dir + "/cert",
|
||||
}
|
||||
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
|
||||
certmagic.Default.Logger = zap.NewNop()
|
||||
|
||||
certmagic.DefaultACME.Agreed = true
|
||||
certmagic.DefaultACME.Email = cfg.TLS.Email
|
||||
certmagic.DefaultACME.CA = certmagic.LetsEncryptProductionCA
|
||||
certmagic.DefaultACME.DisableHTTPChallenge = false
|
||||
certmagic.DefaultACME.DisableTLSALPNChallenge = true
|
||||
certmagic.DefaultACME.Logger = nil
|
||||
|
||||
certmagic.Default.Storage = &certmagic.FileStorage{
|
||||
Path: cfg.DB.Dir + "/cert",
|
||||
}
|
||||
certmagic.Default.DefaultServerName = cfg.Host.Name[0]
|
||||
certmagic.Default.Logger = nil
|
||||
certmagic.DefaultACME.Logger = zap.NewNop()
|
||||
|
||||
magic := certmagic.NewDefault()
|
||||
acme := certmagic.NewACMEIssuer(magic, certmagic.DefaultACME)
|
||||
acme.Logger = zap.NewNop()
|
||||
|
||||
magic.Issuers = []certmagic.Issuer{acme}
|
||||
magic.Logger = zap.NewNop()
|
||||
|
||||
autocertManager = magic
|
||||
|
||||
@@ -713,6 +750,19 @@ func (a *api) start() error {
|
||||
if err != nil {
|
||||
logger.Error().WithField("error", err).Log("Failed to acquire certificate")
|
||||
certerror = true
|
||||
/*
|
||||
problems, err := letsdebug.Check(host, letsdebug.HTTP01)
|
||||
if err != nil {
|
||||
logger.Error().WithField("error", err).Log("Failed to debug certificate acquisition")
|
||||
}
|
||||
|
||||
for _, p := range problems {
|
||||
logger.Error().WithFields(log.Fields{
|
||||
"name": p.Name,
|
||||
"detail": p.Detail,
|
||||
}).Log(p.Explanation)
|
||||
}
|
||||
*/
|
||||
break
|
||||
}
|
||||
|
||||
@@ -1101,6 +1151,12 @@ func (a *api) start() error {
 		}(ctx)
 	}

+	if cfg.Debug.MemoryLimit > 0 {
+		debug.SetMemoryLimit(cfg.Debug.MemoryLimit * 1024 * 1024)
+	} else {
+		debug.SetMemoryLimit(math.MaxInt64)
+	}
+
 	// Start the restream processes
 	restream.Start()

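The added block wires debug.memory_limit_mbytes (CORE_DEBUG_MEMORY_LIMIT_MBYTES) to Go 1.19's soft memory limit. A small, runnable sketch of the same gate; the helper name is made up:

package main

import (
	"fmt"
	"math"
	"runtime/debug"
)

func applyMemoryLimit(limitMB int64) {
	if limitMB > 0 {
		// SetMemoryLimit expects bytes; the config value is in megabytes.
		debug.SetMemoryLimit(limitMB * 1024 * 1024)
	} else {
		// math.MaxInt64 is the documented "effectively no limit" value.
		debug.SetMemoryLimit(math.MaxInt64)
	}
}

func main() {
	applyMemoryLimit(512) // e.g. CORE_DEBUG_MEMORY_LIMIT_MBYTES=512

	// A negative input queries the current limit without changing it.
	fmt.Println("current limit (bytes):", debug.SetMemoryLimit(-1))
}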
@@ -1270,4 +1326,6 @@ func (a *api) Destroy() {
|
||||
a.memfs.DeleteAll()
|
||||
a.memfs = nil
|
||||
}
|
||||
|
||||
a.log.logger.core.Close()
|
||||
}
|
||||
|
@@ -17,12 +17,19 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := log.New("Migration").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithFields(log.Fields{
|
||||
logger := log.New("Migration").WithOutput(
|
||||
log.NewLevelWriter(
|
||||
log.NewConsoleWriter(os.Stderr, true),
|
||||
log.Linfo,
|
||||
),
|
||||
).WithFields(log.Fields{
|
||||
"from": "ffmpeg4",
|
||||
"to": "ffmpeg5",
|
||||
})
|
||||
|
||||
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
|
||||
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
|
||||
|
||||
configstore, err := cfgstore.NewJSON(configfile, nil)
|
||||
if err != nil {
|
||||
logger.Error().WithError(err).Log("Loading configuration failed")
|
||||
os.Exit(1)
|
||||
@@ -63,6 +70,27 @@ func doMigration(logger log.Logger, configstore cfgstore.Store) error {
|
||||
return fmt.Errorf("the configuration contains errors: %v", messages)
|
||||
}
|
||||
|
||||
var writer log.Writer
|
||||
|
||||
if cfg.Log.Target.Output == "stdout" {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stdout,
|
||||
true,
|
||||
)
|
||||
} else if cfg.Log.Target.Output == "file" {
|
||||
writer = log.NewFileWriter(
|
||||
cfg.Log.Target.Path,
|
||||
log.NewJSONFormatter(),
|
||||
)
|
||||
} else {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stderr,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
logger = logger.WithOutput(writer)
|
||||
|
||||
ff, err := ffmpeg.New(ffmpeg.Config{
|
||||
Binary: cfg.FFmpeg.Binary,
|
||||
})
|
||||
|
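The same stdout/file/stderr selection on cfg.Log.Target.Output appears in the API startup above and again in the v1 import tool below. A sketch of how it could be factored into a helper, assuming the core/v16/log constructors exactly as they appear in this diff (NewConsoleWriter(io.Writer, bool), NewFileWriter(path, Formatter)); this is illustrative, not existing code:

package main

import (
	"os"

	"github.com/datarhei/core/v16/log"
)

func newTargetWriter(output, path string) log.Writer {
	switch output {
	case "stdout":
		return log.NewConsoleWriter(os.Stdout, true)
	case "file":
		return log.NewFileWriter(path, log.NewJSONFormatter())
	default: // "stderr" and anything unknown
		return log.NewConsoleWriter(os.Stderr, true)
	}
}

func main() {
	logger := log.New("Example").WithOutput(
		log.NewLevelWriter(newTargetWriter("stderr", ""), log.Linfo),
	)
	logger.Info().Log("writer selected from log.target.output")
}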
@@ -13,9 +13,16 @@ import (
|
||||
)
|
||||
|
||||
func main() {
|
||||
logger := log.New("Import").WithOutput(log.NewConsoleWriter(os.Stderr, log.Linfo, true)).WithField("version", "v1")
|
||||
logger := log.New("Import").WithOutput(
|
||||
log.NewLevelWriter(
|
||||
log.NewConsoleWriter(os.Stderr, true),
|
||||
log.Linfo,
|
||||
),
|
||||
).WithField("version", "v1")
|
||||
|
||||
configstore, err := cfgstore.NewJSON(os.Getenv("CORE_CONFIGFILE"), nil)
|
||||
configfile := cfgstore.Location(os.Getenv("CORE_CONFIGFILE"))
|
||||
|
||||
configstore, err := cfgstore.NewJSON(configfile, nil)
|
||||
if err != nil {
|
||||
logger.Error().WithError(err).Log("Loading configuration failed")
|
||||
os.Exit(1)
|
||||
@@ -31,8 +38,6 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
|
||||
logger = log.New("")
|
||||
}
|
||||
|
||||
logger.Info().Log("Database import")
|
||||
|
||||
cfg := configstore.Get()
|
||||
|
||||
// Merging the persisted config with the environment variables
|
||||
@@ -58,6 +63,27 @@ func doImport(logger log.Logger, configstore cfgstore.Store) error {
|
||||
return fmt.Errorf("the configuration contains errors: %v", messages)
|
||||
}
|
||||
|
||||
var writer log.Writer
|
||||
|
||||
if cfg.Log.Target.Output == "stdout" {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stdout,
|
||||
true,
|
||||
)
|
||||
} else if cfg.Log.Target.Output == "file" {
|
||||
writer = log.NewFileWriter(
|
||||
cfg.Log.Target.Path,
|
||||
log.NewJSONFormatter(),
|
||||
)
|
||||
} else {
|
||||
writer = log.NewConsoleWriter(
|
||||
os.Stderr,
|
||||
true,
|
||||
)
|
||||
}
|
||||
|
||||
logger = logger.WithOutput(writer)
|
||||
|
||||
logger.Info().Log("Checking for database ...")
|
||||
|
||||
// Check if there's a v1.json from the old Restreamer
|
||||
|
@@ -141,6 +141,8 @@ func (d *Config) init() {
|
||||
d.vars.Register(value.NewString(&d.Log.Level, "info"), "log.level", "CORE_LOG_LEVEL", nil, "Loglevel: silent, error, warn, info, debug", false, false)
|
||||
d.vars.Register(value.NewStringList(&d.Log.Topics, []string{}, ","), "log.topics", "CORE_LOG_TOPICS", nil, "Show only selected log topics", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Log.MaxLines, 1000), "log.max_lines", "CORE_LOG_MAXLINES", nil, "Number of latest log lines to keep in memory", false, false)
|
||||
d.vars.Register(value.NewString(&d.Log.Target.Output, "stderr"), "log.target.output", "CORE_LOG_TARGET_OUTPUT", nil, "Where to write the logs to: stdout, stderr, file", false, false)
|
||||
d.vars.Register(value.NewString(&d.Log.Target.Path, ""), "log.target.path", "CORE_LOG_TARGET_PATH", nil, "Path to log file if output is 'file'", false, false)
|
||||
|
||||
// DB
|
||||
d.vars.Register(value.NewMustDir(&d.DB.Dir, "./config"), "db.dir", "CORE_DB_DIR", nil, "Directory for holding the operational data", false, false)
|
||||
@@ -232,6 +234,7 @@ func (d *Config) init() {
|
||||
// Debug
|
||||
d.vars.Register(value.NewBool(&d.Debug.Profiling, false), "debug.profiling", "CORE_DEBUG_PROFILING", nil, "Enable profiling endpoint on /profiling", false, false)
|
||||
d.vars.Register(value.NewInt(&d.Debug.ForceGC, 0), "debug.force_gc", "CORE_DEBUG_FORCEGC", nil, "Number of seconds between forcing GC to return memory to the OS", false, false)
|
||||
d.vars.Register(value.NewInt64(&d.Debug.MemoryLimit, 0), "debug.memory_limit_mbytes", "CORE_DEBUG_MEMORY_LIMIT_MBYTES", nil, "Impose a soft memory limit for the core, in megabytes", false, false)
|
||||
|
||||
// Metrics
|
||||
d.vars.Register(value.NewBool(&d.Metrics.Enable, false), "metrics.enable", "CORE_METRICS_ENABLE", nil, "Enable collecting historic metrics data", false, false)
|
||||
|
@@ -22,6 +22,10 @@ type Data struct {
|
||||
Level string `json:"level" enums:"debug,info,warn,error,silent" jsonschema:"enum=debug,enum=info,enum=warn,enum=error,enum=silent"`
|
||||
Topics []string `json:"topics"`
|
||||
MaxLines int `json:"max_lines"`
|
||||
Target struct {
|
||||
Output string `json:"name"`
|
||||
Path string `json:"path"`
|
||||
} `json:"target"` // discard, stderr, stdout, file:/path/to/file.log
|
||||
} `json:"log"`
|
||||
DB struct {
|
||||
Dir string `json:"dir"`
|
||||
@@ -135,8 +139,9 @@ type Data struct {
|
||||
MaxPort int `json:"max_port"`
|
||||
} `json:"playout"`
|
||||
Debug struct {
|
||||
Profiling bool `json:"profiling"`
|
||||
ForceGC int `json:"force_gc"`
|
||||
Profiling bool `json:"profiling"`
|
||||
ForceGC int `json:"force_gc"`
|
||||
MemoryLimit int64 `json:"memory_limit_mbytes"`
|
||||
} `json:"debug"`
|
||||
Metrics struct {
|
||||
Enable bool `json:"enable"`
|
||||
@@ -181,7 +186,6 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.Address = d.Address
|
||||
data.CheckForUpdates = d.CheckForUpdates
|
||||
|
||||
data.Log = d.Log
|
||||
data.DB = d.DB
|
||||
data.Host = d.Host
|
||||
data.API = d.API
|
||||
@@ -189,14 +193,11 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.SRT = d.SRT
|
||||
data.FFmpeg = d.FFmpeg
|
||||
data.Playout = d.Playout
|
||||
data.Debug = d.Debug
|
||||
data.Metrics = d.Metrics
|
||||
data.Sessions = d.Sessions
|
||||
data.Service = d.Service
|
||||
data.Router = d.Router
|
||||
|
||||
data.Log.Topics = copy.Slice(d.Log.Topics)
|
||||
|
||||
data.Host.Name = copy.Slice(d.Host.Name)
|
||||
|
||||
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
|
||||
@@ -228,6 +229,16 @@ func MergeV2toV3(data *Data, d *v2.Data) (*Data, error) {
|
||||
data.Storage.Memory = d.Storage.Memory
|
||||
|
||||
// Actual changes
|
||||
data.Log.Level = d.Log.Level
|
||||
data.Log.Topics = copy.Slice(d.Log.Topics)
|
||||
data.Log.MaxLines = d.Log.MaxLines
|
||||
data.Log.Target.Output = "stderr"
|
||||
data.Log.Target.Path = ""
|
||||
|
||||
data.Debug.Profiling = d.Debug.Profiling
|
||||
data.Debug.ForceGC = d.Debug.ForceGC
|
||||
data.Debug.MemoryLimit = 0
|
||||
|
||||
data.TLS.Enable = d.TLS.Enable
|
||||
data.TLS.Address = d.TLS.Address
|
||||
data.TLS.Auto = d.TLS.Auto
|
||||
@@ -259,7 +270,6 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.Address = d.Address
|
||||
data.CheckForUpdates = d.CheckForUpdates
|
||||
|
||||
data.Log = d.Log
|
||||
data.DB = d.DB
|
||||
data.Host = d.Host
|
||||
data.API = d.API
|
||||
@@ -267,14 +277,11 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.SRT = d.SRT
|
||||
data.FFmpeg = d.FFmpeg
|
||||
data.Playout = d.Playout
|
||||
data.Debug = d.Debug
|
||||
data.Metrics = d.Metrics
|
||||
data.Sessions = d.Sessions
|
||||
data.Service = d.Service
|
||||
data.Router = d.Router
|
||||
|
||||
data.Log.Topics = copy.Slice(d.Log.Topics)
|
||||
|
||||
data.Host.Name = copy.Slice(d.Host.Name)
|
||||
|
||||
data.API.Access.HTTP.Allow = copy.Slice(d.API.Access.HTTP.Allow)
|
||||
@@ -299,6 +306,13 @@ func DowngradeV3toV2(d *Data) (*v2.Data, error) {
|
||||
data.Router.Routes = copy.StringMap(d.Router.Routes)
|
||||
|
||||
// Actual changes
|
||||
data.Log.Level = d.Log.Level
|
||||
data.Log.Topics = copy.Slice(d.Log.Topics)
|
||||
data.Log.MaxLines = d.Log.MaxLines
|
||||
|
||||
data.Debug.Profiling = d.Debug.Profiling
|
||||
data.Debug.ForceGC = d.Debug.ForceGC
|
||||
|
||||
data.TLS.Enable = d.TLS.Enable
|
||||
data.TLS.Address = d.TLS.Address
|
||||
data.TLS.Auto = d.TLS.Auto
|
||||
|
@@ -118,6 +118,10 @@ func (c *jsonStore) load(cfg *config.Config) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(jsondata) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
data, err := migrate(jsondata)
|
||||
if err != nil {
|
||||
return err
|
||||
|
config/store/location.go (new file, 53 lines)
@@ -0,0 +1,53 @@
package store

import (
	"os"
	"path"
)

// Location returns the path to the config file. If no path is provided,
// different standard location will be probed:
// - os.UserConfigDir() + /datarhei-core/config.js
// - os.UserHomeDir() + /.config/datarhei-core/config.js
// - ./config/config.js
// If the config doesn't exist in none of these locations, it will be assumed
// at ./config/config.js
func Location(filepath string) string {
	configfile := filepath
	if len(configfile) != 0 {
		return configfile
	}

	locations := []string{}

	if dir, err := os.UserConfigDir(); err == nil {
		locations = append(locations, dir+"/datarhei-core/config.js")
	}

	if dir, err := os.UserHomeDir(); err == nil {
		locations = append(locations, dir+"/.config/datarhei-core/config.js")
	}

	locations = append(locations, "./config/config.js")

	for _, path := range locations {
		info, err := os.Stat(path)
		if err != nil {
			continue
		}

		if info.IsDir() {
			continue
		}

		configfile = path
	}

	if len(configfile) == 0 {
		configfile = "./config/config.js"
	}

	os.MkdirAll(path.Dir(configfile), 0740)

	return configfile
}
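Usage sketch for the new Location helper, as the binaries in this changeset use it: resolve the config path once (an empty CORE_CONFIGFILE triggers the probing described in the comment), then hand it to the JSON config store. Assumes the config/store package as shown in this diff:

package main

import (
	"fmt"
	"os"

	"github.com/datarhei/core/v16/config/store"
)

func main() {
	// Empty CORE_CONFIGFILE lets Location probe the user config dir, the
	// home directory and ./config/config.js before falling back to the latter.
	configfile := store.Location(os.Getenv("CORE_CONFIGFILE"))
	fmt.Println("using config file:", configfile)

	s, err := store.NewJSON(configfile, nil)
	if err != nil {
		fmt.Fprintln(os.Stderr, "loading configuration failed:", err)
		os.Exit(1)
	}
	_ = s
}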
@@ -4,6 +4,7 @@ import (
|
||||
"encoding/base64"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
)
|
||||
|
||||
@@ -16,6 +17,28 @@ type Auth0Tenant struct {
|
||||
Users []string `json:"users"`
|
||||
}
|
||||
|
||||
func (a *Auth0Tenant) String() string {
|
||||
u := url.URL{
|
||||
Scheme: "auth0",
|
||||
Host: a.Domain,
|
||||
}
|
||||
|
||||
if len(a.ClientID) != 0 {
|
||||
u.User = url.User(a.ClientID)
|
||||
}
|
||||
|
||||
q := url.Values{}
|
||||
q.Set("aud", a.Audience)
|
||||
|
||||
for _, user := range a.Users {
|
||||
q.Add("user", user)
|
||||
}
|
||||
|
||||
u.RawQuery = q.Encode()
|
||||
|
||||
return u.String()
|
||||
}
|
||||
|
||||
type TenantList struct {
|
||||
p *[]Auth0Tenant
|
||||
separator string
|
||||
@@ -32,18 +55,34 @@ func NewTenantList(p *[]Auth0Tenant, val []Auth0Tenant, separator string) *Tenan
|
||||
return v
|
||||
}
|
||||
|
||||
// Set allows to set a tenant list in two formats:
|
||||
// - a separator separated list of bas64 encoded Auth0Tenant JSON objects
|
||||
// - a separator separated list of Auth0Tenant in URL representation: auth0://[clientid]@[domain]?aud=[audience]&user=...&user=...
|
||||
func (s *TenantList) Set(val string) error {
|
||||
list := []Auth0Tenant{}
|
||||
|
||||
for i, elm := range strings.Split(val, s.separator) {
|
||||
data, err := base64.StdEncoding.DecodeString(elm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
|
||||
}
|
||||
|
||||
t := Auth0Tenant{}
|
||||
if err := json.Unmarshal(data, &t); err != nil {
|
||||
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
|
||||
|
||||
if strings.HasPrefix(elm, "auth0://") {
|
||||
data, err := url.Parse(elm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid url encoding of tenant %d: %w", i, err)
|
||||
}
|
||||
|
||||
t.Domain = data.Host
|
||||
t.ClientID = data.User.Username()
|
||||
t.Audience = data.Query().Get("aud")
|
||||
t.Users = data.Query()["user"]
|
||||
} else {
|
||||
data, err := base64.StdEncoding.DecodeString(elm)
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid base64 encoding of tenant %d: %w", i, err)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &t); err != nil {
|
||||
return fmt.Errorf("invalid JSON in tenant %d: %w", i, err)
|
||||
}
|
||||
}
|
||||
|
||||
list = append(list, t)
|
||||
@@ -62,10 +101,10 @@ func (s *TenantList) String() string {
|
||||
list := []string{}
|
||||
|
||||
for _, t := range *s.p {
|
||||
list = append(list, fmt.Sprintf("%s (%d users)", t.Domain, len(t.Users)))
|
||||
list = append(list, t.String())
|
||||
}
|
||||
|
||||
return strings.Join(list, ",")
|
||||
return strings.Join(list, s.separator)
|
||||
}
|
||||
|
||||
func (s *TenantList) Validate() error {
|
||||
|
config/value/auth0_test.go (new file, 43 lines)
@@ -0,0 +1,43 @@
package value

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestAuth0Value(t *testing.T) {
	tenants := []Auth0Tenant{}

	v := NewTenantList(&tenants, nil, " ")
	require.Equal(t, "(empty)", v.String())

	v.Set("auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3")
	require.Equal(t, []Auth0Tenant{
		{
			Domain:   "domain",
			ClientID: "clientid",
			Audience: "audience",
			Users:    []string{"user1", "user2"},
		},
		{
			Domain:   "domain2",
			Audience: "audience2",
			Users:    []string{"user3"},
		},
	}, tenants)
	require.Equal(t, "auth0://clientid@domain?aud=audience&user=user1&user=user2 auth0://domain2?aud=audience2&user=user3", v.String())
	require.NoError(t, v.Validate())

	v.Set("eyJkb21haW4iOiJkYXRhcmhlaS5ldS5hdXRoMC5jb20iLCJhdWRpZW5jZSI6Imh0dHBzOi8vZGF0YXJoZWkuY29tL2NvcmUiLCJ1c2VycyI6WyJhdXRoMHx4eHgiXX0=")
	require.Equal(t, []Auth0Tenant{
		{
			Domain:   "datarhei.eu.auth0.com",
			ClientID: "",
			Audience: "https://datarhei.com/core",
			Users:    []string{"auth0|xxx"},
		},
	}, tenants)
	require.Equal(t, "auth0://datarhei.eu.auth0.com?aud=https%3A%2F%2Fdatarhei.com%2Fcore&user=auth0%7Cxxx", v.String())
	require.NoError(t, v.Validate())
}
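The test feeds Set both accepted encodings. The legacy form is simply a base64-encoded JSON object; one way to produce such a value is sketched below (stand-alone, the struct is re-declared locally and only the JSON tags visible in this diff and in the decoded test fixture are used):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

type Auth0Tenant struct {
	Domain   string   `json:"domain"`
	Audience string   `json:"audience"`
	Users    []string `json:"users"`
}

func main() {
	t := Auth0Tenant{
		Domain:   "datarhei.eu.auth0.com",
		Audience: "https://datarhei.com/core",
		Users:    []string{"auth0|xxx"},
	}

	data, _ := json.Marshal(t)
	fmt.Println(base64.StdEncoding.EncodeToString(data))

	// The equivalent, more readable URL form accepted since this change:
	fmt.Println("auth0://datarhei.eu.auth0.com?aud=https%3A%2F%2Fdatarhei.com%2Fcore&user=auth0%7Cxxx")
}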
@@ -34,6 +34,10 @@ func (u *MustDir) Validate() error {
|
||||
return fmt.Errorf("path name must not be empty")
|
||||
}
|
||||
|
||||
if err := os.MkdirAll(val, 0750); err != nil {
|
||||
return fmt.Errorf("%s can't be created (%w)", val, err)
|
||||
}
|
||||
|
||||
finfo, err := os.Stat(val)
|
||||
if err != nil {
|
||||
return fmt.Errorf("%s does not exist", val)
|
||||
|
@@ -3,7 +3,7 @@ package value
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestIntValue(t *testing.T) {
|
||||
@@ -11,19 +11,19 @@ func TestIntValue(t *testing.T) {
|
||||
|
||||
ivar := NewInt(&i, 11)
|
||||
|
||||
assert.Equal(t, "11", ivar.String())
|
||||
assert.Equal(t, nil, ivar.Validate())
|
||||
assert.Equal(t, false, ivar.IsEmpty())
|
||||
require.Equal(t, "11", ivar.String())
|
||||
require.Equal(t, nil, ivar.Validate())
|
||||
require.Equal(t, false, ivar.IsEmpty())
|
||||
|
||||
i = 42
|
||||
|
||||
assert.Equal(t, "42", ivar.String())
|
||||
assert.Equal(t, nil, ivar.Validate())
|
||||
assert.Equal(t, false, ivar.IsEmpty())
|
||||
require.Equal(t, "42", ivar.String())
|
||||
require.Equal(t, nil, ivar.Validate())
|
||||
require.Equal(t, false, ivar.IsEmpty())
|
||||
|
||||
ivar.Set("77")
|
||||
|
||||
assert.Equal(t, int(77), i)
|
||||
require.Equal(t, int(77), i)
|
||||
}
|
||||
|
||||
type testdata struct {
|
||||
@@ -37,22 +37,22 @@ func TestCopyStruct(t *testing.T) {
|
||||
NewInt(&data1.value1, 1)
|
||||
NewInt(&data1.value2, 2)
|
||||
|
||||
assert.Equal(t, int(1), data1.value1)
|
||||
assert.Equal(t, int(2), data1.value2)
|
||||
require.Equal(t, int(1), data1.value1)
|
||||
require.Equal(t, int(2), data1.value2)
|
||||
|
||||
data2 := testdata{}
|
||||
|
||||
val21 := NewInt(&data2.value1, 3)
|
||||
val22 := NewInt(&data2.value2, 4)
|
||||
|
||||
assert.Equal(t, int(3), data2.value1)
|
||||
assert.Equal(t, int(4), data2.value2)
|
||||
require.Equal(t, int(3), data2.value1)
|
||||
require.Equal(t, int(4), data2.value2)
|
||||
|
||||
data2 = data1
|
||||
|
||||
assert.Equal(t, int(1), data2.value1)
|
||||
assert.Equal(t, int(2), data2.value2)
|
||||
require.Equal(t, int(1), data2.value1)
|
||||
require.Equal(t, int(2), data2.value2)
|
||||
|
||||
assert.Equal(t, "1", val21.String())
|
||||
assert.Equal(t, "2", val22.String())
|
||||
require.Equal(t, "1", val21.String())
|
||||
require.Equal(t, "2", val22.String())
|
||||
}
|
||||
|
go.mod (2 changed lines)
@@ -26,6 +26,7 @@ require (
 	github.com/swaggo/swag v1.8.7
 	github.com/vektah/gqlparser/v2 v2.5.1
 	github.com/xeipuuv/gojsonschema v1.2.0
+	go.uber.org/zap v1.23.0
 	golang.org/x/mod v0.6.0
 )

@@ -79,7 +80,6 @@ require (
 	github.com/yusufpapurcu/wmi v1.2.2 // indirect
 	go.uber.org/atomic v1.10.0 // indirect
 	go.uber.org/multierr v1.8.0 // indirect
-	go.uber.org/zap v1.23.0 // indirect
 	golang.org/x/crypto v0.1.0 // indirect
 	golang.org/x/net v0.1.0 // indirect
 	golang.org/x/sys v0.1.0 // indirect
@@ -12,7 +12,7 @@ import (
|
||||
|
||||
func (r *queryResolver) Log(ctx context.Context) ([]string, error) {
|
||||
if r.LogBuffer == nil {
|
||||
r.LogBuffer = log.NewBufferWriter(log.Lsilent, 1)
|
||||
r.LogBuffer = log.NewBufferWriter(1)
|
||||
}
|
||||
|
||||
events := r.LogBuffer.Events()
|
||||
|
@@ -22,7 +22,7 @@ func NewLog(buffer log.BufferWriter) *LogHandler {
|
||||
}
|
||||
|
||||
if l.buffer == nil {
|
||||
l.buffer = log.NewBufferWriter(log.Lsilent, 1)
|
||||
l.buffer = log.NewBufferWriter(1)
|
||||
}
|
||||
|
||||
return l
|
||||
|
@@ -51,7 +51,7 @@ func (h *RestreamHandler) Add(c echo.Context) error {
|
||||
return api.Err(http.StatusBadRequest, "Unsupported process type", "Supported process types are: ffmpeg")
|
||||
}
|
||||
|
||||
if len(process.Input) == 0 && len(process.Output) == 0 {
|
||||
if len(process.Input) == 0 || len(process.Output) == 0 {
|
||||
return api.Err(http.StatusBadRequest, "At least one input and one output need to be defined")
|
||||
}
|
||||
|
||||
@@ -189,6 +189,14 @@ func (h *RestreamHandler) Update(c echo.Context) error {
|
||||
Autostart: true,
|
||||
}
|
||||
|
||||
current, err := h.restream.GetProcess(id)
|
||||
if err != nil {
|
||||
return api.Err(http.StatusNotFound, "Process not found", "%s", id)
|
||||
}
|
||||
|
||||
// Prefill the config with the current values
|
||||
process.Unmarshal(current.Config)
|
||||
|
||||
if err := util.ShouldBindJSON(c, &process); err != nil {
|
||||
return api.Err(http.StatusBadRequest, "Invalid JSON", "%s", err)
|
||||
}
|
||||
|
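The Update handler now loads the stored config and unmarshals it into the request struct before binding the body, so a PUT only needs to carry the fields that change. A stand-alone illustration of why prefill-then-unmarshal yields partial updates (the config struct here is simplified, not the real api.ProcessConfig):

package main

import (
	"encoding/json"
	"fmt"
)

type ProcessConfig struct {
	ID        string   `json:"id"`
	Reference string   `json:"reference"`
	Autostart bool     `json:"autostart"`
	Input     []string `json:"input"`
	Output    []string `json:"output"`
}

func main() {
	// Prefill with the currently stored config ...
	process := ProcessConfig{
		ID:        "abc",
		Reference: "live",
		Autostart: true,
		Input:     []string{"rtmp://..."},
		Output:    []string{"srt://..."},
	}

	// ... then apply the request body; only "reference" is overwritten.
	body := []byte(`{"reference":"vod"}`)
	if err := json.Unmarshal(body, &process); err != nil {
		panic(err)
	}

	fmt.Printf("%+v\n", process) // input/output/autostart keep their old values
}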
log/log.go (50 changed lines)
@@ -14,28 +14,29 @@ import (
 type Level uint

 const (
-	Lsilent Level = 0
-	Lerror  Level = 1
-	Lwarn   Level = 2
-	Linfo   Level = 3
-	Ldebug  Level = 4
+	Lsilent Level = 0b0000
+	Lerror  Level = 0b0001
+	Lwarn   Level = 0b0010
+	Linfo   Level = 0b0100
+	Ldebug  Level = 0b1000
 )

 // String returns a string representing the log level.
 func (level Level) String() string {
-	names := []string{
-		"SILENT",
-		"ERROR",
-		"WARN",
-		"INFO",
-		"DEBUG",
-	}
-
-	if level > Ldebug {
+	switch level {
+	case Lsilent:
+		return "SILENT"
+	case Lerror:
+		return "ERROR"
+	case Lwarn:
+		return "WARN"
+	case Linfo:
+		return "INFO"
+	case Ldebug:
+		return "DEBUG"
+	default:
 		return `¯\_(ツ)_/¯`
 	}
-
-	return names[level]
 }

 func (level *Level) MarshalJSON() ([]byte, error) {
@@ -97,6 +98,9 @@ type Logger interface {
|
||||
// Write implements the io.Writer interface such that it can be used in e.g. the
|
||||
// the log/Logger facility. Messages will be printed with debug level.
|
||||
Write(p []byte) (int, error)
|
||||
|
||||
// Close closes the underlying writer.
|
||||
Close()
|
||||
}
|
||||
|
||||
// logger is an implementation of the Logger interface.
|
||||
@@ -184,6 +188,10 @@ func (l *logger) Write(p []byte) (int, error) {
|
||||
return newEvent(l).Write(p)
|
||||
}
|
||||
|
||||
func (l *logger) Close() {
|
||||
l.output.Close()
|
||||
}
|
||||
|
||||
type Event struct {
|
||||
logger *logger
|
||||
|
||||
@@ -352,12 +360,6 @@ func (l *Event) Write(p []byte) (int, error) {
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
type Eventx struct {
|
||||
Time time.Time `json:"ts"`
|
||||
Level Level `json:"level"`
|
||||
Component string `json:"component"`
|
||||
Reference string `json:"ref"`
|
||||
Message string `json:"message"`
|
||||
Caller string `json:"caller"`
|
||||
Detail interface{} `json:"detail"`
|
||||
func (l *Event) Close() {
|
||||
l.logger.Close()
|
||||
}
|
||||
|
@@ -5,25 +5,25 @@ import (
|
||||
"bytes"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestLoglevelNames(t *testing.T) {
|
||||
assert.Equal(t, "DEBUG", Ldebug.String())
|
||||
assert.Equal(t, "ERROR", Lerror.String())
|
||||
assert.Equal(t, "WARN", Lwarn.String())
|
||||
assert.Equal(t, "INFO", Linfo.String())
|
||||
assert.Equal(t, `SILENT`, Lsilent.String())
|
||||
require.Equal(t, "DEBUG", Ldebug.String())
|
||||
require.Equal(t, "ERROR", Lerror.String())
|
||||
require.Equal(t, "WARN", Lwarn.String())
|
||||
require.Equal(t, "INFO", Linfo.String())
|
||||
require.Equal(t, `SILENT`, Lsilent.String())
|
||||
}
|
||||
|
||||
func TestLogColorToNotTTY(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
w := NewConsoleWriter(writer, Linfo, true).(*syncWriter)
|
||||
w := NewLevelWriter(NewConsoleWriter(writer, true), Linfo).(*levelWriter).writer.(*syncWriter)
|
||||
formatter := w.writer.(*consoleWriter).formatter.(*consoleFormatter)
|
||||
|
||||
assert.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
|
||||
require.NotEqual(t, true, formatter.color, "Color should not be used on a buffer logger")
|
||||
}
|
||||
|
||||
func TestLogContext(t *testing.T) {
|
||||
@@ -31,7 +31,7 @@ func TestLogContext(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("component").WithOutput(NewConsoleWriter(writer, Ldebug, false))
|
||||
logger := New("component").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
logger.Info().Log("info")
|
||||
@@ -53,19 +53,19 @@ func TestLogContext(t *testing.T) {
|
||||
lenWithoutCtx := buffer.Len()
|
||||
buffer.Reset()
|
||||
|
||||
assert.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
|
||||
require.Greater(t, lenWithCtx, lenWithoutCtx, "Log line length without context is not shorter than with context")
|
||||
}
|
||||
|
||||
func TestLogClone(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
|
||||
assert.Contains(t, buffer.String(), `component="test"`)
|
||||
require.Contains(t, buffer.String(), `component="test"`)
|
||||
|
||||
buffer.Reset()
|
||||
|
||||
@@ -74,33 +74,33 @@ func TestLogClone(t *testing.T) {
|
||||
logger2.Info().Log("info")
|
||||
writer.Flush()
|
||||
|
||||
assert.Contains(t, buffer.String(), `component="tset"`)
|
||||
require.Contains(t, buffer.String(), `component="tset"`)
|
||||
}
|
||||
|
||||
func TestLogSilent(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Lsilent, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lsilent))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Warn().Log("warn")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Error().Log("error")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
@@ -108,26 +108,26 @@ func TestLogDebug(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Ldebug, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Ldebug))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Warn().Log("warn")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Error().Log("error")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
@@ -135,26 +135,26 @@ func TestLogInfo(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Linfo, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Linfo))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Warn().Log("warn")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Error().Log("error")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
@@ -162,26 +162,26 @@ func TestLogWarn(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Lwarn, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lwarn))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Warn().Log("warn")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Error().Log("error")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
}
|
||||
|
||||
@@ -189,25 +189,25 @@ func TestLogError(t *testing.T) {
|
||||
var buffer bytes.Buffer
|
||||
writer := bufio.NewWriter(&buffer)
|
||||
|
||||
logger := New("test").WithOutput(NewConsoleWriter(writer, Lerror, false))
|
||||
logger := New("test").WithOutput(NewLevelWriter(NewConsoleWriter(writer, false), Lerror))
|
||||
|
||||
logger.Debug().Log("debug")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Info().Log("info")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Warn().Log("warn")
|
||||
writer.Flush()
|
||||
assert.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
require.Equal(t, 0, buffer.Len(), "Buffer should be empty")
|
||||
buffer.Reset()
|
||||
|
||||
logger.Error().Log("error")
|
||||
writer.Flush()
|
||||
assert.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
require.NotEqual(t, 0, buffer.Len(), "Buffer should not be empty")
|
||||
buffer.Reset()
|
||||
}
|
||||
|
log/output.go (new file, 43 lines)
@@ -0,0 +1,43 @@
package log

import (
	"io"
	"os"

	"github.com/mattn/go-isatty"
)

type consoleOutput struct {
	writer    io.Writer
	formatter Formatter
}

func NewConsoleOutput(w io.Writer, useColor bool) Writer {
	writer := &consoleOutput{
		writer: w,
	}

	color := useColor

	if color {
		if w, ok := w.(*os.File); ok {
			if !isatty.IsTerminal(w.Fd()) && !isatty.IsCygwinTerminal(w.Fd()) {
				color = false
			}
		} else {
			color = false
		}
	}

	writer.formatter = NewConsoleFormatter(color)

	return NewSyncWriter(writer)
}

func (w *consoleOutput) Write(e *Event) error {
	_, err := w.writer.Write(w.formatter.Bytes(e))

	return err
}

func (w *consoleOutput) Close() {}
log/writer.go (148 changed lines)
@@ -13,18 +13,50 @@ import (

 type Writer interface {
 	Write(e *Event) error
+	Close()
 }

+type discardWriter struct{}
+
+func NewDiscardWriter() Writer {
+	return &discardWriter{}
+}
+
+func (w *discardWriter) Write(e *Event) error { return nil }
+func (w *discardWriter) Close() {}
+
+type levelWriter struct {
+	writer Writer
+	level  Level
+}
+
+func NewLevelWriter(w Writer, level Level) Writer {
+	return &levelWriter{
+		writer: w,
+		level:  level,
+	}
+}
+
+func (w *levelWriter) Write(e *Event) error {
+	if w.level < e.Level || e.Level == Lsilent {
+		return nil
+	}
+
+	return w.writer.Write(e)
+}
+
+func (w *levelWriter) Close() {
+	w.writer.Close()
+}
+
 type jsonWriter struct {
 	writer    io.Writer
-	level     Level
 	formatter Formatter
 }

-func NewJSONWriter(w io.Writer, level Level) Writer {
+func NewJSONWriter(w io.Writer) Writer {
 	writer := &jsonWriter{
 		writer:    w,
-		level:     level,
 		formatter: NewJSONFormatter(),
 	}

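With Close() added to the Writer interface, a custom sink has to implement both methods, and composite writers forward Close to whatever they wrap. A sketch of a user-defined counting sink plugged into the chain, assuming the core/v16/log API as it appears in this diff; it is illustrative, not part of the package:

package main

import (
	"fmt"
	"os"
	"sync/atomic"

	"github.com/datarhei/core/v16/log"
)

type countingWriter struct {
	next log.Writer
	n    atomic.Int64
}

func (w *countingWriter) Write(e *log.Event) error {
	w.n.Add(1)
	return w.next.Write(e)
}

func (w *countingWriter) Close() {
	w.next.Close()
}

func main() {
	sink := &countingWriter{next: log.NewConsoleWriter(os.Stderr, true)}
	logger := log.New("Example").WithOutput(log.NewLevelWriter(sink, log.Linfo))

	logger.Info().Log("hello")
	logger.Debug().Log("filtered out before it reaches the sink")

	logger.Close()
	fmt.Println("events seen:", sink.n.Load())
}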
@@ -32,25 +64,21 @@ func NewJSONWriter(w io.Writer, level Level) Writer {
|
||||
}
|
||||
|
||||
func (w *jsonWriter) Write(e *Event) error {
|
||||
if w.level < e.Level || e.Level == Lsilent {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := w.writer.Write(w.formatter.Bytes(e))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *jsonWriter) Close() {}
|
||||
|
||||
type consoleWriter struct {
|
||||
writer io.Writer
|
||||
level Level
|
||||
formatter Formatter
|
||||
}
|
||||
|
||||
func NewConsoleWriter(w io.Writer, level Level, useColor bool) Writer {
|
||||
func NewConsoleWriter(w io.Writer, useColor bool) Writer {
|
||||
writer := &consoleWriter{
|
||||
writer: w,
|
||||
level: level,
|
||||
}
|
||||
|
||||
color := useColor
|
||||
@@ -71,15 +99,13 @@ func NewConsoleWriter(w io.Writer, level Level, useColor bool) Writer {
|
||||
}
|
||||
|
||||
func (w *consoleWriter) Write(e *Event) error {
|
||||
if w.level < e.Level || e.Level == Lsilent {
|
||||
return nil
|
||||
}
|
||||
|
||||
_, err := w.writer.Write(w.formatter.Bytes(e))
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *consoleWriter) Close() {}
|
||||
|
||||
type topicWriter struct {
|
||||
writer Writer
|
||||
topics map[string]struct{}
|
||||
@@ -112,6 +138,10 @@ func (w *topicWriter) Write(e *Event) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (w *topicWriter) Close() {
|
||||
w.writer.Close()
|
||||
}
|
||||
|
||||
type levelRewriter struct {
|
||||
writer Writer
|
||||
rules []levelRewriteRule
|
||||
@@ -182,6 +212,10 @@ rules:
|
||||
return w.writer.Write(e)
|
||||
}
|
||||
|
||||
func (w *levelRewriter) Close() {
|
||||
w.writer.Close()
|
||||
}
|
||||
|
||||
type syncWriter struct {
|
||||
mu sync.Mutex
|
||||
writer Writer
|
||||
@@ -193,11 +227,15 @@ func NewSyncWriter(writer Writer) Writer {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *syncWriter) Write(e *Event) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
func (w *syncWriter) Write(e *Event) error {
|
||||
w.mu.Lock()
|
||||
defer w.mu.Unlock()
|
||||
|
||||
return s.writer.Write(e)
|
||||
return w.writer.Write(e)
|
||||
}
|
||||
|
||||
func (w *syncWriter) Close() {
|
||||
w.writer.Close()
|
||||
}
|
||||
|
||||
type multiWriter struct {
|
||||
@@ -212,8 +250,8 @@ func NewMultiWriter(writer ...Writer) Writer {
|
||||
return mw
|
||||
}
|
||||
|
||||
func (m *multiWriter) Write(e *Event) error {
|
||||
for _, w := range m.writer {
|
||||
func (w *multiWriter) Write(e *Event) error {
|
||||
for _, w := range w.writer {
|
||||
if err := w.Write(e); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -222,6 +260,12 @@ func (m *multiWriter) Write(e *Event) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (w *multiWriter) Close() {
|
||||
for _, w := range w.writer {
|
||||
w.Close()
|
||||
}
|
||||
}
|
||||
|
||||
type BufferWriter interface {
|
||||
Writer
|
||||
Events() []*Event
|
||||
@@ -230,13 +274,10 @@ type BufferWriter interface {
|
||||
type bufferWriter struct {
|
||||
lines *ring.Ring
|
||||
lock sync.RWMutex
|
||||
level Level
|
||||
}
|
||||
|
||||
func NewBufferWriter(level Level, lines int) BufferWriter {
|
||||
b := &bufferWriter{
|
||||
level: level,
|
||||
}
|
||||
func NewBufferWriter(lines int) BufferWriter {
|
||||
b := &bufferWriter{}
|
||||
|
||||
if lines > 0 {
|
||||
b.lines = ring.New(lines)
|
||||
@@ -245,33 +286,31 @@ func NewBufferWriter(level Level, lines int) BufferWriter {
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *bufferWriter) Write(e *Event) error {
|
||||
if b.level < e.Level || e.Level == Lsilent {
|
||||
return nil
|
||||
}
|
||||
func (w *bufferWriter) Write(e *Event) error {
|
||||
w.lock.Lock()
|
||||
defer w.lock.Unlock()
|
||||
|
||||
b.lock.Lock()
|
||||
defer b.lock.Unlock()
|
||||
|
||||
if b.lines != nil {
|
||||
b.lines.Value = e.clone()
|
||||
b.lines = b.lines.Next()
|
||||
if w.lines != nil {
|
||||
w.lines.Value = e.clone()
|
||||
w.lines = w.lines.Next()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *bufferWriter) Events() []*Event {
|
||||
func (w *bufferWriter) Close() {}
|
||||
|
||||
func (w *bufferWriter) Events() []*Event {
|
||||
var lines = []*Event{}
|
||||
|
||||
if b.lines == nil {
|
||||
if w.lines == nil {
|
||||
return lines
|
||||
}
|
||||
|
||||
b.lock.RLock()
|
||||
defer b.lock.RUnlock()
|
||||
w.lock.RLock()
|
||||
defer w.lock.RUnlock()
|
||||
|
||||
b.lines.Do(func(l interface{}) {
|
||||
w.lines.Do(func(l interface{}) {
|
||||
if l == nil {
|
||||
return
|
||||
}
|
||||
@@ -281,3 +320,32 @@ func (b *bufferWriter) Events() []*Event {

 	return lines
 }
+
+type fileWriter struct {
+	writer    *os.File
+	formatter Formatter
+}
+
+func NewFileWriter(path string, formatter Formatter) Writer {
+	file, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_RDWR|os.O_SYNC, 0600)
+	if err != nil {
+		return NewDiscardWriter()
+	}
+
+	writer := &fileWriter{
+		writer:    file,
+		formatter: formatter,
+	}
+
+	return NewSyncWriter(writer)
+}
+
+func (w *fileWriter) Write(e *Event) error {
+	_, err := w.writer.Write(append(w.formatter.Bytes(e), '\n'))
+
+	return err
+}
+
+func (w *fileWriter) Close() {
+	w.writer.Close()
+}
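Usage sketch for the new file writer: in the server it is selected via log.target.output=file (CORE_LOG_TARGET_OUTPUT) together with log.target.path (CORE_LOG_TARGET_PATH), and NewFileWriter degrades to a discard writer when the file cannot be opened. Assumes the core/v16/log API as shown above; the path is made up:

package main

import (
	"github.com/datarhei/core/v16/log"
)

func main() {
	// Events are appended as JSON lines (file opened O_APPEND|O_CREATE, mode 0600).
	writer := log.NewFileWriter("/tmp/core.log", log.NewJSONFormatter())

	logger := log.New("Example").WithOutput(
		log.NewLevelWriter(writer, log.Ldebug),
	)

	logger.Info().WithField("path", "/tmp/core.log").Log("written to file")
	logger.Close()
}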
main.go (17 changed lines)
@@ -5,15 +5,26 @@ import (
 	"os/signal"

 	"github.com/datarhei/core/v16/app/api"
+	"github.com/datarhei/core/v16/config/store"
 	"github.com/datarhei/core/v16/log"

 	_ "github.com/joho/godotenv/autoload"
 )

 func main() {
-	logger := log.New("Core").WithOutput(log.NewConsoleWriter(os.Stderr, log.Lwarn, true))
+	logger := log.New("Core").WithOutput(
+		log.NewLevelWriter(
+			log.NewConsoleWriter(
+				os.Stderr,
+				true,
+			),
+			log.Lwarn,
+		),
+	)

-	app, err := api.New(os.Getenv("CORE_CONFIGFILE"), os.Stderr)
+	configfile := store.Location(os.Getenv("CORE_CONFIGFILE"))
+
+	app, err := api.New(configfile, os.Stderr)
 	if err != nil {
 		logger.Error().WithError(err).Log("Failed to create new API")
 		os.Exit(1)

@@ -51,6 +62,8 @@ func main() {
 	signal.Notify(quit, os.Interrupt)
 	<-quit

+	logger.Close()
+
 	// Stop the app
 	app.Destroy()
 }
@@ -9,12 +9,13 @@ import (
|
||||
type Replacer interface {
|
||||
// RegisterTemplate registers a template for a specific placeholder. Template
|
||||
// may contain placeholders as well of the form {name}. They will be replaced
|
||||
// by the parameters of the placeholder (see Replace).
|
||||
RegisterTemplate(placeholder, template string)
|
||||
// by the parameters of the placeholder (see Replace). If a parameter is not of
|
||||
// a template is not present, default values can be provided.
|
||||
RegisterTemplate(placeholder, template string, defaults map[string]string)
|
||||
|
||||
// RegisterTemplateFunc does the same as RegisterTemplate, but the template
|
||||
// is returned by the template function.
|
||||
RegisterTemplateFunc(placeholder string, template func() string)
|
||||
RegisterTemplateFunc(placeholder string, template func() string, defaults map[string]string)
|
||||
|
||||
// Replace replaces all occurences of placeholder in str with value. The placeholder is of the
|
||||
// form {placeholder}. It is possible to escape a characters in value with \\ by appending a ^
|
||||
@@ -28,8 +29,13 @@ type Replacer interface {
|
||||
Replace(str, placeholder, value string) string
|
||||
}
|
||||
|
||||
type template struct {
|
||||
fn func() string
|
||||
defaults map[string]string
|
||||
}
|
||||
|
||||
type replacer struct {
|
||||
templates map[string]func() string
|
||||
templates map[string]template
|
||||
|
||||
re *regexp.Regexp
|
||||
templateRe *regexp.Regexp
|
||||
@@ -38,7 +44,7 @@ type replacer struct {
|
||||
// New returns a Replacer
|
||||
func New() Replacer {
|
||||
r := &replacer{
|
||||
templates: make(map[string]func() string),
|
||||
templates: make(map[string]template),
|
||||
re: regexp.MustCompile(`{([a-z]+)(?:\^(.))?(?:,(.*?))?}`),
|
||||
templateRe: regexp.MustCompile(`{([a-z]+)}`),
|
||||
}
|
||||
@@ -46,12 +52,18 @@ func New() Replacer {
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *replacer) RegisterTemplate(placeholder, template string) {
|
||||
r.templates[placeholder] = func() string { return template }
|
||||
func (r *replacer) RegisterTemplate(placeholder, tmpl string, defaults map[string]string) {
|
||||
r.templates[placeholder] = template{
|
||||
fn: func() string { return tmpl },
|
||||
defaults: defaults,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *replacer) RegisterTemplateFunc(placeholder string, template func() string) {
|
||||
r.templates[placeholder] = template
|
||||
func (r *replacer) RegisterTemplateFunc(placeholder string, tmplFn func() string, defaults map[string]string) {
|
||||
r.templates[placeholder] = template{
|
||||
fn: tmplFn,
|
||||
defaults: defaults,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *replacer) Replace(str, placeholder, value string) string {
|
||||
@@ -63,16 +75,20 @@ func (r *replacer) Replace(str, placeholder, value string) string {
|
||||
|
||||
// We need a copy from the value
|
||||
v := value
|
||||
var tmpl template = template{
|
||||
fn: func() string { return v },
|
||||
}
|
||||
|
||||
// Check for a registered template
|
||||
if len(v) == 0 {
|
||||
tmplFunc, ok := r.templates[placeholder]
|
||||
t, ok := r.templates[placeholder]
|
||||
if ok {
|
||||
v = tmplFunc()
|
||||
tmpl = t
|
||||
}
|
||||
}
|
||||
|
||||
v = r.compileTemplate(v, matches[3])
|
||||
v = tmpl.fn()
|
||||
v = r.compileTemplate(v, matches[3], tmpl.defaults)
|
||||
|
||||
if len(matches[2]) != 0 {
|
||||
// If there's a character to escape, we also have to escape the
|
||||
@@ -97,13 +113,18 @@ func (r *replacer) Replace(str, placeholder, value string) string {
|
||||
// placeholder name and will be replaced with the value. The resulting string is "Hello World!".
|
||||
// If a placeholder name is not present in the params string, it will not be replaced. The key
|
||||
// and values can be escaped as in net/url.QueryEscape.
|
||||
func (r *replacer) compileTemplate(str, params string) string {
|
||||
if len(params) == 0 {
|
||||
func (r *replacer) compileTemplate(str, params string, defaults map[string]string) string {
|
||||
if len(params) == 0 && len(defaults) == 0 {
|
||||
return str
|
||||
}
|
||||
|
||||
p := make(map[string]string)
|
||||
|
||||
// Copy the defaults
|
||||
for key, value := range defaults {
|
||||
p[key] = value
|
||||
}
|
||||
|
||||
// taken from net/url.ParseQuery
|
||||
for params != "" {
|
||||
var key string
|
||||
|
@@ -34,7 +34,7 @@ func TestReplace(t *testing.T) {
|
||||
|
||||
func TestReplaceTemplate(t *testing.T) {
|
||||
r := New()
|
||||
r.RegisterTemplate("foobar", "Hello {who}! {what}?")
|
||||
r.RegisterTemplate("foobar", "Hello {who}! {what}?", nil)
|
||||
|
||||
replaced := r.Replace("{foobar,who=World}", "foobar", "")
|
||||
require.Equal(t, "Hello World! {what}?", replaced)
|
||||
@@ -46,6 +46,20 @@ func TestReplaceTemplate(t *testing.T) {
|
||||
require.Equal(t, "Hello World! E=mc\\\\:2?", replaced)
|
||||
}
|
||||
|
||||
func TestReplaceTemplateDefaults(t *testing.T) {
|
||||
r := New()
|
||||
r.RegisterTemplate("foobar", "Hello {who}! {what}?", map[string]string{
|
||||
"who": "someone",
|
||||
"what": "something",
|
||||
})
|
||||
|
||||
replaced := r.Replace("{foobar}", "foobar", "")
|
||||
require.Equal(t, "Hello someone! something?", replaced)
|
||||
|
||||
replaced = r.Replace("{foobar,who=World}", "foobar", "")
|
||||
require.Equal(t, "Hello World! something?", replaced)
|
||||
}
|
||||
|
||||
func TestReplaceCompileTemplate(t *testing.T) {
|
||||
samples := [][3]string{
|
||||
{"Hello {who}!", "who=World", "Hello World!"},
|
||||
@@ -58,7 +72,27 @@ func TestReplaceCompileTemplate(t *testing.T) {
|
||||
r := New().(*replacer)
|
||||
|
||||
for _, e := range samples {
|
||||
replaced := r.compileTemplate(e[0], e[1])
|
||||
replaced := r.compileTemplate(e[0], e[1], nil)
|
||||
require.Equal(t, e[2], replaced, e[0])
|
||||
}
|
||||
}
|
||||
|
||||
func TestReplaceCompileTemplateDefaults(t *testing.T) {
|
||||
samples := [][3]string{
|
||||
{"Hello {who}!", "", "Hello someone!"},
|
||||
{"Hello {who}!", "who=World", "Hello World!"},
|
||||
{"Hello {who}! {what}?", "who=World", "Hello World! something?"},
|
||||
{"Hello {who}! {what}?", "who=World,what=Yeah", "Hello World! Yeah?"},
|
||||
{"Hello {who}! {what}?", "who=World,what=", "Hello World! ?"},
|
||||
}
|
||||
|
||||
r := New().(*replacer)
|
||||
|
||||
for _, e := range samples {
|
||||
replaced := r.compileTemplate(e[0], e[1], map[string]string{
|
||||
"who": "someone",
|
||||
"what": "something",
|
||||
})
|
||||
require.Equal(t, e[2], replaced, e[0])
|
||||
}
|
||||
}
|
||||
|
@@ -381,7 +381,7 @@ func (s *server) handlePlay(conn *rtmp.Conn) {
 	}

 	// Adjust the timestamp such that the stream starts from 0
-	filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: true})
+	filters = append(filters, &pktque.FixTime{StartFromZero: true, MakeIncrement: false})

 	demuxer := &pktque.FilterDemuxer{
 		Filter: filters,
@@ -9,6 +9,8 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/datarhei/core/v16/log"
|
||||
)
|
||||
|
||||
type API interface {
|
||||
@@ -19,6 +21,7 @@ type Config struct {
|
||||
URL string
|
||||
Token string
|
||||
Client *http.Client
|
||||
Logger log.Logger
|
||||
}
|
||||
|
||||
type api struct {
|
||||
@@ -29,6 +32,8 @@ type api struct {
|
||||
accessTokenType string
|
||||
|
||||
client *http.Client
|
||||
|
||||
logger log.Logger
|
||||
}
|
||||
|
||||
func New(config Config) (API, error) {
|
||||
@@ -36,6 +41,11 @@ func New(config Config) (API, error) {
|
||||
url: config.URL,
|
||||
token: config.Token,
|
||||
client: config.Client,
|
||||
logger: config.Logger,
|
||||
}
|
||||
|
||||
if a.logger == nil {
|
||||
a.logger = log.New("")
|
||||
}
|
||||
|
||||
if !strings.HasSuffix(a.url, "/") {
|
||||
@@ -95,7 +105,7 @@ func (c *copyReader) Read(p []byte) (int, error) {
|
||||
|
||||
if err == io.EOF {
|
||||
c.reader = c.copy
|
||||
c.copy = new(bytes.Buffer)
|
||||
c.copy = &bytes.Buffer{}
|
||||
}
|
||||
|
||||
return i, err
|
||||
|
@@ -55,7 +55,7 @@ func New(config Config) (Service, error) {
|
||||
}
|
||||
|
||||
if s.logger == nil {
|
||||
s.logger = log.New("Service")
|
||||
s.logger = log.New("")
|
||||
}
|
||||
|
||||
s.logger = s.logger.WithField("url", config.URL)
|
||||
@@ -214,7 +214,10 @@ func (s *service) collect() (time.Duration, error) {
|
||||
return 15 * time.Minute, fmt.Errorf("failed to send monitor data to service: %w", err)
|
||||
}
|
||||
|
||||
s.logger.Debug().WithField("next", r.Next).Log("Sent monitor data")
|
||||
s.logger.Debug().WithFields(log.Fields{
|
||||
"next": r.Next,
|
||||
"data": data,
|
||||
}).Log("Sent monitor data")
|
||||
|
||||
if r.Next == 0 {
|
||||
r.Next = 5 * 60
|
||||
@@ -230,6 +233,8 @@ func (s *service) Start() {
|
||||
go s.tick(ctx, time.Second)
|
||||
|
||||
s.stopOnce = sync.Once{}
|
||||
|
||||
s.logger.Info().Log("Connected")
|
||||
})
|
||||
}
|
||||
|
||||
@@ -237,6 +242,8 @@ func (s *service) Stop() {
|
||||
s.stopOnce.Do(func() {
|
||||
s.stopTicker()
|
||||
s.startOnce = sync.Once{}
|
||||
|
||||
s.logger.Info().Log("Disconnected")
|
||||
})
|
||||
}
|
||||
|
||||
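Start and Stop now re-arm each other's sync.Once (Start resets stopOnce, Stop resets startOnce), so the service can be started and stopped repeatedly in alternation. A stand-alone sketch of that pattern (the real service does more inside the callbacks):

package main

import (
	"fmt"
	"sync"
)

type service struct {
	startOnce sync.Once
	stopOnce  sync.Once
}

func (s *service) Start() {
	s.startOnce.Do(func() {
		fmt.Println("Connected")
		// allow a later Stop() to run again
		s.stopOnce = sync.Once{}
	})
}

func (s *service) Stop() {
	s.stopOnce.Do(func() {
		fmt.Println("Disconnected")
		// allow a later Start() to run again
		s.startOnce = sync.Once{}
	})
}

func main() {
	s := &service{}
	s.Start()
	s.Start() // ignored: startOnce already fired
	s.Stop()
	s.Start() // runs again after the reset inside Stop()
}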