diff --git a/cluster/cluster.go b/cluster/cluster.go index edfc34bc..4977dbe4 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -718,7 +718,7 @@ func (c *cluster) AddProcess(origin string, config *app.Config) error { cmd := &store.Command{ Operation: store.OpAddProcess, Data: &store.CommandAddProcess{ - Config: *config, + Config: config, }, } @@ -749,7 +749,7 @@ func (c *cluster) UpdateProcess(origin, id string, config *app.Config) error { Operation: store.OpUpdateProcess, Data: &store.CommandUpdateProcess{ ID: id, - Config: *config, + Config: config, }, } diff --git a/cluster/store/store.go b/cluster/store/store.go index 1e8def19..351f80b4 100644 --- a/cluster/store/store.go +++ b/cluster/store/store.go @@ -1,6 +1,7 @@ package store import ( + "bytes" "encoding/json" "fmt" "io" @@ -73,12 +74,12 @@ type Command struct { } type CommandAddProcess struct { - app.Config + Config *app.Config } type CommandUpdateProcess struct { ID string - Config app.Config + Config *app.Config } type CommandRemoveProcess struct { @@ -227,16 +228,16 @@ func (s *store) addProcess(cmd CommandAddProcess) error { s.lock.Lock() defer s.lock.Unlock() - _, ok := s.Process[cmd.ID] + _, ok := s.Process[cmd.Config.ID] if ok { - return NewStoreError("the process with the ID '%s' already exists", cmd.ID) + return NewStoreError("the process with the ID '%s' already exists", cmd.Config.ID) } now := time.Now() - s.Process[cmd.ID] = Process{ + s.Process[cmd.Config.ID] = Process{ CreatedAt: now, UpdatedAt: now, - Config: &cmd.Config, + Config: cmd.Config, } return nil @@ -255,24 +256,33 @@ func (s *store) updateProcess(cmd CommandUpdateProcess) error { s.lock.Lock() defer s.lock.Unlock() - _, ok := s.Process[cmd.ID] - if ok { - if cmd.ID == cmd.Config.ID { - s.Process[cmd.ID] = Process{ - UpdatedAt: time.Now(), - Config: &cmd.Config, - } - } else { - _, ok := s.Process[cmd.Config.ID] - if !ok { - delete(s.Process, cmd.ID) - s.Process[cmd.Config.ID] = Process{ - UpdatedAt: time.Now(), - Config: &cmd.Config, - } - } else { - return NewStoreError("the process with the ID %s already exists", cmd.Config.ID) - } + p, ok := s.Process[cmd.ID] + if !ok { + return NewStoreError("the process with the ID '%s' doesn't exists", cmd.ID) + } + + currentHash := p.Config.Hash() + replaceHash := cmd.Config.Hash() + + if bytes.Equal(currentHash, replaceHash) { + return nil + } + + if cmd.ID == cmd.Config.ID { + s.Process[cmd.ID] = Process{ + UpdatedAt: time.Now(), + Config: cmd.Config, + } + } else { + _, ok := s.Process[cmd.Config.ID] + if ok { + return NewStoreError("the process with the ID '%s' already exists", cmd.Config.ID) + } + + delete(s.Process, cmd.ID) + s.Process[cmd.Config.ID] = Process{ + UpdatedAt: time.Now(), + Config: cmd.Config, } } diff --git a/process/limiter.go b/process/limiter.go index 6faa6198..33de7f3a 100644 --- a/process/limiter.go +++ b/process/limiter.go @@ -3,7 +3,6 @@ package process import ( "context" "fmt" - "math" "sync" "time" @@ -439,9 +438,10 @@ func (l *limiter) limitCPU(ctx context.Context, limit float64, interval time.Dur if workingrate < 0 { workingrate = limit - } else { - workingrate = math.Min(workingrate/pcpu*limit, 1) } + // else { + // workingrate = math.Min(workingrate/pcpu*limit, 1) + //} workingrate = lim diff --git a/restream/app/process.go b/restream/app/process.go index d19eb87f..08a339bc 100644 --- a/restream/app/process.go +++ b/restream/app/process.go @@ -1,6 +1,11 @@ package app import ( + "bytes" + "crypto/md5" + "strconv" + "strings" + "github.com/datarhei/core/v16/process" ) @@ -11,6 +16,17 @@ type ConfigIOCleanup struct { PurgeOnDelete bool } +func (c *ConfigIOCleanup) HashString() string { + b := strings.Builder{} + + b.WriteString(c.Pattern) + b.WriteString(strconv.FormatUint(uint64(c.MaxFiles), 10)) + b.WriteString(strconv.FormatUint(uint64(c.MaxFileAge), 10)) + b.WriteString(strconv.FormatBool(c.PurgeOnDelete)) + + return b.String() +} + type ConfigIO struct { ID string Address string @@ -18,7 +34,7 @@ type ConfigIO struct { Cleanup []ConfigIOCleanup } -func (io ConfigIO) Clone() ConfigIO { +func (io *ConfigIO) Clone() ConfigIO { clone := ConfigIO{ ID: io.ID, Address: io.Address, @@ -33,6 +49,20 @@ func (io ConfigIO) Clone() ConfigIO { return clone } +func (io *ConfigIO) HashString() string { + b := strings.Builder{} + + b.WriteString(io.ID) + b.WriteString(io.Address) + b.WriteString(strings.Join(io.Options, ",")) + + for _, x := range io.Cleanup { + b.WriteString(x.HashString()) + } + + return b.String() +} + type Config struct { ID string Reference string @@ -113,6 +143,39 @@ func (config *Config) CreateCommand() []string { return command } +func (config *Config) Hash() []byte { + b := bytes.Buffer{} + + b.WriteString(config.ID) + b.WriteString(config.Reference) + b.WriteString(config.Owner) + b.WriteString(config.Domain) + b.WriteString(config.FFVersion) + b.WriteString(config.Scheduler) + b.WriteString(strings.Join(config.Options, ",")) + b.WriteString(strings.Join(config.LogPatterns, ",")) + b.WriteString(strconv.FormatBool(config.Reconnect)) + b.WriteString(strconv.FormatBool(config.Autostart)) + b.WriteString(strconv.FormatUint(config.ReconnectDelay, 10)) + b.WriteString(strconv.FormatUint(config.StaleTimeout, 10)) + b.WriteString(strconv.FormatUint(config.Timeout, 10)) + b.WriteString(strconv.FormatUint(config.LimitMemory, 10)) + b.WriteString(strconv.FormatUint(config.LimitWaitFor, 10)) + b.WriteString(strconv.FormatFloat(config.LimitCPU, 'f', -1, 64)) + + for _, x := range config.Input { + b.WriteString(x.HashString()) + } + + for _, x := range config.Output { + b.WriteString(x.HashString()) + } + + sum := md5.Sum(b.Bytes()) + + return sum[:] +} + type Process struct { ID string Owner string diff --git a/restream/app/process_test.go b/restream/app/process_test.go index ad933a79..17bfa6bb 100644 --- a/restream/app/process_test.go +++ b/restream/app/process_test.go @@ -1,6 +1,7 @@ package app import ( + "bytes" "testing" "github.com/stretchr/testify/require" @@ -13,7 +14,7 @@ func TestCreateCommand(t *testing.T) { {Address: "inputAddress", Options: []string{"-input", "inputoption"}}, }, Output: []ConfigIO{ - {Address: "outputAddress", Options: []string{"-output", "oututoption"}}, + {Address: "outputAddress", Options: []string{"-output", "outputoption"}}, }, } @@ -21,6 +22,39 @@ func TestCreateCommand(t *testing.T) { require.Equal(t, []string{ "-global", "global", "-input", "inputoption", "-i", "inputAddress", - "-output", "oututoption", "outputAddress", + "-output", "outputoption", "outputAddress", }, command) } + +func TestConfigHash(t *testing.T) { + config := &Config{ + ID: "id", + Reference: "ref", + Owner: "owner", + Domain: "domain", + FFVersion: "1.2.3", + Input: []ConfigIO{{Address: "inputAddress", Options: []string{"-input", "inputoption"}}}, + Output: []ConfigIO{{Address: "outputAddress", Options: []string{"-output", "outputoption"}}}, + Options: []string{"-global", "global"}, + Reconnect: true, + ReconnectDelay: 15, + Autostart: false, + StaleTimeout: 42, + Timeout: 9, + Scheduler: "* * * * *", + LogPatterns: []string{"^libx264"}, + LimitCPU: 50, + LimitMemory: 3 * 1024 * 1024, + LimitWaitFor: 20, + } + + hash1 := config.Hash() + + require.Equal(t, []byte{0x23, 0x5d, 0xcc, 0x36, 0x77, 0xa1, 0x49, 0x7c, 0xcd, 0x8a, 0x72, 0x6a, 0x6c, 0xa2, 0xc3, 0x24}, hash1) + + config.Reconnect = false + + hash2 := config.Hash() + + require.False(t, bytes.Equal(hash1, hash2)) +} diff --git a/restream/restream.go b/restream/restream.go index 3fc2d439..ff745888 100644 --- a/restream/restream.go +++ b/restream/restream.go @@ -1,6 +1,7 @@ package restream import ( + "bytes" "context" "errors" "fmt" @@ -1184,6 +1185,14 @@ func (r *restream) UpdateProcess(id TaskID, config *app.Config) error { return ErrUnknownProcess } + currentHash := task.config.Hash() + replaceHash := config.Hash() + + // If the new config has the same hash as the current config, do nothing. + if bytes.Equal(currentHash, replaceHash) { + return nil + } + t, err := r.createTask(config) if err != nil { return err diff --git a/restream/restream_test.go b/restream/restream_test.go index 4a8df128..d36f8efe 100644 --- a/restream/restream_test.go +++ b/restream/restream_test.go @@ -302,6 +302,35 @@ func TestUpdateProcess(t *testing.T) { require.NotEqual(t, updatedAt, process.UpdatedAt) } +func TestUpdateSameHashProcess(t *testing.T) { + rs, err := getDummyRestreamer(nil, nil, nil, nil) + require.NoError(t, err) + + config := getDummyProcess() + require.NotNil(t, config) + tid := TaskID{ID: config.ID} + + err = rs.AddProcess(config) + require.Equal(t, nil, err) + + process, err := rs.GetProcess(tid) + require.NoError(t, err) + + createdAt := process.CreatedAt + updatedAt := process.UpdatedAt + + time.Sleep(2 * time.Second) + + err = rs.UpdateProcess(tid, config) + require.NoError(t, err) + + process, err = rs.GetProcess(tid) + require.NoError(t, err) + + require.Equal(t, createdAt, process.CreatedAt) + require.Equal(t, updatedAt, process.UpdatedAt) +} + func TestUpdateProcessLogHistoryTransfer(t *testing.T) { rs, err := getDummyRestreamer(nil, nil, nil, nil) require.NoError(t, err)