Check for identical configs on process update

This commit is contained in:
Ingo Oppermann
2023-06-01 16:43:17 +02:00
parent a79cfa0c77
commit d652fd213b
7 changed files with 177 additions and 32 deletions

View File

@@ -718,7 +718,7 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error {
cmd := &store.Command{ cmd := &store.Command{
Operation: store.OpAddProcess, Operation: store.OpAddProcess,
Data: &store.CommandAddProcess{ Data: &store.CommandAddProcess{
Config: *config, Config: config,
}, },
} }
@@ -749,7 +749,7 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error {
Operation: store.OpUpdateProcess, Operation: store.OpUpdateProcess,
Data: &store.CommandUpdateProcess{ Data: &store.CommandUpdateProcess{
ID: id, ID: id,
Config: *config, Config: config,
}, },
} }

View File

@@ -1,6 +1,7 @@
package store package store
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -73,12 +74,12 @@ type Command struct {
} }
type CommandAddProcess struct { type CommandAddProcess struct {
app.Config Config *app.Config
} }
type CommandUpdateProcess struct { type CommandUpdateProcess struct {
ID string ID string
Config app.Config Config *app.Config
} }
type CommandRemoveProcess struct { type CommandRemoveProcess struct {
@@ -227,16 +228,16 @@ func (s *store) addProcess(cmd CommandAddProcess) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
_, ok := s.Process[cmd.ID] _, ok := s.Process[cmd.Config.ID]
if ok { if ok {
return NewStoreError("the process with the ID '%s' already exists", cmd.ID) return NewStoreError("the process with the ID '%s' already exists", cmd.Config.ID)
} }
now := time.Now() now := time.Now()
s.Process[cmd.ID] = Process{ s.Process[cmd.Config.ID] = Process{
CreatedAt: now, CreatedAt: now,
UpdatedAt: now, UpdatedAt: now,
Config: &cmd.Config, Config: cmd.Config,
} }
return nil return nil
@@ -255,24 +256,33 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
_, ok := s.Process[cmd.ID] p, ok := s.Process[cmd.ID]
if ok { if !ok {
if cmd.ID == cmd.Config.ID { return NewStoreError("the process with the ID '%s' doesn't exists", cmd.ID)
s.Process[cmd.ID] = Process{ }
UpdatedAt: time.Now(),
Config: &cmd.Config, currentHash := p.Config.Hash()
} replaceHash := cmd.Config.Hash()
} else {
_, ok := s.Process[cmd.Config.ID] if bytes.Equal(currentHash, replaceHash) {
if !ok { return nil
delete(s.Process, cmd.ID) }
s.Process[cmd.Config.ID] = Process{
UpdatedAt: time.Now(), if cmd.ID == cmd.Config.ID {
Config: &cmd.Config, s.Process[cmd.ID] = Process{
} UpdatedAt: time.Now(),
} else { Config: cmd.Config,
return NewStoreError("the process with the ID %s already exists", cmd.Config.ID) }
} } else {
_, ok := s.Process[cmd.Config.ID]
if ok {
return NewStoreError("the process with the ID '%s' already exists", cmd.Config.ID)
}
delete(s.Process, cmd.ID)
s.Process[cmd.Config.ID] = Process{
UpdatedAt: time.Now(),
Config: cmd.Config,
} }
} }

View File

@@ -3,7 +3,6 @@ package process
import ( import (
"context" "context"
"fmt" "fmt"
"math"
"sync" "sync"
"time" "time"
@@ -439,9 +438,10 @@ func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Dur
if workingrate < 0 { if workingrate < 0 {
workingrate = limit workingrate = limit
} else {
workingrate = math.Min(workingrate/pcpu*limit, 1)
} }
// else {
// workingrate = math.Min(workingrate/pcpu*limit, 1)
//}
workingrate = lim workingrate = lim

View File

@@ -1,6 +1,11 @@
package app package app
import ( import (
"bytes"
"crypto/md5"
"strconv"
"strings"
"github.com/datarhei/core/v16/process" "github.com/datarhei/core/v16/process"
) )
@@ -11,6 +16,17 @@ type ConfigIOCleanup struct {
PurgeOnDelete bool PurgeOnDelete bool
} }
func (c *ConfigIOCleanup) HashString() string {
b := strings.Builder{}
b.WriteString(c.Pattern)
b.WriteString(strconv.FormatUint(uint64(c.MaxFiles), 10))
b.WriteString(strconv.FormatUint(uint64(c.MaxFileAge), 10))
b.WriteString(strconv.FormatBool(c.PurgeOnDelete))
return b.String()
}
type ConfigIO struct { type ConfigIO struct {
ID string ID string
Address string Address string
@@ -18,7 +34,7 @@ type ConfigIO struct {
Cleanup []ConfigIOCleanup Cleanup []ConfigIOCleanup
} }
func (io ConfigIO) Clone() ConfigIO { func (io *ConfigIO) Clone() ConfigIO {
clone := ConfigIO{ clone := ConfigIO{
ID: io.ID, ID: io.ID,
Address: io.Address, Address: io.Address,
@@ -33,6 +49,20 @@ func (io ConfigIO) Clone() ConfigIO {
return clone return clone
} }
func (io *ConfigIO) HashString() string {
b := strings.Builder{}
b.WriteString(io.ID)
b.WriteString(io.Address)
b.WriteString(strings.Join(io.Options, ","))
for _, x := range io.Cleanup {
b.WriteString(x.HashString())
}
return b.String()
}
type Config struct { type Config struct {
ID string ID string
Reference string Reference string
@@ -113,6 +143,39 @@ func (config *Config) CreateCommand() []string {
return command return command
} }
func (config *Config) Hash() []byte {
b := bytes.Buffer{}
b.WriteString(config.ID)
b.WriteString(config.Reference)
b.WriteString(config.Owner)
b.WriteString(config.Domain)
b.WriteString(config.FFVersion)
b.WriteString(config.Scheduler)
b.WriteString(strings.Join(config.Options, ","))
b.WriteString(strings.Join(config.LogPatterns, ","))
b.WriteString(strconv.FormatBool(config.Reconnect))
b.WriteString(strconv.FormatBool(config.Autostart))
b.WriteString(strconv.FormatUint(config.ReconnectDelay, 10))
b.WriteString(strconv.FormatUint(config.StaleTimeout, 10))
b.WriteString(strconv.FormatUint(config.Timeout, 10))
b.WriteString(strconv.FormatUint(config.LimitMemory, 10))
b.WriteString(strconv.FormatUint(config.LimitWaitFor, 10))
b.WriteString(strconv.FormatFloat(config.LimitCPU, 'f', -1, 64))
for _, x := range config.Input {
b.WriteString(x.HashString())
}
for _, x := range config.Output {
b.WriteString(x.HashString())
}
sum := md5.Sum(b.Bytes())
return sum[:]
}
type Process struct { type Process struct {
ID string ID string
Owner string Owner string

View File

@@ -1,6 +1,7 @@
package app package app
import ( import (
"bytes"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@@ -13,7 +14,7 @@ func TestCreateCommand(t *testing.T) {
{Address: "inputAddress", Options: []string{"-input", "inputoption"}}, {Address: "inputAddress", Options: []string{"-input", "inputoption"}},
}, },
Output: []ConfigIO{ Output: []ConfigIO{
{Address: "outputAddress", Options: []string{"-output", "oututoption"}}, {Address: "outputAddress", Options: []string{"-output", "outputoption"}},
}, },
} }
@@ -21,6 +22,39 @@ func TestCreateCommand(t *testing.T) {
require.Equal(t, []string{ require.Equal(t, []string{
"-global", "global", "-global", "global",
"-input", "inputoption", "-i", "inputAddress", "-input", "inputoption", "-i", "inputAddress",
"-output", "oututoption", "outputAddress", "-output", "outputoption", "outputAddress",
}, command) }, command)
} }
func TestConfigHash(t *testing.T) {
config := &Config{
ID: "id",
Reference: "ref",
Owner: "owner",
Domain: "domain",
FFVersion: "1.2.3",
Input: []ConfigIO{{Address: "inputAddress", Options: []string{"-input", "inputoption"}}},
Output: []ConfigIO{{Address: "outputAddress", Options: []string{"-output", "outputoption"}}},
Options: []string{"-global", "global"},
Reconnect: true,
ReconnectDelay: 15,
Autostart: false,
StaleTimeout: 42,
Timeout: 9,
Scheduler: "* * * * *",
LogPatterns: []string{"^libx264"},
LimitCPU: 50,
LimitMemory: 3 * 1024 * 1024,
LimitWaitFor: 20,
}
hash1 := config.Hash()
require.Equal(t, []byte{0x23, 0x5d, 0xcc, 0x36, 0x77, 0xa1, 0x49, 0x7c, 0xcd, 0x8a, 0x72, 0x6a, 0x6c, 0xa2, 0xc3, 0x24}, hash1)
config.Reconnect = false
hash2 := config.Hash()
require.False(t, bytes.Equal(hash1, hash2))
}

View File

@@ -1,6 +1,7 @@
package restream package restream
import ( import (
"bytes"
"context" "context"
"errors" "errors"
"fmt" "fmt"
@@ -1184,6 +1185,14 @@ func (r *restream) UpdateProcess(id TaskID, config *app.Config) error {
return ErrUnknownProcess return ErrUnknownProcess
} }
currentHash := task.config.Hash()
replaceHash := config.Hash()
// If the new config has the same hash as the current config, do nothing.
if bytes.Equal(currentHash, replaceHash) {
return nil
}
t, err := r.createTask(config) t, err := r.createTask(config)
if err != nil { if err != nil {
return err return err

View File

@@ -302,6 +302,35 @@ func TestUpdateProcess(t *testing.T) {
require.NotEqual(t, updatedAt, process.UpdatedAt) require.NotEqual(t, updatedAt, process.UpdatedAt)
} }
func TestUpdateSameHashProcess(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err)
config := getDummyProcess()
require.NotNil(t, config)
tid := TaskID{ID: config.ID}
err = rs.AddProcess(config)
require.Equal(t, nil, err)
process, err := rs.GetProcess(tid)
require.NoError(t, err)
createdAt := process.CreatedAt
updatedAt := process.UpdatedAt
time.Sleep(2 * time.Second)
err = rs.UpdateProcess(tid, config)
require.NoError(t, err)
process, err = rs.GetProcess(tid)
require.NoError(t, err)
require.Equal(t, createdAt, process.CreatedAt)
require.Equal(t, updatedAt, process.UpdatedAt)
}
func TestUpdateProcessLogHistoryTransfer(t *testing.T) { func TestUpdateProcessLogHistoryTransfer(t *testing.T) {
rs, err := getDummyRestreamer(nil, nil, nil, nil) rs, err := getDummyRestreamer(nil, nil, nil, nil)
require.NoError(t, err) require.NoError(t, err)