mirror of
https://github.com/asticode/go-astiencoder.git
synced 2025-12-24 13:57:53 +08:00
Added event logger with message merging
This commit is contained in:
@@ -8,7 +8,9 @@ import (
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/asticode/go-astiencoder"
|
||||
astilibav "github.com/asticode/go-astiencoder/libav"
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
@@ -35,9 +37,6 @@ func main() {
|
||||
|
||||
// Create workflow server
|
||||
ws := astiencoder.NewServer(astiencoder.ServerOptions{Logger: l})
|
||||
|
||||
// Adapt event handler
|
||||
astiencoder.LoggerEventHandlerAdapter(l, eh)
|
||||
ws.EventHandlerAdapter(eh)
|
||||
|
||||
// Create stater
|
||||
@@ -46,6 +45,9 @@ func main() {
|
||||
// Create encoder
|
||||
e := newEncoder(c.Encoder, eh, ws, l, s)
|
||||
|
||||
// Log event handler
|
||||
defer eh.Log(l, astilibav.WithLog(astiav.LogLevelInfo)).Start(e.w.Context()).Close()
|
||||
|
||||
// Handle signals
|
||||
e.w.HandleSignals()
|
||||
|
||||
|
||||
180
event.go
180
event.go
@@ -1,13 +1,5 @@
|
||||
package astiencoder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
// Default event names
|
||||
var (
|
||||
EventNameError = "astiencoder.error"
|
||||
@@ -45,178 +37,6 @@ func EventError(target interface{}, err error) Event {
|
||||
}
|
||||
}
|
||||
|
||||
// EventHandler represents an event handler
|
||||
type EventHandler struct {
|
||||
// Indexed by target then by event name then by listener idx
|
||||
// We use a map[int]Listener so that deletion is as smooth as possible
|
||||
cs map[interface{}]map[string]map[int]EventCallback
|
||||
idx int
|
||||
m *sync.Mutex
|
||||
}
|
||||
|
||||
// EventCallback represents an event callback
|
||||
type EventCallback func(e Event) (deleteListener bool)
|
||||
|
||||
// NewEventHandler creates a new event handler
|
||||
func NewEventHandler() *EventHandler {
|
||||
return &EventHandler{
|
||||
cs: make(map[interface{}]map[string]map[int]EventCallback),
|
||||
m: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a new callback for a specific target and event name
|
||||
func (h *EventHandler) Add(target interface{}, eventName string, c EventCallback) {
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
if _, ok := h.cs[target]; !ok {
|
||||
h.cs[target] = make(map[string]map[int]EventCallback)
|
||||
}
|
||||
if _, ok := h.cs[target][eventName]; !ok {
|
||||
h.cs[target][eventName] = make(map[int]EventCallback)
|
||||
}
|
||||
h.idx++
|
||||
h.cs[target][eventName][h.idx] = c
|
||||
}
|
||||
|
||||
// AddForEventName adds a new callback for a specific event name
|
||||
func (h *EventHandler) AddForEventName(eventName string, c EventCallback) {
|
||||
h.Add(nil, eventName, c)
|
||||
}
|
||||
|
||||
// AddForTarget adds a new callback for a specific target
|
||||
func (h *EventHandler) AddForTarget(target interface{}, c EventCallback) {
|
||||
h.Add(target, "", c)
|
||||
}
|
||||
|
||||
// AddForAll adds a new callback for all events
|
||||
func (h *EventHandler) AddForAll(c EventCallback) {
|
||||
h.Add(nil, "", c)
|
||||
}
|
||||
|
||||
func (h *EventHandler) del(target interface{}, eventName string, idx int) {
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
if _, ok := h.cs[target]; !ok {
|
||||
return
|
||||
}
|
||||
if _, ok := h.cs[target][eventName]; !ok {
|
||||
return
|
||||
}
|
||||
delete(h.cs[target][eventName], idx)
|
||||
}
|
||||
|
||||
type eventHandlerCallback struct {
|
||||
c EventCallback
|
||||
eventName string
|
||||
idx int
|
||||
target interface{}
|
||||
}
|
||||
|
||||
func (h *EventHandler) callbacks(target interface{}, eventName string) (cs []eventHandlerCallback) {
|
||||
// Lock
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
|
||||
// Index callbacks
|
||||
ics := make(map[int]eventHandlerCallback)
|
||||
var idxs []int
|
||||
targets := []interface{}{nil}
|
||||
if target != nil {
|
||||
targets = append(targets, target)
|
||||
}
|
||||
for _, target := range targets {
|
||||
if _, ok := h.cs[target]; ok {
|
||||
eventNames := []string{""}
|
||||
if eventName != "" {
|
||||
eventNames = append(eventNames, eventName)
|
||||
}
|
||||
for _, eventName := range eventNames {
|
||||
if _, ok := h.cs[target][eventName]; ok {
|
||||
for idx, c := range h.cs[target][eventName] {
|
||||
ics[idx] = eventHandlerCallback{
|
||||
c: c,
|
||||
eventName: eventName,
|
||||
idx: idx,
|
||||
target: target,
|
||||
}
|
||||
idxs = append(idxs, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort
|
||||
sort.Ints(idxs)
|
||||
|
||||
// Append
|
||||
for _, idx := range idxs {
|
||||
cs = append(cs, ics[idx])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Emit emits an event
|
||||
func (h *EventHandler) Emit(e Event) {
|
||||
for _, c := range h.callbacks(e.Target, e.Name) {
|
||||
if c.c(e) {
|
||||
h.del(c.target, c.eventName, c.idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// LoggerEventHandlerAdapter adapts the event handler so that it logs the events properly
|
||||
func LoggerEventHandlerAdapter(i astikit.StdLogger, h *EventHandler) {
|
||||
// Create logger
|
||||
l := astikit.AdaptStdLogger(i)
|
||||
|
||||
// Error
|
||||
h.AddForEventName(EventNameError, func(e Event) bool {
|
||||
var t string
|
||||
if v, ok := e.Target.(Node); ok {
|
||||
t = v.Metadata().Name
|
||||
} else if v, ok := e.Target.(*Workflow); ok {
|
||||
t = v.Name()
|
||||
} else if e.Target != nil {
|
||||
t = fmt.Sprintf("%p", e.Target)
|
||||
}
|
||||
if len(t) > 0 {
|
||||
t = "(" + t + ")"
|
||||
}
|
||||
l.Errorf("%s%s", e.Payload.(error), t)
|
||||
return false
|
||||
})
|
||||
|
||||
// Node
|
||||
h.AddForEventName(EventNameNodeClosed, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is closed", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodePaused, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is paused", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodeStarted, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is started", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodeStopped, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is stopped", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
|
||||
// Workflow
|
||||
h.AddForEventName(EventNameWorkflowStarted, func(e Event) bool {
|
||||
l.Infof("astiencoder: workflow %s is started", e.Target.(*Workflow).Name())
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameWorkflowStopped, func(e Event) bool {
|
||||
l.Infof("astiencoder: workflow %s is stopped", e.Target.(*Workflow).Name())
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
// EventTypeTransformer represents a function capable of transforming an event type to an event name
|
||||
type EventTypeTransformer func(eventType string) string
|
||||
|
||||
|
||||
188
event_handler.go
Normal file
188
event_handler.go
Normal file
@@ -0,0 +1,188 @@
|
||||
package astiencoder
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"sync"
|
||||
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
// EventHandler represents an event handler
|
||||
type EventHandler struct {
|
||||
// Indexed by target then by event name then by listener idx
|
||||
// We use a map[int]Listener so that deletion is as smooth as possible
|
||||
cs map[interface{}]map[string]map[int]EventCallback
|
||||
idx int
|
||||
m *sync.Mutex
|
||||
}
|
||||
|
||||
// EventCallback represents an event callback
|
||||
type EventCallback func(e Event) (deleteListener bool)
|
||||
|
||||
// NewEventHandler creates a new event handler
|
||||
func NewEventHandler() *EventHandler {
|
||||
return &EventHandler{
|
||||
cs: make(map[interface{}]map[string]map[int]EventCallback),
|
||||
m: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
// Add adds a new callback for a specific target and event name
|
||||
func (h *EventHandler) Add(target interface{}, eventName string, c EventCallback) {
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
if _, ok := h.cs[target]; !ok {
|
||||
h.cs[target] = make(map[string]map[int]EventCallback)
|
||||
}
|
||||
if _, ok := h.cs[target][eventName]; !ok {
|
||||
h.cs[target][eventName] = make(map[int]EventCallback)
|
||||
}
|
||||
h.idx++
|
||||
h.cs[target][eventName][h.idx] = c
|
||||
}
|
||||
|
||||
// AddForEventName adds a new callback for a specific event name
|
||||
func (h *EventHandler) AddForEventName(eventName string, c EventCallback) {
|
||||
h.Add(nil, eventName, c)
|
||||
}
|
||||
|
||||
// AddForTarget adds a new callback for a specific target
|
||||
func (h *EventHandler) AddForTarget(target interface{}, c EventCallback) {
|
||||
h.Add(target, "", c)
|
||||
}
|
||||
|
||||
// AddForAll adds a new callback for all events
|
||||
func (h *EventHandler) AddForAll(c EventCallback) {
|
||||
h.Add(nil, "", c)
|
||||
}
|
||||
|
||||
func (h *EventHandler) del(target interface{}, eventName string, idx int) {
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
if _, ok := h.cs[target]; !ok {
|
||||
return
|
||||
}
|
||||
if _, ok := h.cs[target][eventName]; !ok {
|
||||
return
|
||||
}
|
||||
delete(h.cs[target][eventName], idx)
|
||||
}
|
||||
|
||||
type eventHandlerCallback struct {
|
||||
c EventCallback
|
||||
eventName string
|
||||
idx int
|
||||
target interface{}
|
||||
}
|
||||
|
||||
func (h *EventHandler) callbacks(target interface{}, eventName string) (cs []eventHandlerCallback) {
|
||||
// Lock
|
||||
h.m.Lock()
|
||||
defer h.m.Unlock()
|
||||
|
||||
// Index callbacks
|
||||
ics := make(map[int]eventHandlerCallback)
|
||||
var idxs []int
|
||||
targets := []interface{}{nil}
|
||||
if target != nil {
|
||||
targets = append(targets, target)
|
||||
}
|
||||
for _, target := range targets {
|
||||
if _, ok := h.cs[target]; ok {
|
||||
eventNames := []string{""}
|
||||
if eventName != "" {
|
||||
eventNames = append(eventNames, eventName)
|
||||
}
|
||||
for _, eventName := range eventNames {
|
||||
if _, ok := h.cs[target][eventName]; ok {
|
||||
for idx, c := range h.cs[target][eventName] {
|
||||
ics[idx] = eventHandlerCallback{
|
||||
c: c,
|
||||
eventName: eventName,
|
||||
idx: idx,
|
||||
target: target,
|
||||
}
|
||||
idxs = append(idxs, idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Sort
|
||||
sort.Ints(idxs)
|
||||
|
||||
// Append
|
||||
for _, idx := range idxs {
|
||||
cs = append(cs, ics[idx])
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Emit emits an event
|
||||
func (h *EventHandler) Emit(e Event) {
|
||||
for _, c := range h.callbacks(e.Target, e.Name) {
|
||||
if c.c(e) {
|
||||
h.del(c.target, c.eventName, c.idx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
type EventHandlerLogOption func(*EventHandler, *EventLogger)
|
||||
|
||||
func (h *EventHandler) Log(i astikit.StdLogger, opts ...EventHandlerLogOption) (l *EventLogger) {
|
||||
// Create event logger
|
||||
l = newEventLogger(i)
|
||||
|
||||
// Loop through options
|
||||
for _, opt := range opts {
|
||||
opt(h, l)
|
||||
}
|
||||
|
||||
// Error
|
||||
h.AddForEventName(EventNameError, func(e Event) bool {
|
||||
var t string
|
||||
if v, ok := e.Target.(Node); ok {
|
||||
t = v.Metadata().Name
|
||||
} else if v, ok := e.Target.(*Workflow); ok {
|
||||
t = v.Name()
|
||||
} else if e.Target != nil {
|
||||
t = fmt.Sprintf("%p", e.Target)
|
||||
}
|
||||
if len(t) > 0 {
|
||||
t = "(" + t + ")"
|
||||
}
|
||||
l.Errorf("%s%s", e.Payload.(error), t)
|
||||
return false
|
||||
})
|
||||
|
||||
// Node
|
||||
h.AddForEventName(EventNameNodeClosed, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is closed", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodePaused, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is paused", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodeStarted, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is started", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameNodeStopped, func(e Event) bool {
|
||||
l.Infof("astiencoder: node %s (%s) is stopped", e.Target.(Node).Metadata().Name, e.Target.(Node).Metadata().Label)
|
||||
return false
|
||||
})
|
||||
|
||||
// Workflow
|
||||
h.AddForEventName(EventNameWorkflowStarted, func(e Event) bool {
|
||||
l.Infof("astiencoder: workflow %s is started", e.Target.(*Workflow).Name())
|
||||
return false
|
||||
})
|
||||
h.AddForEventName(EventNameWorkflowStopped, func(e Event) bool {
|
||||
l.Infof("astiencoder: workflow %s is stopped", e.Target.(*Workflow).Name())
|
||||
return false
|
||||
})
|
||||
return
|
||||
}
|
||||
@@ -3,10 +3,10 @@ package astiencoder
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestEvent(t *testing.T) {
|
||||
func TestEventHandler(t *testing.T) {
|
||||
// Setup
|
||||
eh := NewEventHandler()
|
||||
var es []string
|
||||
@@ -38,7 +38,7 @@ func TestEvent(t *testing.T) {
|
||||
Name: "test-1",
|
||||
Target: "test-1",
|
||||
})
|
||||
assert.Equal(t, []string{"1", "3", "5"}, es)
|
||||
require.Equal(t, []string{"1", "3", "5"}, es)
|
||||
es = []string(nil)
|
||||
|
||||
// Emit #2
|
||||
@@ -46,5 +46,5 @@ func TestEvent(t *testing.T) {
|
||||
Name: "test-2",
|
||||
Target: "test-2",
|
||||
})
|
||||
assert.Equal(t, []string{"2", "4", "5"}, es)
|
||||
require.Equal(t, []string{"2", "4", "5"}, es)
|
||||
}
|
||||
200
event_logger.go
Normal file
200
event_logger.go
Normal file
@@ -0,0 +1,200 @@
|
||||
package astiencoder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
type logLevel string
|
||||
|
||||
const (
|
||||
logLevelDebug logLevel = "debug"
|
||||
logLevelError logLevel = "error"
|
||||
logLevelInfo logLevel = "info"
|
||||
logLevelWarn logLevel = "warn"
|
||||
)
|
||||
|
||||
type EventLogger struct {
|
||||
cancel context.CancelFunc
|
||||
ctx context.Context
|
||||
is map[string]*eventLoggerItem // Indexed by key
|
||||
l astikit.CompleteLogger
|
||||
m *sync.Mutex // Locks p
|
||||
messageMergingPeriod time.Duration
|
||||
}
|
||||
|
||||
type eventLoggerItem struct {
|
||||
count int
|
||||
createdAt time.Time
|
||||
key string
|
||||
l logLevel
|
||||
}
|
||||
|
||||
func newEventLoggerItem(key string, l logLevel) *eventLoggerItem {
|
||||
return &eventLoggerItem{
|
||||
createdAt: time.Now(),
|
||||
key: key,
|
||||
l: l,
|
||||
}
|
||||
}
|
||||
|
||||
func WithMessageMerging(period time.Duration) EventHandlerLogOption {
|
||||
return func(_ *EventHandler, l *EventLogger) {
|
||||
l.messageMergingPeriod = period
|
||||
}
|
||||
}
|
||||
|
||||
func newEventLogger(i astikit.StdLogger) *EventLogger {
|
||||
return &EventLogger{
|
||||
is: make(map[string]*eventLoggerItem),
|
||||
l: astikit.AdaptStdLogger(i),
|
||||
m: &sync.Mutex{},
|
||||
}
|
||||
}
|
||||
|
||||
func (l *EventLogger) Start(ctx context.Context) *EventLogger {
|
||||
// Create context
|
||||
l.ctx, l.cancel = context.WithCancel(ctx)
|
||||
|
||||
// No need to start anything
|
||||
if l.messageMergingPeriod == 0 {
|
||||
return l
|
||||
}
|
||||
|
||||
// Execute in a goroutine since this is blocking
|
||||
go func() {
|
||||
// Create ticker
|
||||
t := time.NewTicker(200 * time.Millisecond)
|
||||
defer t.Stop()
|
||||
|
||||
// Loop
|
||||
for {
|
||||
select {
|
||||
case <-t.C:
|
||||
l.tick()
|
||||
case <-l.ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return l
|
||||
}
|
||||
|
||||
func (l *EventLogger) Close() {
|
||||
if l.cancel != nil {
|
||||
l.cancel()
|
||||
}
|
||||
l.purge()
|
||||
}
|
||||
|
||||
func (l *EventLogger) tick() {
|
||||
// Lock
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
|
||||
// Get now
|
||||
n := time.Now()
|
||||
|
||||
// Loop through items
|
||||
for k, i := range l.is {
|
||||
// Period has been reached
|
||||
if n.Sub(i.createdAt) > l.messageMergingPeriod {
|
||||
l.dumpItem(k, i)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (l *EventLogger) purge() {
|
||||
// Lock
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
|
||||
// Loop through items
|
||||
for k, i := range l.is {
|
||||
l.dumpItem(k, i)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *EventLogger) dumpItem(k string, i *eventLoggerItem) {
|
||||
if i.count > 0 {
|
||||
l.write(fmt.Sprintf("astiencoder: message repeated %d time(s): %s", i.count, i.key), i.l)
|
||||
}
|
||||
delete(l.is, k)
|
||||
}
|
||||
|
||||
func (l *EventLogger) process(key, msg string, lv logLevel) {
|
||||
// Merge messages
|
||||
if l.messageMergingPeriod > 0 {
|
||||
// Merge
|
||||
if stop := l.merge(key, lv); stop {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Write
|
||||
l.write(msg, lv)
|
||||
}
|
||||
|
||||
func (l *EventLogger) merge(key string, lv logLevel) (stop bool) {
|
||||
// Lock
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
|
||||
// Create final key
|
||||
k := string(lv) + ":" + key
|
||||
|
||||
// Check whether item exists
|
||||
i, ok := l.is[k]
|
||||
if ok {
|
||||
i.count++
|
||||
return true
|
||||
}
|
||||
|
||||
// Create item
|
||||
l.is[k] = newEventLoggerItem(key, lv)
|
||||
return false
|
||||
}
|
||||
|
||||
func (l *EventLogger) write(msg string, lv logLevel) {
|
||||
switch lv {
|
||||
case logLevelDebug:
|
||||
l.l.Debug(msg)
|
||||
case logLevelError:
|
||||
l.l.Error(msg)
|
||||
case logLevelWarn:
|
||||
l.l.Warn(msg)
|
||||
default:
|
||||
l.l.Info(msg)
|
||||
}
|
||||
}
|
||||
|
||||
func (l *EventLogger) Debugk(key, msg string) {
|
||||
l.process(key, msg, logLevelDebug)
|
||||
}
|
||||
|
||||
func (l *EventLogger) Errorf(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.process(msg, msg, logLevelError)
|
||||
}
|
||||
|
||||
func (l *EventLogger) Errork(key, msg string) {
|
||||
l.process(key, msg, logLevelError)
|
||||
}
|
||||
|
||||
func (l *EventLogger) Infof(format string, v ...interface{}) {
|
||||
msg := fmt.Sprintf(format, v...)
|
||||
l.process(msg, msg, logLevelInfo)
|
||||
}
|
||||
|
||||
func (l *EventLogger) Infok(key, msg string) {
|
||||
l.process(key, msg, logLevelInfo)
|
||||
}
|
||||
|
||||
func (l *EventLogger) Warnk(key, msg string) {
|
||||
l.process(key, msg, logLevelWarn)
|
||||
|
||||
}
|
||||
120
event_logger_test.go
Normal file
120
event_logger_test.go
Normal file
@@ -0,0 +1,120 @@
|
||||
package astiencoder
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockedLogger struct {
|
||||
m *sync.Mutex
|
||||
msgs map[string]int
|
||||
}
|
||||
|
||||
func newMockedLogger() *mockedLogger {
|
||||
return &mockedLogger{
|
||||
m: &sync.Mutex{},
|
||||
msgs: make(map[string]int),
|
||||
}
|
||||
}
|
||||
|
||||
func (l *mockedLogger) Fatal(v ...interface{}) {
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
l.msgs[fmt.Sprint(v...)]++
|
||||
os.Exit(1)
|
||||
}
|
||||
func (l *mockedLogger) Fatalf(format string, v ...interface{}) {
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
l.msgs[fmt.Sprintf(format, v...)]++
|
||||
os.Exit(1)
|
||||
}
|
||||
func (l *mockedLogger) Print(v ...interface{}) {
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
l.msgs[fmt.Sprint(v...)]++
|
||||
}
|
||||
func (l *mockedLogger) Printf(format string, v ...interface{}) {
|
||||
l.m.Lock()
|
||||
defer l.m.Unlock()
|
||||
l.msgs[fmt.Sprintf(format, v...)]++
|
||||
}
|
||||
|
||||
func TestEventLogger(t *testing.T) {
|
||||
ml := newMockedLogger()
|
||||
l := newEventLogger(ml)
|
||||
WithMessageMerging(500*time.Millisecond)(nil, l)
|
||||
l.Start(context.Background())
|
||||
go func() {
|
||||
l.Errorf("errorf-%d", 1)
|
||||
l.Errorf("errorf-%d", 1)
|
||||
l.Errorf("errorf-%d", 2)
|
||||
l.Errorf("errorf-%d", 3)
|
||||
l.Errorf("errorf-%d", 3)
|
||||
l.Errorf("errorf-%d", 3)
|
||||
l.Infof("infof-%d", 1)
|
||||
l.Infof("infof-%d", 1)
|
||||
l.Infof("infof-%d", 2)
|
||||
l.Infof("infof-%d", 3)
|
||||
l.Infof("infof-%d", 3)
|
||||
l.Infof("infof-%d", 3)
|
||||
l.Debugk("debugk-%d", "debugk-1")
|
||||
l.Debugk("debugk-%d", "debugk-2")
|
||||
l.Debugk("debugk-%d", "debugk-3")
|
||||
l.Errork("errork-%d", "errork-1")
|
||||
l.Errork("errork-%d", "errork-2")
|
||||
l.Errork("errork-%d", "errork-3")
|
||||
l.Infok("infok-%d", "infok-1")
|
||||
l.Infok("infok-%d", "infok-2")
|
||||
l.Infok("infok-%d", "infok-3")
|
||||
l.Warnk("warnk-%d", "warnk-1")
|
||||
l.Warnk("warnk-%d", "warnk-2")
|
||||
l.Warnk("warnk-%d", "warnk-3")
|
||||
l.Errorf("msg")
|
||||
l.Errorf("msg")
|
||||
l.Infof("msg")
|
||||
l.Infof("msg")
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
ml.m.Lock()
|
||||
require.Equal(t, map[string]int{
|
||||
"astiencoder: message repeated 1 time(s): errorf-1": 1,
|
||||
"astiencoder: message repeated 1 time(s): infof-1": 1,
|
||||
"astiencoder: message repeated 2 time(s): debugk-%d": 1,
|
||||
"astiencoder: message repeated 2 time(s): errork-%d": 1,
|
||||
"astiencoder: message repeated 2 time(s): errorf-3": 1,
|
||||
"astiencoder: message repeated 2 time(s): infok-%d": 1,
|
||||
"astiencoder: message repeated 2 time(s): infof-3": 1,
|
||||
"astiencoder: message repeated 1 time(s): msg": 2,
|
||||
"astiencoder: message repeated 2 time(s): warnk-%d": 1,
|
||||
"debugk-1": 1,
|
||||
"errork-1": 1,
|
||||
"errorf-1": 1,
|
||||
"errorf-2": 1,
|
||||
"errorf-3": 1,
|
||||
"infok-1": 1,
|
||||
"infof-1": 1,
|
||||
"infof-2": 1,
|
||||
"infof-3": 1,
|
||||
"msg": 2,
|
||||
"warnk-1": 1,
|
||||
}, ml.msgs)
|
||||
ml.msgs = map[string]int{}
|
||||
ml.m.Unlock()
|
||||
l.Infof("purge-%d", 1)
|
||||
l.Infof("purge-%d", 1)
|
||||
l.Infof("purge-%d", 1)
|
||||
l.Close()
|
||||
ml.m.Lock()
|
||||
require.Equal(t, map[string]int{
|
||||
"astiencoder: message repeated 2 time(s): purge-1": 1,
|
||||
"purge-1": 1,
|
||||
}, ml.msgs)
|
||||
ml.m.Unlock()
|
||||
}
|
||||
2
go.mod
2
go.mod
@@ -4,7 +4,7 @@ go 1.13
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v0.3.1
|
||||
github.com/asticode/go-astiav v0.3.0
|
||||
github.com/asticode/go-astiav v0.3.1
|
||||
github.com/asticode/go-astikit v0.28.2
|
||||
github.com/asticode/go-astiws v1.5.0
|
||||
github.com/gorilla/websocket v1.4.1
|
||||
|
||||
4
go.sum
4
go.sum
@@ -2,8 +2,8 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ
|
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
|
||||
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
|
||||
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
|
||||
github.com/asticode/go-astiav v0.3.0 h1:H+jAHFeid06TwMCXBn8I9MCWfx3UmBFMd2Et3U928XY=
|
||||
github.com/asticode/go-astiav v0.3.0/go.mod h1:phvUnSSlV91S/PELeLkDisYiRLOssxWOsj4oDrqM/54=
|
||||
github.com/asticode/go-astiav v0.3.1 h1:EYplmgrdhElsPiuLmPEOVSILmm0Ix+gIQ8g/jMr3xBY=
|
||||
github.com/asticode/go-astiav v0.3.1/go.mod h1:phvUnSSlV91S/PELeLkDisYiRLOssxWOsj4oDrqM/54=
|
||||
github.com/asticode/go-astikit v0.1.0/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
|
||||
github.com/asticode/go-astikit v0.28.2 h1:c2shjqarbZwcQGQ7GPfchG2sSOL/7NHGbdgHTx43RH8=
|
||||
github.com/asticode/go-astikit v0.28.2/go.mod h1:h4ly7idim1tNhaVkdVBeXQZEE3L0xblP7fCWbgwipF0=
|
||||
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -25,7 +24,7 @@ func TestFrameRestamperWithFrameDuration(t *testing.T) {
|
||||
} {
|
||||
f.SetPts(ft.input)
|
||||
r.Restamp(f)
|
||||
assert.Equal(t, ft.output, f.Pts())
|
||||
require.Equal(t, ft.output, f.Pts())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -52,6 +51,6 @@ func TestFrameRestamperWithModulo(t *testing.T) {
|
||||
} {
|
||||
f.SetPts(ft.input)
|
||||
r.Restamp(f)
|
||||
assert.Equal(t, ft.output, f.Pts())
|
||||
require.Equal(t, ft.output, f.Pts())
|
||||
}
|
||||
}
|
||||
|
||||
112
libav/log.go
112
libav/log.go
@@ -1,86 +1,76 @@
|
||||
package astilibav
|
||||
|
||||
import (
|
||||
"regexp"
|
||||
"strings"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/asticode/go-astiencoder"
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
type EventLog struct {
|
||||
Fmt string
|
||||
Format string
|
||||
Level astiav.LogLevel
|
||||
Msg string
|
||||
Parent string
|
||||
}
|
||||
|
||||
// TODO Process parent and update event's target
|
||||
func HandleLogs(eh *astiencoder.EventHandler) {
|
||||
astiav.SetLogCallback(func(level astiav.LogLevel, fmt, msg, parent string) {
|
||||
// Emit event
|
||||
eh.Emit(astiencoder.Event{
|
||||
Name: EventNameLog,
|
||||
Payload: EventLog{
|
||||
Fmt: fmt,
|
||||
Level: level,
|
||||
Msg: msg,
|
||||
Parent: parent,
|
||||
},
|
||||
func WithLog(lvl astiav.LogLevel) astiencoder.EventHandlerLogOption {
|
||||
return func(h *astiencoder.EventHandler, l *astiencoder.EventLogger) {
|
||||
// Set log level
|
||||
astiav.SetLogLevel(lvl)
|
||||
|
||||
// Set log callback
|
||||
// TODO Process parent and update event's target
|
||||
astiav.SetLogCallback(func(level astiav.LogLevel, fmt, msg, parent string) {
|
||||
// Emit event
|
||||
h.Emit(astiencoder.Event{
|
||||
Name: EventNameLog,
|
||||
Payload: EventLog{
|
||||
Format: fmt,
|
||||
Level: level,
|
||||
Msg: msg,
|
||||
Parent: parent,
|
||||
},
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
type LoggerEventHandlerAdapterOptions struct {
|
||||
IgnoredLogMessages []*regexp.Regexp
|
||||
}
|
||||
|
||||
func LoggerEventHandlerAdapter(o LoggerEventHandlerAdapterOptions, i astikit.StdLogger, h *astiencoder.EventHandler) {
|
||||
h.AddForEventName(EventNameLog, loggerEventHandlerCallback(o, astikit.AdaptStdLogger(i)))
|
||||
}
|
||||
|
||||
func loggerEventHandlerCallback(o LoggerEventHandlerAdapterOptions, l astikit.CompleteLogger) astiencoder.EventCallback {
|
||||
return func(e astiencoder.Event) bool {
|
||||
if v, ok := e.Payload.(EventLog); ok {
|
||||
// Sanitize
|
||||
msg := strings.TrimSpace(v.Msg)
|
||||
if msg == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
// Check ignored messages
|
||||
for _, r := range o.IgnoredLogMessages {
|
||||
if len(r.FindIndex([]byte(msg))) > 0 {
|
||||
// Handle log
|
||||
h.AddForEventName(EventNameLog, func(e astiencoder.Event) bool {
|
||||
if v, ok := e.Payload.(EventLog); ok {
|
||||
// Sanitize
|
||||
format := strings.TrimSpace(v.Format)
|
||||
msg := strings.TrimSpace(v.Msg)
|
||||
if msg == "" {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Add prefix
|
||||
msg = "astilibav: " + msg
|
||||
// Add prefix
|
||||
format = "astilibav: " + format
|
||||
msg = "astilibav: " + msg
|
||||
|
||||
// Add parent
|
||||
if strings.Index(v.Parent, "0x") == 0 {
|
||||
msg += " (" + v.Parent + ")"
|
||||
}
|
||||
|
||||
// Add level
|
||||
switch v.Level {
|
||||
case astiav.LogLevelDebug, astiav.LogLevelVerbose:
|
||||
l.Debug(msg)
|
||||
case astiav.LogLevelInfo:
|
||||
l.Info(msg)
|
||||
case astiav.LogLevelError, astiav.LogLevelFatal, astiav.LogLevelPanic:
|
||||
if v.Level == astiav.LogLevelFatal {
|
||||
msg = "FATAL! " + msg
|
||||
} else if v.Level == astiav.LogLevelPanic {
|
||||
msg = "PANIC! " + msg
|
||||
// Add parent
|
||||
if strings.Index(v.Parent, "0x") == 0 {
|
||||
msg += " (" + v.Parent + ")"
|
||||
}
|
||||
|
||||
// Add level
|
||||
switch v.Level {
|
||||
case astiav.LogLevelDebug, astiav.LogLevelVerbose:
|
||||
l.Debugk(format, msg)
|
||||
case astiav.LogLevelInfo:
|
||||
l.Infok(format, msg)
|
||||
case astiav.LogLevelError, astiav.LogLevelFatal, astiav.LogLevelPanic:
|
||||
if v.Level == astiav.LogLevelFatal {
|
||||
msg = "FATAL! " + msg
|
||||
} else if v.Level == astiav.LogLevelPanic {
|
||||
msg = "PANIC! " + msg
|
||||
}
|
||||
l.Errork(format, msg)
|
||||
case astiav.LogLevelWarning:
|
||||
l.Warnk(format, msg)
|
||||
}
|
||||
l.Error(msg)
|
||||
case astiav.LogLevelWarning:
|
||||
l.Warn(msg)
|
||||
}
|
||||
}
|
||||
return false
|
||||
return false
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,47 +0,0 @@
|
||||
package astilibav
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"regexp"
|
||||
"testing"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/asticode/go-astiencoder"
|
||||
"github.com/asticode/go-astikit"
|
||||
)
|
||||
|
||||
type mockedStdLogger struct{ ss []string }
|
||||
|
||||
func newMockedStdLogger() *mockedStdLogger {
|
||||
return &mockedStdLogger{
|
||||
ss: []string{},
|
||||
}
|
||||
}
|
||||
|
||||
func (l *mockedStdLogger) Fatal(v ...interface{}) { l.Print(v...) }
|
||||
|
||||
func (l *mockedStdLogger) Fatalf(format string, v ...interface{}) { l.Printf(format, v...) }
|
||||
|
||||
func (l *mockedStdLogger) Print(v ...interface{}) { l.ss = append(l.ss, fmt.Sprint(v...)) }
|
||||
|
||||
func (l *mockedStdLogger) Printf(format string, v ...interface{}) {
|
||||
l.ss = append(l.ss, fmt.Sprintf(format, v...))
|
||||
}
|
||||
|
||||
func TestLog(t *testing.T) {
|
||||
l := newMockedStdLogger()
|
||||
c := loggerEventHandlerCallback(LoggerEventHandlerAdapterOptions{
|
||||
IgnoredLogMessages: []*regexp.Regexp{
|
||||
regexp.MustCompile("^test2$"),
|
||||
regexp.MustCompile(`[\w]+_pattern`),
|
||||
},
|
||||
}, astikit.AdaptStdLogger(l))
|
||||
c(astiencoder.Event{Payload: EventLog{Level: astiav.LogLevelInfo, Msg: "test1"}})
|
||||
c(astiencoder.Event{Payload: EventLog{Level: astiav.LogLevelInfo, Msg: "test2"}})
|
||||
c(astiencoder.Event{Payload: EventLog{Level: astiav.LogLevelInfo, Msg: "test3"}})
|
||||
c(astiencoder.Event{Payload: EventLog{Level: astiav.LogLevelInfo, Msg: "test_pattern"}})
|
||||
if e, g := []string{"astilibav: test1", "astilibav: test3"}, l.ss; !reflect.DeepEqual(e, g) {
|
||||
t.Errorf("expected %+v, got %+v", e, g)
|
||||
}
|
||||
}
|
||||
@@ -4,7 +4,6 @@ import (
|
||||
"testing"
|
||||
|
||||
"github.com/asticode/go-astiav"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
@@ -33,7 +32,7 @@ func TestPktRestamperStartFromZero(t *testing.T) {
|
||||
pkt.SetPts(ft.inputPts)
|
||||
pkt.SetStreamIndex(ft.streamIdx)
|
||||
r.Restamp(pkt)
|
||||
assert.Equal(t, ft.outputDts, pkt.Dts())
|
||||
assert.Equal(t, ft.outputPts, pkt.Pts())
|
||||
require.Equal(t, ft.outputDts, pkt.Dts())
|
||||
require.Equal(t, ft.outputPts, pkt.Pts())
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user