Process []byte instread of string in parser

This commit is contained in:
Ingo Oppermann
2024-07-26 11:31:47 +02:00
parent d391e274d7
commit 70a49f8bdb
7 changed files with 209 additions and 125 deletions

View File

@@ -1,11 +1,11 @@
package parse
import (
"bytes"
"container/ring"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
@@ -200,14 +200,14 @@ func New(config Config) Parser {
return p
}
func (p *parser) Parse(line string) uint64 {
isDefaultProgress := strings.HasPrefix(line, "frame=")
isFFmpegInputs := strings.HasPrefix(line, "ffmpeg.inputs:")
isFFmpegOutputs := strings.HasPrefix(line, "ffmpeg.outputs:")
isFFmpegMapping := strings.HasPrefix(line, "ffmpeg.mapping:")
isFFmpegProgress := strings.HasPrefix(line, "ffmpeg.progress:")
isHLSStreamMap := strings.HasPrefix(line, "hls.streammap:")
isAVstreamProgress := strings.HasPrefix(line, "avstream.progress:")
func (p *parser) Parse(line []byte) uint64 {
isDefaultProgress := bytes.HasPrefix(line, []byte("frame="))
isFFmpegInputs := bytes.HasPrefix(line, []byte("ffmpeg.inputs:"))
isFFmpegOutputs := bytes.HasPrefix(line, []byte("ffmpeg.outputs:"))
isFFmpegMapping := bytes.HasPrefix(line, []byte("ffmpeg.mapping:"))
isFFmpegProgress := bytes.HasPrefix(line, []byte("ffmpeg.progress:"))
isHLSStreamMap := bytes.HasPrefix(line, []byte("hls.streammap:"))
isAVstreamProgress := bytes.HasPrefix(line, []byte("avstream.progress:"))
p.lock.log.Lock()
if p.logStart.IsZero() {
@@ -235,7 +235,7 @@ func (p *parser) Parse(line string) uint64 {
}
if isFFmpegInputs {
if err := p.parseFFmpegIO("input", strings.TrimPrefix(line, "ffmpeg.inputs:")); err != nil {
if err := p.parseFFmpegIO("input", bytes.TrimPrefix(line, []byte("ffmpeg.inputs:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -246,7 +246,7 @@ func (p *parser) Parse(line string) uint64 {
}
if isHLSStreamMap {
if err := p.parseHLSStreamMap(strings.TrimPrefix(line, "hls.streammap:")); err != nil {
if err := p.parseHLSStreamMap(bytes.TrimPrefix(line, []byte("hls.streammap:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -257,7 +257,7 @@ func (p *parser) Parse(line string) uint64 {
}
if isFFmpegOutputs {
if err := p.parseFFmpegIO("output", strings.TrimPrefix(line, "ffmpeg.outputs:")); err != nil {
if err := p.parseFFmpegIO("output", bytes.TrimPrefix(line, []byte("ffmpeg.outputs:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -287,7 +287,7 @@ func (p *parser) Parse(line string) uint64 {
}
if isFFmpegMapping {
if err := p.parseFFmpegMapping(strings.TrimPrefix(line, "ffmpeg.mapping:")); err != nil {
if err := p.parseFFmpegMapping(bytes.TrimPrefix(line, []byte("ffmpeg.mapping:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -298,13 +298,14 @@ func (p *parser) Parse(line string) uint64 {
}
if !isDefaultProgress && !isFFmpegProgress && !isAVstreamProgress {
stringLine := string(line)
// Write the current non-progress line to the log
p.addLog(line)
p.addLog(stringLine)
p.lock.prelude.Lock()
if !p.prelude.done {
if len(p.prelude.data) < p.prelude.headLines {
p.prelude.data = append(p.prelude.data, line)
p.prelude.data = append(p.prelude.data, stringLine)
} else {
p.prelude.tail.Value = line
p.prelude.tail = p.prelude.tail.Next()
@@ -315,8 +316,8 @@ func (p *parser) Parse(line string) uint64 {
p.lock.log.Lock()
for _, pattern := range p.logpatterns.patterns {
if pattern.MatchString(line) {
p.logpatterns.matches = append(p.logpatterns.matches, line)
if pattern.Match(line) {
p.logpatterns.matches = append(p.logpatterns.matches, stringLine)
}
}
p.lock.log.Unlock()
@@ -363,7 +364,7 @@ func (p *parser) Parse(line string) uint64 {
// Update the progress
if isAVstreamProgress {
if err := p.parseAVstreamProgress(strings.TrimPrefix(line, "avstream.progress:")); err != nil {
if err := p.parseAVstreamProgress(bytes.TrimPrefix(line, []byte("avstream.progress:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -382,7 +383,7 @@ func (p *parser) Parse(line string) uint64 {
return 0
}
} else if isFFmpegProgress {
if err := p.parseFFmpegProgress(strings.TrimPrefix(line, "ffmpeg.progress:")); err != nil {
if err := p.parseFFmpegProgress(bytes.TrimPrefix(line, []byte("ffmpeg.progress:"))); err != nil {
p.logger.WithFields(log.Fields{
"line": line,
"error": err,
@@ -466,48 +467,48 @@ func (p *parser) Parse(line string) uint64 {
return pFrames
}
func (p *parser) parseDefaultProgress(line string) error {
var matches []string
func (p *parser) parseDefaultProgress(line []byte) error {
var matches [][]byte
if matches = p.re.frame.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(matches[1], 10, 64); err == nil {
if matches = p.re.frame.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(string(matches[1]), 10, 64); err == nil {
p.progress.ffmpeg.Frame = x
}
}
if matches = p.re.quantizer.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseFloat(matches[1], 64); err == nil {
if matches = p.re.quantizer.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseFloat(string(matches[1]), 64); err == nil {
p.progress.ffmpeg.Quantizer = x
}
}
if matches = p.re.size.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(matches[1], 10, 64); err == nil {
if matches = p.re.size.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(string(matches[1]), 10, 64); err == nil {
p.progress.ffmpeg.Size = x * 1024
}
}
if matches = p.re.time.FindStringSubmatch(line); matches != nil {
s := fmt.Sprintf("%sh%sm%ss%s0ms", matches[1], matches[2], matches[3], matches[4])
if matches = p.re.time.FindSubmatch(line); matches != nil {
s := fmt.Sprintf("%sh%sm%ss%s0ms", string(matches[1]), string(matches[2]), string(matches[3]), string(matches[4]))
if x, err := time.ParseDuration(s); err == nil {
p.progress.ffmpeg.Time.Duration = x
}
}
if matches = p.re.speed.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseFloat(matches[1], 64); err == nil {
if matches = p.re.speed.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseFloat(string(matches[1]), 64); err == nil {
p.progress.ffmpeg.Speed = x
}
}
if matches = p.re.drop.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(matches[1], 10, 64); err == nil {
if matches = p.re.drop.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(string(matches[1]), 10, 64); err == nil {
p.progress.ffmpeg.Drop = x
}
}
if matches = p.re.dup.FindStringSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(matches[1], 10, 64); err == nil {
if matches = p.re.dup.FindSubmatch(line); matches != nil {
if x, err := strconv.ParseUint(string(matches[1]), 10, 64); err == nil {
p.progress.ffmpeg.Dup = x
}
}
@@ -515,10 +516,10 @@ func (p *parser) parseDefaultProgress(line string) error {
return nil
}
func (p *parser) parseFFmpegIO(kind, line string) error {
func (p *parser) parseFFmpegIO(kind string, line []byte) error {
processIO := []ffmpegProcessIO{}
err := json.Unmarshal([]byte(line), &processIO)
err := json.Unmarshal(line, &processIO)
if err != nil {
return err
}
@@ -542,10 +543,10 @@ func (p *parser) parseFFmpegIO(kind, line string) error {
return nil
}
func (p *parser) parseFFmpegMapping(line string) error {
func (p *parser) parseFFmpegMapping(line []byte) error {
mapping := ffmpegStreamMapping{}
err := json.Unmarshal([]byte(line), &mapping)
err := json.Unmarshal(line, &mapping)
if err != nil {
return err
}
@@ -555,10 +556,10 @@ func (p *parser) parseFFmpegMapping(line string) error {
return nil
}
func (p *parser) parseHLSStreamMap(line string) error {
func (p *parser) parseHLSStreamMap(line []byte) error {
mapping := ffmpegHLSStreamMap{}
err := json.Unmarshal([]byte(line), &mapping)
err := json.Unmarshal(line, &mapping)
if err != nil {
return err
}
@@ -568,10 +569,10 @@ func (p *parser) parseHLSStreamMap(line string) error {
return nil
}
func (p *parser) parseFFmpegProgress(line string) error {
func (p *parser) parseFFmpegProgress(line []byte) error {
progress := ffmpegProgress{}
err := json.Unmarshal([]byte(line), &progress)
err := json.Unmarshal(line, &progress)
if err != nil {
return err
}
@@ -609,10 +610,10 @@ func (p *parser) parseFFmpegProgress(line string) error {
return nil
}
func (p *parser) parseAVstreamProgress(line string) error {
func (p *parser) parseAVstreamProgress(line []byte) error {
progress := ffmpegAVstream{}
err := json.Unmarshal([]byte(line), &progress)
err := json.Unmarshal(line, &progress)
if err != nil {
return err
}
@@ -766,7 +767,7 @@ func (p *parser) addLog(line string) {
p.log.Value = process.Line{
Timestamp: time.Now(),
Data: line,
Data: p.lastLogline,
}
p.log = p.log.Next()
}