Files
streamctl/pkg/streamd/trigger_rules.go

108 lines
3.2 KiB
Go

package streamd
import (
"context"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/xsync"
)
// AddTriggerRule registers triggerRule in the configuration by delegating to
// addTriggerRuleToConfig, logging the call on entry and on exit at debug level.
//
// On success it returns the ID assigned to the new rule. On failure it
// returns 0 together with an error wrapping the underlying cause.
func (d *StreamD) AddTriggerRule(
	ctx context.Context,
	triggerRule *api.TriggerRule,
) (api.TriggerRuleID, error) {
	logger.Debugf(ctx, "AddTriggerRule(ctx, %s)", triggerRule)
	defer logger.Debugf(ctx, "/AddTriggerRule(ctx, %s)", triggerRule)

	newID, addErr := d.addTriggerRuleToConfig(ctx, triggerRule)
	if addErr != nil {
		// The ID reported by the helper is deliberately discarded on error.
		return api.TriggerRuleID(0), fmt.Errorf("unable to add the trigger rule to config: %w", addErr)
	}

	return newID, nil
}
// addTriggerRuleToConfig appends triggerRule to d.Config.TriggerRules and
// then calls d.SaveConfig; both steps are executed through xsync.DoR2 with
// d.ConfigLock.
//
// The returned ID is the position the rule occupies in the list, i.e. the
// length of the list before the append.
//
// If saving fails, the rule is left in the in-memory list and its ID is
// returned alongside the wrapped error.
func (d *StreamD) addTriggerRuleToConfig(
	ctx context.Context,
	triggerRule *api.TriggerRule,
) (api.TriggerRuleID, error) {
	appendAndSave := func() (api.TriggerRuleID, error) {
		// IDs are slice indices, so the new rule's ID is the current length.
		newID := api.TriggerRuleID(len(d.Config.TriggerRules))
		d.Config.TriggerRules = append(d.Config.TriggerRules, triggerRule)

		saveErr := d.SaveConfig(ctx)
		if saveErr != nil {
			return newID, fmt.Errorf("unable to save the config: %w", saveErr)
		}
		return newID, nil
	}

	return xsync.DoR2(ctx, &d.ConfigLock, appendAndSave)
}
// UpdateTriggerRule replaces the trigger rule identified by ruleID with
// triggerRule by delegating to updateTriggerRuleInConfig, logging the call on
// entry and on exit at debug level.
//
// It returns nil on success, or an error that names ruleID and wraps the
// underlying cause.
func (d *StreamD) UpdateTriggerRule(
	ctx context.Context,
	ruleID api.TriggerRuleID,
	triggerRule *api.TriggerRule,
) error {
	logger.Debugf(ctx, "UpdateTriggerRule(ctx, %v, %s)", ruleID, triggerRule)
	defer logger.Debugf(ctx, "/UpdateTriggerRule(ctx, %v, %s)", ruleID, triggerRule)

	updateErr := d.updateTriggerRuleInConfig(ctx, ruleID, triggerRule)
	if updateErr != nil {
		return fmt.Errorf("unable to update the trigger rule %d in the config: %w", ruleID, updateErr)
	}

	return nil
}
// updateTriggerRuleInConfig overwrites the entry at index ruleID of
// d.Config.TriggerRules with triggerRule and then calls d.SaveConfig; both
// steps are executed through xsync.DoR1 with d.ConfigLock.
//
// It returns an error if ruleID is not below the current number of rules, or
// if saving the configuration fails. In the latter case the in-memory entry
// has already been replaced.
//
// NOTE(review): only the upper bound of ruleID is checked; this presumes
// api.TriggerRuleID is an unsigned type — confirm against its declaration.
func (d *StreamD) updateTriggerRuleInConfig(
	ctx context.Context,
	ruleID api.TriggerRuleID,
	triggerRule *api.TriggerRule,
) error {
	replaceAndSave := func() error {
		ruleCount := api.TriggerRuleID(len(d.Config.TriggerRules))
		if ruleID >= ruleCount {
			return fmt.Errorf("rule %d does not exist", ruleID)
		}

		d.Config.TriggerRules[ruleID] = triggerRule

		saveErr := d.SaveConfig(ctx)
		if saveErr != nil {
			return fmt.Errorf("unable to save the config: %w", saveErr)
		}
		return nil
	}

	return xsync.DoR1(ctx, &d.ConfigLock, replaceAndSave)
}
// RemoveTriggerRule deletes the trigger rule identified by ruleID by
// delegating to removeTriggerRuleFromConfig, logging the call on entry and on
// exit at debug level.
//
// It returns nil on success, or an error that names ruleID and wraps the
// underlying cause.
func (d *StreamD) RemoveTriggerRule(
	ctx context.Context,
	ruleID api.TriggerRuleID,
) error {
	logger.Debugf(ctx, "RemoveTriggerRule(ctx, %v)", ruleID)
	defer logger.Debugf(ctx, "/RemoveTriggerRule(ctx, %v)", ruleID)

	removeErr := d.removeTriggerRuleFromConfig(ctx, ruleID)
	if removeErr != nil {
		return fmt.Errorf("unable to remove the trigger rule %d from the config: %w", ruleID, removeErr)
	}

	return nil
}
// removeTriggerRuleFromConfig removes the entry at index ruleID from
// d.Config.TriggerRules and then calls d.SaveConfig; both steps are executed
// through xsync.DoR1 with d.ConfigLock.
//
// Rule IDs are indices into the list, so every rule that followed the removed
// one ends up with an ID that is lower by one.
//
// It returns an error if ruleID is not below the current number of rules, or
// if saving the configuration fails. In the latter case the rule has already
// been removed from the in-memory list.
func (d *StreamD) removeTriggerRuleFromConfig(
	ctx context.Context,
	ruleID api.TriggerRuleID,
) error {
	return xsync.DoR1(ctx, &d.ConfigLock, func() error {
		rules := d.Config.TriggerRules
		if ruleID >= api.TriggerRuleID(len(rules)) {
			return fmt.Errorf("rule %d does not exist", ruleID)
		}

		// The full slice expression caps the head at ruleID, so append has to
		// copy into a fresh backing array instead of shifting the tail in
		// place. This keeps any other slice header that still refers to the
		// old array (e.g. a shallow copy of the config) intact, and avoids
		// leaving a stale pointer to the last rule in the vacated tail slot.
		d.Config.TriggerRules = append(rules[:ruleID:ruleID], rules[ruleID+1:]...)

		if err := d.SaveConfig(ctx); err != nil {
			return fmt.Errorf("unable to save the config: %w", err)
		}
		return nil
	})
}
// ListTriggerRules returns a copy of the list of configured trigger rules,
// logging the call on entry and on exit at debug level.
//
// The copy is made through xsync.DoR2 with d.ConfigLock. The returned slice
// has its own backing array, but its elements are the same values stored in
// d.Config.TriggerRules (the rules themselves are not cloned). The returned
// error is always nil.
func (d *StreamD) ListTriggerRules(
	ctx context.Context,
) (api.TriggerRules, error) {
	logger.Debugf(ctx, "ListTriggerRules(ctx)")
	defer logger.Debugf(ctx, "/ListTriggerRules(ctx)")

	snapshot := func() (api.TriggerRules, error) {
		current := d.Config.TriggerRules
		// Allocate exactly len(current) slots up front so the result is
		// non-nil even when there are no rules.
		result := make(api.TriggerRules, 0, len(current))
		result = append(result, current...)
		return result, nil
	}

	return xsync.DoR2(ctx, &d.ConfigLock, snapshot)
}