Add /api/v3/events endpoint

This commit is contained in:
Ingo Oppermann
2023-03-23 11:11:47 +01:00
parent 2330270af2
commit b84fdddd81
12 changed files with 619 additions and 33 deletions

View File

@@ -92,6 +92,7 @@ type api struct {
log struct { log struct {
writer io.Writer writer io.Writer
buffer log.BufferWriter buffer log.BufferWriter
events log.ChannelWriter
logger struct { logger struct {
core log.Logger core log.Logger
main log.Logger main log.Logger
@@ -190,6 +191,7 @@ func (a *api) Reload() error {
} }
buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines) buffer := log.NewBufferWriter(loglevel, cfg.Log.MaxLines)
events := log.NewChannelWriter()
logger = logger.WithOutput(log.NewLevelRewriter( logger = logger.WithOutput(log.NewLevelRewriter(
log.NewMultiWriter( log.NewMultiWriter(
@@ -198,6 +200,7 @@ func (a *api) Reload() error {
cfg.Log.Topics, cfg.Log.Topics,
), ),
buffer, buffer,
events,
), ),
[]log.LevelRewriteRule{ []log.LevelRewriteRule{
// FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level // FFmpeg annoyance, move all warnings about unathorized access to memfs from ffmpeg to debug level
@@ -271,6 +274,7 @@ func (a *api) Reload() error {
a.config.config = cfg a.config.config = cfg
a.log.logger.core = logger a.log.logger.core = logger
a.log.buffer = buffer a.log.buffer = buffer
a.log.events = events
return nil return nil
} }
@@ -1025,6 +1029,7 @@ func (a *api) start() error {
serverConfig := http.Config{ serverConfig := http.Config{
Logger: a.log.logger.main, Logger: a.log.logger.main,
LogBuffer: a.log.buffer, LogBuffer: a.log.buffer,
LogEvents: a.log.events,
Restream: a.restream, Restream: a.restream,
Metrics: a.metrics, Metrics: a.metrics,
Prometheus: a.prom, Prometheus: a.prom,
@@ -1436,6 +1441,7 @@ func (a *api) stop() {
a.state = "idle" a.state = "idle"
logger.Info().Log("Complete") logger.Info().Log("Complete")
logger.Close()
} }
func (a *api) Stop() { func (a *api) Stop() {

View File

@@ -310,6 +310,50 @@ const docTemplate = `{
} }
} }
}, },
"/api/v3/events": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Stream of event of whats happening in the core",
"consumes": [
"application/x-json-stream",
"text/event-stream"
],
"produces": [
"application/x-json-stream",
"text/event-stream"
],
"tags": [
"v16.?.?"
],
"summary": "Stream of events",
"operationId": "events",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Event"
}
}
}
}
},
"/api/v3/fs": { "/api/v3/fs": {
"get": { "get": {
"security": [ "security": [
@@ -2878,6 +2922,44 @@ const docTemplate = `{
} }
} }
}, },
"api.Event": {
"type": "object",
"properties": {
"data": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"event": {
"type": "string"
},
"level": {
"type": "integer"
},
"message": {
"type": "string"
},
"ts": {
"type": "integer",
"format": "int64"
}
}
},
"api.EventFilter": {
"type": "object",
"properties": {
"event": {
"type": "string"
},
"filter": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
"api.FileInfo": { "api.FileInfo": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -303,6 +303,50 @@
} }
} }
}, },
"/api/v3/events": {
"post": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "Stream of event of whats happening in the core",
"consumes": [
"application/x-json-stream",
"text/event-stream"
],
"produces": [
"application/x-json-stream",
"text/event-stream"
],
"tags": [
"v16.?.?"
],
"summary": "Stream of events",
"operationId": "events",
"parameters": [
{
"description": "Event filters",
"name": "filters",
"in": "body",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/api.EventFilter"
}
}
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/api.Event"
}
}
}
}
},
"/api/v3/fs": { "/api/v3/fs": {
"get": { "get": {
"security": [ "security": [
@@ -2871,6 +2915,44 @@
} }
} }
}, },
"api.Event": {
"type": "object",
"properties": {
"data": {
"type": "object",
"additionalProperties": {
"type": "string"
}
},
"event": {
"type": "string"
},
"level": {
"type": "integer"
},
"message": {
"type": "string"
},
"ts": {
"type": "integer",
"format": "int64"
}
}
},
"api.EventFilter": {
"type": "object",
"properties": {
"event": {
"type": "string"
},
"filter": {
"type": "object",
"additionalProperties": {
"type": "string"
}
}
}
},
"api.FileInfo": { "api.FileInfo": {
"type": "object", "type": "object",
"properties": { "properties": {

View File

@@ -439,6 +439,31 @@ definitions:
message: message:
type: string type: string
type: object type: object
api.Event:
properties:
data:
additionalProperties:
type: string
type: object
event:
type: string
level:
type: integer
message:
type: string
ts:
format: int64
type: integer
type: object
api.EventFilter:
properties:
event:
type: string
filter:
additionalProperties:
type: string
type: object
type: object
api.FileInfo: api.FileInfo:
properties: properties:
last_modified: last_modified:
@@ -2166,6 +2191,34 @@ paths:
summary: Reload the currently active configuration summary: Reload the currently active configuration
tags: tags:
- v16.7.2 - v16.7.2
/api/v3/events:
post:
consumes:
- application/x-json-stream
- text/event-stream
description: Stream of event of whats happening in the core
operationId: events
parameters:
- description: Event filters
in: body
name: filters
schema:
items:
$ref: '#/definitions/api.EventFilter'
type: array
produces:
- application/x-json-stream
- text/event-stream
responses:
"200":
description: OK
schema:
$ref: '#/definitions/api.Event'
security:
- ApiKeyAuth: []
summary: Stream of events
tags:
- v16.?.?
/api/v3/fs: /api/v3/fs:
get: get:
description: Listall registered filesystems description: Listall registered filesystems

90
http/api/event.go Normal file
View File

@@ -0,0 +1,90 @@
package api
import (
"encoding/json"
"fmt"
"regexp"
"github.com/datarhei/core/v16/log"
)
// Event is the JSON representation of a single log event as delivered by
// the /api/v3/events endpoint.
type Event struct {
	Timestamp int64 `json:"ts" format:"int64"` // event time in Unix milliseconds (see Marshal)
	Level int `json:"level"` // numeric log level
	Component string `json:"event"` // originating component; serialized under the key "event"
	Message string `json:"message"` // human-readable log message
	Data map[string]string `json:"data"` // structured log fields, all values stringified
}
// Marshal populates e from the log event le. Every data value is converted
// to a string: strings are taken verbatim, errors via Error(), Stringers via
// String(), and anything else is JSON-encoded (or, failing that, replaced by
// the encoding error's message).
func (e *Event) Marshal(le *log.Event) {
	e.Timestamp = le.Time.UnixMilli()
	e.Level = int(le.Level)
	e.Component = le.Component
	e.Message = le.Message
	e.Data = make(map[string]string, len(le.Data))

	for key, raw := range le.Data {
		var text string

		switch val := raw.(type) {
		case string:
			text = val
		case error:
			text = val.Error()
		case fmt.Stringer:
			text = val.String()
		default:
			if encoded, err := json.Marshal(raw); err == nil {
				text = string(encoded)
			} else {
				text = err.Error()
			}
		}

		e.Data[key] = text
	}
}
// Filter reports whether the event matches the filter ef: the component
// names must be equal and every compiled pattern whose key is present in
// e.Data must match the corresponding value. Keys absent from the event's
// data are not checked (they do not cause a mismatch).
func (e *Event) Filter(ef *EventFilter) bool {
	if ef.Component != e.Component {
		return false
	}

	for key, pattern := range ef.filter {
		value, present := e.Data[key]
		if !present {
			continue
		}

		if !pattern.MatchString(value) {
			return false
		}
	}

	return true
}
// EventFilter describes which events a client wants to receive: the
// component name to match and, per data key, a regular expression that the
// event's data value has to satisfy.
type EventFilter struct {
	Component string            `json:"event"`
	Filter    map[string]string `json:"filter"`

	// filter holds the compiled form of Filter; populated by compile().
	filter map[string]*regexp.Regexp
}

// compile compiles all patterns in ef.Filter and stores them in ef.filter.
// On error, ef.filter is left unchanged and the returned error identifies
// the offending filter key.
func (ef *EventFilter) compile() error {
	compiled := make(map[string]*regexp.Regexp, len(ef.Filter))

	for key, pattern := range ef.Filter {
		r, err := regexp.Compile(pattern)
		if err != nil {
			return fmt.Errorf("invalid filter pattern for %q: %w", key, err)
		}

		compiled[key] = r
	}

	// Only replace the compiled set once every pattern compiled cleanly.
	ef.filter = compiled

	return nil
}

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"github.com/datarhei/core/v16/restream/app" "github.com/datarhei/core/v16/restream/app"
"github.com/lithammer/shortuuid/v4" "github.com/lithammer/shortuuid/v4"
) )

110
http/handler/api/events.go Normal file
View File

@@ -0,0 +1,110 @@
package api
import (
"encoding/json"
"net/http"
"strings"
"time"
"github.com/datarhei/core/v16/http/api"
"github.com/datarhei/core/v16/log"
"github.com/labstack/echo/v4"
)
// The EventsHandler type provides handler functions for streaming log
// events to API clients.
type EventsHandler struct {
	events log.ChannelWriter // source of log events to subscribe to
}
// NewEvents returns a new EventsHandler that streams the events written to
// the given channel writer.
func NewEvents(events log.ChannelWriter) *EventsHandler {
	return &EventsHandler{
		events: events,
	}
}
// Events returns a stream of event
// @Summary Stream of events
// @Description Stream of event of whats happening in the core
// @ID events
// @Tags v16.?.?
// @Accept text/event-stream
// @Accept json-stream
// @Produce text/event-stream
// @Produce json-stream
// @Param filters body []api.EventFilter false "Event filters"
// @Success 200 {object} api.Event
// @Security ApiKeyAuth
// @Router /api/v3/events [post]
func (h *EventsHandler) Events(c echo.Context) error {
	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()

	req := c.Request()

	// NOTE(review): the documented "filters" body parameter is never read
	// here, so filtering is currently not applied. TODO: decode the body
	// into []api.EventFilter and drop events that don't match.

	// Default to SSE framing; switch to newline-delimited JSON if the
	// client asks for it.
	contentType := "text/event-stream"
	accept := req.Header.Get(echo.HeaderAccept)
	if strings.Contains(accept, "application/x-json-stream") {
		contentType = "application/x-json-stream"
	}

	res := c.Response()
	res.Header().Set(echo.HeaderContentType, contentType+"; charset=UTF-8")
	res.Header().Set(echo.HeaderCacheControl, "no-store")
	res.WriteHeader(http.StatusOK)

	evts, cancel := h.events.Subscribe()
	defer cancel()

	enc := json.NewEncoder(res)

	// Stop streaming as soon as the client disconnects.
	done := req.Context().Done()

	sse := contentType == "text/event-stream"

	keepalive := []byte("{\"event\": \"keepalive\"}\n")
	if sse {
		keepalive = []byte(":keepalive\n\n")
	}

	// Send an initial keepalive so the client knows the stream is open.
	res.Write(keepalive)
	res.Flush()

	event := api.Event{}

	for {
		select {
		case <-done:
			return nil
		case <-ticker.C:
			res.Write(keepalive)
			res.Flush()
		case e, ok := <-evts:
			if !ok {
				// The event source has been closed; end the stream.
				return nil
			}

			event.Marshal(&e)

			if sse {
				res.Write([]byte("event: " + strings.ToLower(event.Component) + "\ndata: "))
			}

			if err := enc.Encode(event); err != nil {
				// Encoding/writing failed; the client is most likely gone.
				return nil
			}

			if sse {
				res.Write([]byte("\n"))
			}

			res.Flush()
		}
	}
}

View File

@@ -116,6 +116,13 @@ func (w *sizeWriter) Write(body []byte) (int, error) {
return n, err return n, err
} }
// Flush forwards the flush to the wrapped ResponseWriter if it supports
// http.Flusher, so streamed responses are pushed to the client immediately.
func (w *sizeWriter) Flush() {
	// Scope the type assertion to the if-statement (idiomatic form).
	if flusher, ok := w.ResponseWriter.(http.Flusher); ok {
		flusher.Flush()
	}
}
type sizeReadCloser struct { type sizeReadCloser struct {
io.ReadCloser io.ReadCloser

View File

@@ -114,4 +114,9 @@ func (w *fakeWriter) Write(body []byte) (int, error) {
return n, err return n, err
} }
func (w *fakeWriter) Flush() {} func (w *fakeWriter) Flush() {
flusher, ok := w.ResponseWriter.(http.Flusher)
if ok {
flusher.Flush()
}
}

View File

@@ -76,6 +76,7 @@ var ListenAndServe = http.ListenAndServe
type Config struct { type Config struct {
Logger log.Logger Logger log.Logger
LogBuffer log.BufferWriter LogBuffer log.BufferWriter
LogEvents log.ChannelWriter
Restream restream.Restreamer Restream restream.Restreamer
Metrics monitor.HistoryReader Metrics monitor.HistoryReader
Prometheus prometheus.Reader Prometheus prometheus.Reader
@@ -116,6 +117,7 @@ type server struct {
v3handler struct { v3handler struct {
log *api.LogHandler log *api.LogHandler
events *api.EventsHandler
restream *api.RestreamHandler restream *api.RestreamHandler
playout *api.PlayoutHandler playout *api.PlayoutHandler
rtmp *api.RTMPHandler rtmp *api.RTMPHandler
@@ -228,6 +230,10 @@ func NewServer(config Config) (Server, error) {
config.LogBuffer, config.LogBuffer,
) )
s.v3handler.events = api.NewEvents(
config.LogEvents,
)
if config.Restream != nil { if config.Restream != nil {
s.v3handler.restream = api.NewRestream( s.v3handler.restream = api.NewRestream(
config.Restream, config.Restream,
@@ -649,9 +655,18 @@ func (s *server) setRoutesV3(v3 *echo.Group) {
} }
// v3 Log // v3 Log
v3.GET("/log", s.v3handler.log.Log) if s.v3handler.log != nil {
v3.GET("/log", s.v3handler.log.Log)
}
// v3 Metrics // v3 Metrics
v3.GET("/metrics", s.v3handler.resources.Describe) if s.v3handler.resources != nil {
v3.POST("/metrics", s.v3handler.resources.Metrics) v3.GET("/metrics", s.v3handler.resources.Describe)
v3.POST("/metrics", s.v3handler.resources.Metrics)
}
// v3 Events
if s.v3handler.events != nil {
v3.POST("/events", s.v3handler.events.Events)
}
} }

View File

@@ -97,6 +97,8 @@ type Logger interface {
// Write implements the io.Writer interface such that it can be used in e.g. the // Write implements the io.Writer interface such that it can be used in e.g. the
// the log/Logger facility. Messages will be printed with debug level. // the log/Logger facility. Messages will be printed with debug level.
Write(p []byte) (int, error) Write(p []byte) (int, error)
Close()
} }
// logger is an implementation of the Logger interface. // logger is an implementation of the Logger interface.
@@ -114,6 +116,10 @@ func New(component string) Logger {
return l return l
} }
// Close closes the logger's output writer. The logger must not be used
// for writing after Close has been called.
func (l *logger) Close() {
	l.output.Close()
}
func (l *logger) clone() *logger { func (l *logger) clone() *logger {
clone := &logger{ clone := &logger{
output: l.output, output: l.output,
@@ -199,6 +205,10 @@ func newEvent(l *logger) Logger {
return e return e
} }
// Close closes the underlying logger's output writer.
func (e *Event) Close() {
	e.logger.Close()
}
func (e *Event) WithOutput(w Writer) Logger { func (e *Event) WithOutput(w Writer) Logger {
return e.logger.WithOutput(w) return e.logger.WithOutput(w)
} }
@@ -342,13 +352,3 @@ func (l *Event) Write(p []byte) (int, error) {
return len(p), nil return len(p), nil
} }
type Eventx struct {
Time time.Time `json:"ts"`
Level Level `json:"level"`
Component string `json:"component"`
Reference string `json:"ref"`
Message string `json:"message"`
Caller string `json:"caller"`
Detail interface{} `json:"detail"`
}

View File

@@ -2,17 +2,21 @@ package log
import ( import (
"container/ring" "container/ring"
"context"
"fmt"
"io" "io"
"os" "os"
"regexp" "regexp"
"strings" "strings"
"sync" "sync"
"github.com/lithammer/shortuuid/v4"
"github.com/mattn/go-isatty" "github.com/mattn/go-isatty"
) )
type Writer interface { type Writer interface {
Write(e *Event) error Write(e *Event) error
Close()
} }
type jsonWriter struct { type jsonWriter struct {
@@ -41,6 +45,8 @@ func (w *jsonWriter) Write(e *Event) error {
return err return err
} }
func (w *jsonWriter) Close() {}
type consoleWriter struct { type consoleWriter struct {
writer io.Writer writer io.Writer
level Level level Level
@@ -80,6 +86,8 @@ func (w *consoleWriter) Write(e *Event) error {
return err return err
} }
func (w *consoleWriter) Close() {}
type topicWriter struct { type topicWriter struct {
writer Writer writer Writer
topics map[string]struct{} topics map[string]struct{}
@@ -112,6 +120,10 @@ func (w *topicWriter) Write(e *Event) error {
return err return err
} }
func (w *topicWriter) Close() {
w.writer.Close()
}
type levelRewriter struct { type levelRewriter struct {
writer Writer writer Writer
rules []levelRewriteRule rules []levelRewriteRule
@@ -182,6 +194,10 @@ rules:
return w.writer.Write(e) return w.writer.Write(e)
} }
func (w *levelRewriter) Close() {
w.writer.Close()
}
type syncWriter struct { type syncWriter struct {
mu sync.Mutex mu sync.Mutex
writer Writer writer Writer
@@ -193,11 +209,18 @@ func NewSyncWriter(writer Writer) Writer {
} }
} }
func (s *syncWriter) Write(e *Event) error { func (w *syncWriter) Write(e *Event) error {
s.mu.Lock() w.mu.Lock()
defer s.mu.Unlock() defer w.mu.Unlock()
return s.writer.Write(e) return w.writer.Write(e)
}
func (w *syncWriter) Close() {
w.mu.Lock()
defer w.mu.Unlock()
w.writer.Close()
} }
type multiWriter struct { type multiWriter struct {
@@ -212,9 +235,9 @@ func NewMultiWriter(writer ...Writer) Writer {
return mw return mw
} }
func (m *multiWriter) Write(e *Event) error { func (w *multiWriter) Write(e *Event) error {
for _, w := range m.writer { for _, writer := range w.writer {
if err := w.Write(e); err != nil { if err := writer.Write(e); err != nil {
return err return err
} }
} }
@@ -222,6 +245,12 @@ func (m *multiWriter) Write(e *Event) error {
return nil return nil
} }
func (w *multiWriter) Close() {
for _, writer := range w.writer {
writer.Close()
}
}
type BufferWriter interface { type BufferWriter interface {
Writer Writer
Events() []*Event Events() []*Event
@@ -245,33 +274,40 @@ func NewBufferWriter(level Level, lines int) BufferWriter {
return b return b
} }
func (b *bufferWriter) Write(e *Event) error { func (w *bufferWriter) Write(e *Event) error {
if b.level < e.Level || e.Level == Lsilent { if w.level < e.Level || e.Level == Lsilent {
return nil return nil
} }
b.lock.Lock() w.lock.Lock()
defer b.lock.Unlock() defer w.lock.Unlock()
if b.lines != nil { if w.lines != nil {
b.lines.Value = e.clone() w.lines.Value = e.clone()
b.lines = b.lines.Next() w.lines = w.lines.Next()
} }
return nil return nil
} }
func (b *bufferWriter) Events() []*Event { func (w *bufferWriter) Close() {
w.lock.Lock()
defer w.lock.Unlock()
w.lines = nil
}
func (w *bufferWriter) Events() []*Event {
var lines = []*Event{} var lines = []*Event{}
if b.lines == nil { if w.lines == nil {
return lines return lines
} }
b.lock.RLock() w.lock.RLock()
defer b.lock.RUnlock() defer w.lock.RUnlock()
b.lines.Do(func(l interface{}) { w.lines.Do(func(l interface{}) {
if l == nil { if l == nil {
return return
} }
@@ -281,3 +317,102 @@ func (b *bufferWriter) Events() []*Event {
return lines return lines
} }
// ChannelWriter is a Writer that broadcasts every written event to all
// currently subscribed channels.
type ChannelWriter interface {
	Writer
	// Subscribe registers a new subscriber and returns the channel on which
	// events are delivered together with a function to unsubscribe.
	Subscribe() (<-chan Event, func())
}
// channelWriter implements ChannelWriter. Written events are queued on
// the publisher channel and fanned out to subscribers by a background
// broadcast goroutine.
type channelWriter struct {
	publisher chan Event // buffered queue feeding the broadcast goroutine
	ctx context.Context // controls the lifetime of the broadcast goroutine
	cancel context.CancelFunc // stops the broadcast goroutine
	subscriber map[string]chan Event // active subscribers keyed by a random id
	subscriberLock sync.Mutex // guards subscriber
}
// NewChannelWriter returns a ChannelWriter whose broadcast goroutine is
// already running.
func NewChannelWriter() ChannelWriter {
	ctx, cancel := context.WithCancel(context.Background())

	writer := &channelWriter{
		publisher:  make(chan Event, 1024),
		subscriber: make(map[string]chan Event),
		ctx:        ctx,
		cancel:     cancel,
	}

	go writer.broadcast()

	return writer
}
// Write queues the event for broadcasting to all subscribers. It never
// blocks: if the publisher queue is full the event is dropped and an error
// is returned.
func (w *channelWriter) Write(e *Event) error {
	// Queue a detached copy of the event with its logger reference cleared,
	// so queued events don't keep the originating logger alive.
	// (Bug fix: the original built this sanitized clone but then sent *e,
	// discarding the clone.)
	event := e.clone()
	event.logger = nil

	select {
	case w.publisher <- *event:
	default:
		return fmt.Errorf("publisher queue full")
	}

	return nil
}
// Close stops the broadcast goroutine and closes all subscriber channels.
func (w *channelWriter) Close() {
	w.cancel()

	// Deliberately don't close(w.publisher): a concurrent Write would panic
	// on a send to a closed channel. The broadcast goroutine exits via the
	// cancelled context and the channel is reclaimed by the GC.

	w.subscriberLock.Lock()
	defer w.subscriberLock.Unlock()

	for _, c := range w.subscriber {
		close(c)
	}

	w.subscriber = make(map[string]chan Event)
}
// Subscribe registers a new subscriber under a fresh random id and returns
// its buffered event channel together with a function that removes the
// subscription again.
func (w *channelWriter) Subscribe() (<-chan Event, func()) {
	ch := make(chan Event, 1024)

	var id string

	w.subscriberLock.Lock()
	for {
		id = shortuuid.New()

		if _, taken := w.subscriber[id]; !taken {
			w.subscriber[id] = ch
			break
		}
	}
	w.subscriberLock.Unlock()

	return ch, func() {
		w.subscriberLock.Lock()
		defer w.subscriberLock.Unlock()

		delete(w.subscriber, id)
	}
}
// broadcast runs in its own goroutine, draining the publisher queue and
// fanning each event out to all subscribers. A subscriber whose channel
// buffer is full misses the event rather than blocking everyone else.
func (w *channelWriter) broadcast() {
	for {
		select {
		case <-w.ctx.Done():
			return
		case e, ok := <-w.publisher:
			if !ok {
				// Publisher channel was closed; nothing more to deliver.
				// (Bug fix: without this check a closed publisher made the
				// loop spin, broadcasting zero-value events.)
				return
			}

			w.subscriberLock.Lock()
			for _, c := range w.subscriber {
				// Each subscriber gets its own clone so receivers can't
				// mutate each other's event data.
				pp := e.clone()

				select {
				case c <- *pp:
				default:
					// Subscriber is not keeping up; drop the event.
				}
			}
			w.subscriberLock.Unlock()
		}
	}
}