diff --git a/app/api/api.go b/app/api/api.go index ad0291be..e523a08f 100644 --- a/app/api/api.go +++ b/app/api/api.go @@ -92,6 +92,7 @@ type api struct { log struct { writer io.Writer buffer log.BufferWriter + events log.ChannelWriter logger struct { core log.Logger main log.Logger @@ -190,6 +191,7 @@ func (a *api) Reload() error { } buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines) + events := log.NewChannelWriter() logger = logger.WithOutput(log.NewLevelRewriter( log.NewMultiWriter( @@ -198,6 +200,7 @@ func (a *api) Reload() error { cfg.Log.Topics, ), buffer, + events, ), []log.LevelRewriteRule{ // FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level @@ -271,6 +274,7 @@ func (a *api) Reload() error { a.config.config = cfg a.log.logger.core = logger a.log.buffer = buffer + a.log.events = events return nil } @@ -1025,6 +1029,7 @@ func (a *api) start() error { serverConfig := http.Config{ Logger: a.log.logger.main, LogBuffer: a.log.buffer, + LogEvents: a.log.events, Restream: a.restream, Metrics: a.metrics, Prometheus: a.prom, @@ -1436,6 +1441,7 @@ func (a *api) stop() { a.state = "idle" logger.Info().Log("Complete") + logger.Close() } func (a *api) Stop() { diff --git a/docs/docs.go b/docs/docs.go index 3ab93859..3f3ba947 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -310,6 +310,50 @@ const docTemplate = `{ } } }, + "/api/v3/events": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Stream of event of whats happening in the core", + "consumes": [ + "application/x-json-stream", + "text/event-stream" + ], + "produces": [ + "application/x-json-stream", + "text/event-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Stream of events", + "operationId": "events", + "parameters": [ + { + "description": "Event filters", + "name": "filters", + "in": "body", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.EventFilter" + } + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Event" + } + } + } + } + }, "/api/v3/fs": { "get": { "security": [ @@ -2878,6 +2922,44 @@ const docTemplate = `{ } } }, + "api.Event": { + "type": "object", + "properties": { + "data": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "event": { + "type": "string" + }, + "level": { + "type": "integer" + }, + "message": { + "type": "string" + }, + "ts": { + "type": "integer", + "format": "int64" + } + } + }, + "api.EventFilter": { + "type": "object", + "properties": { + "event": { + "type": "string" + }, + "filter": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + }, "api.FileInfo": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 4dfc2a69..6d5c56ee 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -303,6 +303,50 @@ } } }, + "/api/v3/events": { + "post": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "Stream of event of whats happening in the core", + "consumes": [ + "application/x-json-stream", + "text/event-stream" + ], + "produces": [ + "application/x-json-stream", + "text/event-stream" + ], + "tags": [ + "v16.?.?" + ], + "summary": "Stream of events", + "operationId": "events", + "parameters": [ + { + "description": "Event filters", + "name": "filters", + "in": "body", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/api.EventFilter" + } + } + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/api.Event" + } + } + } + } + }, "/api/v3/fs": { "get": { "security": [ @@ -2871,6 +2915,44 @@ } } }, + "api.Event": { + "type": "object", + "properties": { + "data": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "event": { + "type": "string" + }, + "level": { + "type": "integer" + }, + "message": { + "type": "string" + }, + "ts": { + "type": "integer", + "format": "int64" + } + } + }, + "api.EventFilter": { + "type": "object", + "properties": { + "event": { + "type": "string" + }, + "filter": { + "type": "object", + "additionalProperties": { + "type": "string" + } + } + } + }, "api.FileInfo": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index ec307640..85c1ff26 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -439,6 +439,31 @@ definitions: message: type: string type: object + api.Event: + properties: + data: + additionalProperties: + type: string + type: object + event: + type: string + level: + type: integer + message: + type: string + ts: + format: int64 + type: integer + type: object + api.EventFilter: + properties: + event: + type: string + filter: + additionalProperties: + type: string + type: object + type: object api.FileInfo: properties: last_modified: @@ -2166,6 +2191,34 @@ paths: summary: Reload the currently active configuration tags: - v16.7.2 + /api/v3/events: + post: + consumes: + - application/x-json-stream + - text/event-stream + description: Stream of event of whats happening in the core + operationId: events + parameters: + - description: Event filters + in: body + name: filters + schema: + items: + $ref: '#/definitions/api.EventFilter' + type: array + produces: + - application/x-json-stream + - text/event-stream + responses: + "200": + description: OK + schema: + $ref: '#/definitions/api.Event' + security: + - ApiKeyAuth: [] + summary: Stream of events + tags: + - v16.?.? /api/v3/fs: get: description: Listall registered filesystems diff --git a/http/api/event.go b/http/api/event.go new file mode 100644 index 00000000..433a1d52 --- /dev/null +++ b/http/api/event.go @@ -0,0 +1,90 @@ +package api + +import ( + "encoding/json" + "fmt" + "regexp" + + "github.com/datarhei/core/v16/log" +) + +type Event struct { + Timestamp int64 `json:"ts" format:"int64"` + Level int `json:"level"` + Component string `json:"event"` + Message string `json:"message"` + + Data map[string]string `json:"data"` +} + +func (e *Event) Marshal(le *log.Event) { + e.Timestamp = le.Time.UnixMilli() + e.Level = int(le.Level) + e.Component = le.Component + e.Message = le.Message + + e.Data = make(map[string]string) + + for k, v := range le.Data { + var value string + + switch val := v.(type) { + case string: + value = val + case error: + value = val.Error() + default: + if s, ok := v.(fmt.Stringer); ok { + value = s.String() + } else { + if jsonvalue, err := json.Marshal(v); err == nil { + value = string(jsonvalue) + } else { + value = err.Error() + } + } + } + + e.Data[k] = value + } +} + +func (e *Event) Filter(ef *EventFilter) bool { + if e.Component != ef.Component { + return false + } + + for k, r := range ef.filter { + v, ok := e.Data[k] + if !ok { + continue + } + + if !r.MatchString(v) { + return false + } + } + + return true +} + +type EventFilter struct { + Component string `json:"event"` + Filter map[string]string `json:"filter"` + filter map[string]*regexp.Regexp +} + +func (ef *EventFilter) compile() error { + ef.filter = make(map[string]*regexp.Regexp) + + for k, v := range ef.Filter { + r, err := regexp.Compile(v) + if err != nil { + return err + } + + ef.filter[k] = r + } + + return nil +} diff --git a/http/api/process.go b/http/api/process.go index 391b968a..5db7eb20 100644 --- a/http/api/process.go +++ b/http/api/process.go @@ -4,6 +4,7 @@ import ( "encoding/json" "github.com/datarhei/core/v16/restream/app" + "github.com/lithammer/shortuuid/v4" ) diff --git a/http/handler/api/events.go b/http/handler/api/events.go new file mode 100644 index 00000000..c0a514b2 --- /dev/null +++ b/http/handler/api/events.go @@ -0,0 +1,110 @@ +package api + +import ( + "encoding/json" + "net/http" + "strings" + "time" + + "github.com/datarhei/core/v16/http/api" + "github.com/datarhei/core/v16/log" + + "github.com/labstack/echo/v4" +) + +// The EventsHandler type provides handler functions for retrieving details +// about the API version and build infos. +type EventsHandler struct { + events log.ChannelWriter +} + +// NewEvents returns a new About type +func NewEvents(events log.ChannelWriter) *EventsHandler { + return &EventsHandler{ + events: events, + } +} + +// Events returns a stream of event +// @Summary Stream of events +// @Description Stream of event of whats happening in the core +// @ID events +// @Tags v16.?.? +// @Accept text/event-stream +// @Accept json-stream +// @Produce text/event-stream +// @Produce json-stream +// @Param filters body []api.EventFilter false "Event filters" +// @Success 200 {object} api.Event +// @Security ApiKeyAuth +// @Router /api/v3/events [post] +func (h *EventsHandler) Events(c echo.Context) error { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + req := c.Request() + + contentType := "text/event-stream" + accept := req.Header.Get(echo.HeaderAccept) + if strings.Contains(accept, "application/x-json-stream") { + contentType = "application/x-json-stream" + } + + res := c.Response() + + res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8") + res.Header().Set(echo.HeaderCacheControl, "no-store") + res.WriteHeader(http.StatusOK) + + evts, cancel := h.events.Subscribe() + defer cancel() + + enc := json.NewEncoder(res) + enc.SetIndent("", "") + + done := make(chan struct{}) + + event := api.Event{} + + if contentType == "text/event-stream" { + res.Write([]byte(":keepalive\n\n")) + res.Flush() + + for { + select { + case <-done: + return nil + case <-ticker.C: + res.Write([]byte(":keepalive\n\n")) + res.Flush() + case e := <-evts: + event.Marshal(&e) + res.Write([]byte("event: " + strings.ToLower(event.Component) + "\ndata: ")) + if err := enc.Encode(event); err != nil { + close(done) + } + res.Write([]byte("\n")) + res.Flush() + } + } + } else { + res.Write([]byte("{\"event\": \"keepalive\"}\n")) + res.Flush() + + for { + select { + case <-done: + return nil + case <-ticker.C: + res.Write([]byte("{\"event\": \"keepalive\"}\n")) + res.Flush() + case e := <-evts: + event.Marshal(&e) + if err := enc.Encode(event); err != nil { + close(done) + } + res.Flush() + } + } + } +} diff --git a/http/middleware/log/log.go b/http/middleware/log/log.go index ef8eb5ce..a07090c4 100644 --- a/http/middleware/log/log.go +++ b/http/middleware/log/log.go @@ -116,6 +116,13 @@ func (w *sizeWriter) Write(body []byte) (int, error) { return n, err } +func (w *sizeWriter) Flush() { + flusher, ok := w.ResponseWriter.(http.Flusher) + if ok { + flusher.Flush() + } +} + type sizeReadCloser struct { io.ReadCloser diff --git a/http/middleware/session/HTTP.go b/http/middleware/session/HTTP.go index 2de3097e..17142066 100644 --- a/http/middleware/session/HTTP.go +++ b/http/middleware/session/HTTP.go @@ -114,4 +114,9 @@ func (w *fakeWriter) Write(body []byte) (int, error) { return n, err } -func (w *fakeWriter) Flush() {} +func (w *fakeWriter) Flush() { + flusher, ok := w.ResponseWriter.(http.Flusher) + if ok { + flusher.Flush() + } +} diff --git a/http/server.go b/http/server.go index 007368de..ae02f26e 100644 --- a/http/server.go +++ b/http/server.go @@ -76,6 +76,7 @@ var ListenAndServe = http.ListenAndServe type Config struct { Logger log.Logger LogBuffer log.BufferWriter + LogEvents log.ChannelWriter Restream restream.Restreamer Metrics monitor.HistoryReader Prometheus prometheus.Reader @@ -116,6 +117,7 @@ type server struct { v3handler struct { log *api.LogHandler + events *api.EventsHandler restream *api.RestreamHandler playout *api.PlayoutHandler rtmp *api.RTMPHandler @@ -228,6 +230,10 @@ func NewServer(config Config) (Server, error) { config.LogBuffer, ) + s.v3handler.events = api.NewEvents( + config.LogEvents, + ) + if config.Restream != nil { s.v3handler.restream = api.NewRestream( config.Restream, @@ -649,9 +655,18 @@ func (s *server) setRoutesV3(v3 *echo.Group) { } // v3 Log - v3.GET("/log", s.v3handler.log.Log) + if s.v3handler.log != nil { + v3.GET("/log", s.v3handler.log.Log) + } // v3 Metrics - v3.GET("/metrics", s.v3handler.resources.Describe) - v3.POST("/metrics", s.v3handler.resources.Metrics) + if s.v3handler.resources != nil { + v3.GET("/metrics", s.v3handler.resources.Describe) + v3.POST("/metrics", s.v3handler.resources.Metrics) + } + + // v3 Events + if s.v3handler.events != nil { + v3.POST("/events", s.v3handler.events.Events) + } } diff --git a/log/log.go b/log/log.go index 14a78e2c..db5e03b2 100644 --- a/log/log.go +++ b/log/log.go @@ -97,6 +97,8 @@ type Logger interface { // Write implements the io.Writer interface such that it can be used in e.g. the // the log/Logger facility. Messages will be printed with debug level. Write(p []byte) (int, error) + + Close() } // logger is an implementation of the Logger interface. @@ -114,6 +116,10 @@ func New(component string) Logger { return l } +func (l *logger) Close() { + l.output.Close() +} + func (l *logger) clone() *logger { clone := &logger{ output: l.output, @@ -199,6 +205,10 @@ func newEvent(l *logger) Logger { return e } +func (e *Event) Close() { + e.logger.Close() +} + func (e *Event) WithOutput(w Writer) Logger { return e.logger.WithOutput(w) } @@ -342,13 +352,3 @@ func (l *Event) Write(p []byte) (int, error) { return len(p), nil } - -type Eventx struct { - Time time.Time `json:"ts"` - Level Level `json:"level"` - Component string `json:"component"` - Reference string `json:"ref"` - Message string `json:"message"` - Caller string `json:"caller"` - Detail interface{} `json:"detail"` -} diff --git a/log/writer.go b/log/writer.go index f04da26d..0727989e 100644 --- a/log/writer.go +++ b/log/writer.go @@ -2,17 +2,21 @@ package log import ( "container/ring" + "context" + "fmt" "io" "os" "regexp" "strings" "sync" + "github.com/lithammer/shortuuid/v4" "github.com/mattn/go-isatty" ) type Writer interface { Write(e *Event) error + Close() } type jsonWriter struct { @@ -41,6 +45,8 @@ func (w *jsonWriter) Write(e *Event) error { return err } +func (w *jsonWriter) Close() {} + type consoleWriter struct { writer io.Writer level Level @@ -80,6 +86,8 @@ func (w *consoleWriter) Write(e *Event) error { return err } +func (w *consoleWriter) Close() {} + type topicWriter struct { writer Writer topics map[string]struct{} @@ -112,6 +120,10 @@ func (w *topicWriter) Write(e *Event) error { return err } +func (w *topicWriter) Close() { + w.writer.Close() +} + type levelRewriter struct { writer Writer rules []levelRewriteRule @@ -182,6 +194,10 @@ rules: return w.writer.Write(e) } +func (w *levelRewriter) Close() { + w.writer.Close() +} + type syncWriter struct { mu sync.Mutex writer Writer @@ -193,11 +209,18 @@ func NewSyncWriter(writer Writer) Writer { } } -func (s *syncWriter) Write(e *Event) error { - s.mu.Lock() - defer s.mu.Unlock() +func (w *syncWriter) Write(e *Event) error { + w.mu.Lock() + defer w.mu.Unlock() - return s.writer.Write(e) + return w.writer.Write(e) +} + +func (w *syncWriter) Close() { + w.mu.Lock() + defer w.mu.Unlock() + + w.writer.Close() } type multiWriter struct { @@ -212,9 +235,9 @@ func NewMultiWriter(writer ...Writer) Writer { return mw } -func (m *multiWriter) Write(e *Event) error { - for _, w := range m.writer { - if err := w.Write(e); err != nil { +func (w *multiWriter) Write(e *Event) error { + for _, writer := range w.writer { + if err := writer.Write(e); err != nil { return err } } @@ -222,6 +245,12 @@ func (m *multiWriter) Write(e *Event) error { return nil } +func (w *multiWriter) Close() { + for _, writer := range w.writer { + writer.Close() + } +} + type BufferWriter interface { Writer Events() []*Event @@ -245,33 +274,40 @@ func NewBufferWriter(level Level, lines int) BufferWriter { return b } -func (b *bufferWriter) Write(e *Event) error { - if b.level < e.Level || e.Level == Lsilent { +func (w *bufferWriter) Write(e *Event) error { + if w.level < e.Level || e.Level == Lsilent { return nil } - b.lock.Lock() - defer b.lock.Unlock() + w.lock.Lock() + defer w.lock.Unlock() - if b.lines != nil { - b.lines.Value = e.clone() - b.lines = b.lines.Next() + if w.lines != nil { + w.lines.Value = e.clone() + w.lines = w.lines.Next() } return nil } -func (b *bufferWriter) Events() []*Event { +func (w *bufferWriter) Close() { + w.lock.Lock() + defer w.lock.Unlock() + + w.lines = nil +} + +func (w *bufferWriter) Events() []*Event { var lines = []*Event{} - if b.lines == nil { + if w.lines == nil { return lines } - b.lock.RLock() - defer b.lock.RUnlock() + w.lock.RLock() + defer w.lock.RUnlock() - b.lines.Do(func(l interface{}) { + w.lines.Do(func(l interface{}) { if l == nil { return } @@ -281,3 +317,102 @@ func (b *bufferWriter) Events() []*Event { return lines } + +type ChannelWriter interface { + Writer + + Subscribe() (<-chan Event, func()) +} + +type channelWriter struct { + publisher chan Event + + ctx context.Context + cancel context.CancelFunc + + subscriber map[string]chan Event + subscriberLock sync.Mutex +} + +func NewChannelWriter() ChannelWriter { + w := &channelWriter{ + publisher: make(chan Event, 1024), + subscriber: make(map[string]chan Event), + } + + w.ctx, w.cancel = context.WithCancel(context.Background()) + + go w.broadcast() + + return w +} + +func (w *channelWriter) Write(e *Event) error { + event := e.clone() + event.logger = nil + + select { + case w.publisher <- *e: + default: + return fmt.Errorf("publisher queue full") + } + + return nil +} + +func (w *channelWriter) Close() { + w.cancel() + + close(w.publisher) + + w.subscriberLock.Lock() + for _, c := range w.subscriber { + close(c) + } + w.subscriber = make(map[string]chan Event) + w.subscriberLock.Unlock() +} + +func (w *channelWriter) Subscribe() (<-chan Event, func()) { + l := make(chan Event, 1024) + + var id string = "" + + w.subscriberLock.Lock() + for { + id = shortuuid.New() + if _, ok := w.subscriber[id]; !ok { + w.subscriber[id] = l + break + } + } + w.subscriberLock.Unlock() + + unsubscribe := func() { + w.subscriberLock.Lock() + delete(w.subscriber, id) + w.subscriberLock.Unlock() + } + + return l, unsubscribe +} + +func (w *channelWriter) broadcast() { + for { + select { + case <-w.ctx.Done(): + return + case e := <-w.publisher: + w.subscriberLock.Lock() + for _, c := range w.subscriber { + pp := e.clone() + + select { + case c <- *pp: + default: + } + } + w.subscriberLock.Unlock() + } + } +}