Recording playback is now handled by JS

This commit is contained in:
Quentin Renard
2020-06-05 19:27:49 +02:00
parent 3238724a1c
commit 107cd34aa3
5 changed files with 182 additions and 340 deletions

288
server.go
View File

@@ -2,13 +2,13 @@ package astiencoder
import (
"context"
"encoding/base64"
"encoding/csv"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"os"
"path/filepath"
@@ -24,7 +24,6 @@ import (
type Server struct {
l astikit.SeverityLogger
p *serverPlayback
r *serverRecording
w *Workflow
ws *astiws.Manager
@@ -33,7 +32,6 @@ type Server struct {
func NewServer(l astikit.StdLogger) (s *Server) {
s = &Server{
l: astikit.AdaptStdLogger(l),
p: newServerPlayback(),
ws: astiws.NewManager(astiws.ManagerConfiguration{MaxMessageSize: 8192}, l),
}
s.r = newServerRecording(s.l)
@@ -51,9 +49,6 @@ func (s *Server) Handler() http.Handler {
// Add routes
r.Handler(http.MethodGet, "/", s.serveHomepage())
r.Handler(http.MethodGet, "/ok", s.serveOK())
r.Handler(http.MethodPost, "/playback/load", s.servePlaybackLoad())
r.Handler(http.MethodGet, "/playback/next", s.servePlaybackNext())
r.Handler(http.MethodGet, "/playback/unload", s.servePlaybackUnload())
r.Handler(http.MethodGet, "/recording/export", s.serveRecordingExport())
r.Handler(http.MethodGet, "/recording/start", s.serveRecordingStart())
r.Handler(http.MethodGet, "/recording/stop", s.serveRecordingStop())
@@ -132,10 +127,8 @@ func (s *Server) EventHandlerAdapter(eh *EventHandler) {
s.l.Error(fmt.Errorf("astiencoder: adding to recording failed: %w", err))
}
// Send if playback not loaded
if s.p.r == nil {
s.sendWebSocket(e.Name, p)
}
// Send
s.sendWebSocket(e.Name, p)
return false
})
}
@@ -256,16 +249,6 @@ type ServerWelcome struct {
func (s *Server) serveWelcome() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// Playback is loaded
if s.p.r != nil {
// Unload playback
if err := s.p.unload(); err != nil {
s.l.Error(fmt.Errorf("astiencoder: unloading playback failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
}
// Create body
b := ServerWelcome{Recording: s.r.w != nil}
@@ -347,16 +330,15 @@ func (r *serverRecording) start(dst string, sw ServerWorkflow, onDone func(path
// Create csv writer
r.w = csv.NewWriter(f)
// Marshal server workflow
var b []byte
if b, err = json.Marshal(sw); err != nil {
err = fmt.Errorf("astiencoder: marshaling failed: %w", err)
// Update started
atomic.StoreUint32(&r.s, 1)
// Add init
if err = r.add("init", sw); err != nil {
err = fmt.Errorf("astiencoder: adding init failed: %w", err)
return
}
// Write server workflow
r.w.Write([]string{"", "", string(b)})
// Execute the rest in a goroutine
go func() {
// Start chan
@@ -385,9 +367,6 @@ func (r *serverRecording) start(dst string, sw ServerWorkflow, onDone func(path
}
}
}()
// Update started
atomic.StoreUint32(&r.s, 1)
return
}
@@ -406,7 +385,7 @@ func (r *serverRecording) add(name string, payload interface{}) (err error) {
// Write
r.c.Add(func() {
r.w.Write([]string{strconv.Itoa(int(time.Now().UTC().Unix())), name, string(b)})
r.w.Write([]string{strconv.Itoa(int(time.Now().UTC().Unix())), name, base64.StdEncoding.EncodeToString(b)})
r.w.Flush()
})
return
@@ -495,250 +474,3 @@ func (r *serverRecording) export(rw http.ResponseWriter, w *Workflow) (err error
}
return
}
type serverPlayback struct {
b []serverPlaybackItem
f *os.File
r *csv.Reader
}
type serverPlaybackItem struct {
name string
payload []byte
time time.Time
}
func newServerPlayback() *serverPlayback {
return &serverPlayback{}
}
func (p *serverPlayback) load(req *http.Request) (payload []byte, err error) {
// Playbac already loaded
if p.r != nil {
return
}
// Get request file
var f multipart.File
var h *multipart.FileHeader
if f, h, err = req.FormFile("file"); err != nil {
if err != http.ErrMissingFile {
err = fmt.Errorf("astiencoder: getting request file failed: %w", err)
} else {
err = fmt.Errorf("astiencoder: request file is missing")
}
return
}
// Invalid content type
if c := h.Header.Get("Content-Type"); c != "text/csv" {
err = fmt.Errorf("astiencoder: invalid content type %s", c)
return
}
// Create file
if p.f, err = ioutil.TempFile("", "astiencoder*.csv"); err != nil {
err = fmt.Errorf("astiencoder: creating file failed: %w", err)
return
}
// Copy
if _, err = io.Copy(p.f, f); err != nil {
err = fmt.Errorf("astiencoder: copying failed: %w", err)
return
}
// Rewind
if _, err = p.f.Seek(0, 0); err != nil {
err = fmt.Errorf("astiencoder: seeking failed: %w", err)
return
}
// Create csv reader
p.r = csv.NewReader(p.f)
// Read
var line []string
if line, err = p.r.Read(); err != nil {
err = fmt.Errorf("astiencoder: reading failed: %w", err)
return
}
// Not enough columns
if len(line) < 3 {
err = fmt.Errorf("astiencoder: line have only %d columns", len(line))
return
}
// Parse payload
payload = []byte(line[2])
return
}
func (p *serverPlayback) next() (i serverPlaybackItem, err error) {
// Playback not loaded
if p.r == nil {
return
}
// Check buffer first
if len(p.b) > 0 {
i = p.b[0]
p.b = p.b[1:]
return
}
// Read
var line []string
if line, err = p.r.Read(); err != nil {
err = fmt.Errorf("astiencoder: reading failed: %w", err)
return
}
// Not enough columns
if len(line) < 3 {
err = fmt.Errorf("astiencoder: line have only %d columns", len(line))
return
}
// Parse time
var ti int
if ti, err = strconv.Atoi(line[0]); err != nil {
err = fmt.Errorf("astiencoder: atoi of %s failed: %w", line[0], err)
return
}
i.time = time.Unix(int64(ti), 0)
// Parse name and payload
i.name = line[1]
i.payload = []byte(line[2])
return
}
func (p *serverPlayback) unload() (err error) {
// Playback not loaded
if p.r == nil {
return
}
// Close file
if err = p.f.Close(); err != nil {
err = fmt.Errorf("astiencoder: closing previous file failed: %w", err)
return
}
// Remove file
if err = os.Remove(p.f.Name()); err != nil {
err = fmt.Errorf("astiencoder: removing previous file failed: %w", err)
return
}
// Reset
p.f = nil
p.r = nil
return
}
func (s *Server) servePlaybackLoad() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// Load
payload, err := s.p.load(r)
if err != nil {
s.l.Error(fmt.Errorf("astiencoder: loading playback failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
// Write
if _, err = rw.Write(payload); err != nil {
s.l.Error(fmt.Errorf("astiencoder: writing failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
})
}
type ServerPlaybackItems struct {
Done bool `json:"done"`
Items []ServerPlaybackItem `json:"items"`
}
type ServerPlaybackItem struct {
Name string `json:"name"`
Payload json.RawMessage `json:"payload"`
}
func newServerPlaybackItem(i serverPlaybackItem) ServerPlaybackItem {
return ServerPlaybackItem{
Name: i.name,
Payload: i.payload,
}
}
func (s *Server) servePlaybackNext() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// Create body
b := ServerPlaybackItems{
Items: []ServerPlaybackItem{},
}
// Loop
var firstAt time.Time
for {
// Next
i, err := s.p.next()
if err != nil {
if errors.Is(err, io.EOF) {
b.Done = true
break
} else {
s.l.Error(fmt.Errorf("astiencoder: next failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
}
// Create first at
if firstAt.IsZero() {
firstAt = i.time
}
// Event is too old
if i.time.Sub(firstAt) > 500*time.Millisecond {
// Append in buffer
s.p.b = append(s.p.b, i)
break
}
// Append event
b.Items = append(b.Items, newServerPlaybackItem(i))
}
// Write
if err := json.NewEncoder(rw).Encode(b); err != nil {
s.l.Error(fmt.Errorf("astiencoder: writing failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
})
}
func (s *Server) servePlaybackUnload() http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
// Unload
if err := s.p.unload(); err != nil {
s.l.Error(fmt.Errorf("astiencoder: unloading playback failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
// Write
if s.w != nil {
if err := json.NewEncoder(rw).Encode(s.newServerWorkflow()); err != nil {
s.l.Error(fmt.Errorf("astiencoder: writing failed: %w", err))
rw.WriteHeader(http.StatusInternalServerError)
return
}
}
})
}

File diff suppressed because one or more lines are too long

View File

@@ -156,6 +156,9 @@ footer {
}
footer > div:first-child {
align-items: center;
display: flex;
flex-direction: column;
width: 150px;
}
@@ -179,24 +182,38 @@ footer > div:first-child .fa {
}
footer.recording-started #recording-start, footer #recording-stop, footer #playback-unload,
footer.playback-loaded #playback-load, footer #playback-load input, footer #playback-next,
footer.playback-loaded.playback-done #playback-next, footer.recording-disabled #recording-start,
footer.playback-loaded #playback-load, footer #playback-load input, footer.recording-disabled #recording-start,
footer.recording-disabled #recording-stop {
display: none;
}
footer.recording-started #recording-stop, footer.playback-loaded #playback-unload, footer.playback-loaded #playback-next {
footer.recording-started #recording-stop, footer.playback-loaded #playback-unload {
display: flex;
}
footer > div:last-child {
display: flex;
display: none;
flex-direction: column;
flex-grow: 1;
padding-left: 10px;
}
footer.playback-loaded > div:last-child {
display: flex;
}
footer > div:last-child > div {
display: flex;
justify-content: center;
margin-bottom: 10px;
}
footer > div:last-child .fa {
cursor: pointer;
font-size: 20px;
margin: auto;
}
footer > div:last-child input[type="range"] {
width: 100%;
}

View File

@@ -37,7 +37,16 @@
</div>
</div>
<div>
<i id="playback-next" class="fa fa-forward" onclick="astiencoder.onPlaybackNextClick()"></i>
<div>
<i id="playback-previous" class="fa fa-backward" onclick="astiencoder.onPlaybackNextClick()"></i>
<i id="playback-next" class="fa fa-forward" onclick="astiencoder.onPlaybackNextClick()"></i>
</div>
<div>
<input id="progress" min="0" max="100" type="range" disabled/>
</div>
<div>
<span id="time"></span>
</div>
</div>
</footer>

View File

@@ -43,7 +43,8 @@ var astiencoder = {
}.bind(this)
})
},
onmessage (name, payload) {
onmessage (name, payload, force) {
if (this.playback.loaded && !force) return
switch (name) {
case 'astiencoder.node.continued':
this.apply(payload, {status: 'running'})
@@ -65,7 +66,7 @@ var astiencoder = {
onKeyUp (event) {
switch (event.code) {
case 'ArrowRight':
if (this.playback.loaded && !this.playback.done) this.onPlaybackNextClick()
this.onPlaybackNextClick()
break
}
},
@@ -123,7 +124,47 @@ var astiencoder = {
},
/* playback */
playback: new Proxy({}, {
playback: new Proxy({
lines: [],
parse (line) {
// Split line
const items = line.split(',')
// Not enough items
if (items.length !== 3) return false
// Create data
const d = {}
if (items[0] !== '') d.time = new Date(items[0] * 1000)
if (items[1] !== '') d.name = items[1]
if (items[2] !== '') {
const p = atob(items[2])
if (p !== "null") d.payload = JSON.parse(p)
}
return d
},
next () {
// No line left
if (this.lines.length === 0) return false
// Parse last line
return this.parse(this.lines[0])
},
removeNext () {
this.lines.shift()
},
last () {
// No line left
if (this.lines.length === 0) return false
// Parse last line
return this.parse(this.lines[this.lines.length - 1])
},
updateTime (t) {
document.getElementById('progress').value = ((t.getTime() - this.from.getTime()) / this.duration) * 100
document.getElementById('time').innerText = t.getHours().toString().padStart(2, '0') + ':' + t.getMinutes().toString().padStart(2, '0') + ':' + t.getSeconds().toString().padStart(2, '0')
}
}, {
set: function(obj, prop, value) {
// Nothing changed
if (typeof obj[prop] !== 'undefined' && obj[prop] === value) return
@@ -152,68 +193,110 @@ var astiencoder = {
// No file
if (event.target.files.length === 0) return
// Create form data
const d = new FormData()
d.append('file', event.target.files[0])
// Create reader
const r = new FileReader()
r.addEventListener('load', () => {
// Parse lines
this.playback.lines = r.result.split(/\r\n|\n/)
// Load
this.sendHttp({
method: 'POST',
url: '/playback/load',
payload: d,
onsuccess: function(data) {
// Reset
this.reset()
// Remove last line if empty
if (this.playback.lines[this.playback.lines.length -1] === '') this.playback.lines.pop()
// Update playback
this.playback.done = false
this.playback.loaded = true
// Reset
this.reset()
// Loop through nodes
data.nodes.forEach(function(item) {
// Apply changes
this.apply(item.name, item)
}.bind(this))
}.bind(this)
})
// Update playback
this.playback.done = false
this.playback.loaded = true
// Get next
const n = this.playback.next()
this.playback.removeNext()
// Loop through nodes
n.payload.nodes.forEach(function(item) {
// Apply changes
this.apply(item.name, item)
}.bind(this))
// Get last
const l = this.playback.last()
// Update time boundaries
this.playback.from = n.time
this.playback.duration = l.time.getTime() - n.time.getTime()
// Update time
this.playback.updateTime(n.time)
});
r.readAsText(event.target.files[0])
},
onPlaybackUnloadClick () {
// Unload
this.sendHttp({
method: 'GET',
url: '/playback/unload',
onsuccess: function(data) {
// Reset
this.reset()
// Update playback
this.playback.loaded = false
// Update playback
this.playback.loaded = false
// Loop through nodes
if (data) {
data.nodes.forEach(function(item) {
// Apply changes
this.apply(item.name, item)
}.bind(this))
}
}.bind(this)
})
// On open
this.onopen()
},
onPlaybackNextClick () {
// Next
this.sendHttp({
method: 'GET',
url: '/playback/next',
onsuccess: function(data) {
// Update playback
this.playback.done = data.done
// No playback
if (!this.playback.loaded || this.playback.done) return
// Loop through items
data.items.forEach(function(item) {
this.onmessage(item.name, item.payload)
}.bind(this))
}.bind(this)
})
// Loop
var items = []
var indexedItems = {}
var stop = false
while (!stop) {
// Get next
const n = this.playback.next()
// Playback is done
if (!n) {
this.playback.done = true
stop = true
continue
}
// Get indexed key
var k = ''
switch (n.name) {
case 'astiencoder.node.continued':
case 'astiencoder.node.paused':
case 'astiencoder.node.stopped':
k = 'status | ' + n.payload
break
case 'astiencoder.node.stats':
k = 'stats | ' + n.payload.name
case 'astiencoder.node.started':
k = 'status | ' + n.payload.name
break
default:
this.playback.removeNext()
continue
}
// Same event name is being processed for same node
if (indexedItems[k]) {
stop = true
continue
}
indexedItems[k] = true
// Remove next
this.playback.removeNext()
// Append next
items.push(n)
}
// Loop through items
items.forEach(function(item) {
this.onmessage(item.name, item.payload, true)
}.bind(this))
// Update time
this.playback.updateTime(items[0].time)
},
/* tags */
@@ -652,6 +735,7 @@ var astiencoder = {
},
/* helpers */
sendHttp (options) {
const req = new XMLHttpRequest()
req.onreadystatechange = function() {