mirror of
https://github.com/xaionaro-go/streamctl.git
synced 2025-10-05 15:37:00 +08:00
108 lines
3.2 KiB
Go
108 lines
3.2 KiB
Go
package streamd
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/facebookincubator/go-belt/tool/logger"
|
|
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
|
|
"github.com/xaionaro-go/xsync"
|
|
)
|
|
|
|
func (d *StreamD) AddTriggerRule(
|
|
ctx context.Context,
|
|
triggerRule *api.TriggerRule,
|
|
) (api.TriggerRuleID, error) {
|
|
logger.Debugf(ctx, "AddTriggerRule(ctx, %s)", triggerRule)
|
|
defer logger.Debugf(ctx, "/AddTriggerRule(ctx, %s)", triggerRule)
|
|
ruleID, err := d.addTriggerRuleToConfig(ctx, triggerRule)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("unable to add the trigger rule to config: %w", err)
|
|
}
|
|
return ruleID, nil
|
|
}
|
|
|
|
func (d *StreamD) addTriggerRuleToConfig(
|
|
ctx context.Context,
|
|
triggerRule *api.TriggerRule,
|
|
) (api.TriggerRuleID, error) {
|
|
return xsync.DoR2(ctx, &d.ConfigLock, func() (api.TriggerRuleID, error) {
|
|
ruleID := api.TriggerRuleID(len(d.Config.TriggerRules))
|
|
d.Config.TriggerRules = append(d.Config.TriggerRules, triggerRule)
|
|
if err := d.SaveConfig(ctx); err != nil {
|
|
return ruleID, fmt.Errorf("unable to save the config: %w", err)
|
|
}
|
|
return ruleID, nil
|
|
})
|
|
}
|
|
|
|
func (d *StreamD) UpdateTriggerRule(
|
|
ctx context.Context,
|
|
ruleID api.TriggerRuleID,
|
|
triggerRule *api.TriggerRule,
|
|
) error {
|
|
logger.Debugf(ctx, "UpdateTriggerRule(ctx, %v, %s)", ruleID, triggerRule)
|
|
defer logger.Debugf(ctx, "/UpdateTriggerRule(ctx, %v, %s)", ruleID, triggerRule)
|
|
if err := d.updateTriggerRuleInConfig(ctx, ruleID, triggerRule); err != nil {
|
|
return fmt.Errorf("unable to update the trigger rule %d in the config: %w", ruleID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *StreamD) updateTriggerRuleInConfig(
|
|
ctx context.Context,
|
|
ruleID api.TriggerRuleID,
|
|
triggerRule *api.TriggerRule,
|
|
) error {
|
|
return xsync.DoR1(ctx, &d.ConfigLock, func() error {
|
|
if ruleID >= api.TriggerRuleID(len(d.Config.TriggerRules)) {
|
|
return fmt.Errorf("rule %d does not exist", ruleID)
|
|
}
|
|
d.Config.TriggerRules[ruleID] = triggerRule
|
|
if err := d.SaveConfig(ctx); err != nil {
|
|
return fmt.Errorf("unable to save the config: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (d *StreamD) RemoveTriggerRule(
|
|
ctx context.Context,
|
|
ruleID api.TriggerRuleID,
|
|
) error {
|
|
logger.Debugf(ctx, "RemoveTriggerRule(ctx, %v)", ruleID)
|
|
defer logger.Debugf(ctx, "/RemoveTriggerRule(ctx, %v)", ruleID)
|
|
if err := d.removeTriggerRuleFromConfig(ctx, ruleID); err != nil {
|
|
return fmt.Errorf("unable to remove the trigger rule %d from the config: %w", ruleID, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (d *StreamD) removeTriggerRuleFromConfig(
|
|
ctx context.Context,
|
|
ruleID api.TriggerRuleID,
|
|
) error {
|
|
return xsync.DoR1(ctx, &d.ConfigLock, func() error {
|
|
if ruleID >= api.TriggerRuleID(len(d.Config.TriggerRules)) {
|
|
return fmt.Errorf("rule %d does not exist", ruleID)
|
|
}
|
|
d.Config.TriggerRules = append(d.Config.TriggerRules[:ruleID], d.Config.TriggerRules[ruleID+1:]...)
|
|
if err := d.SaveConfig(ctx); err != nil {
|
|
return fmt.Errorf("unable to save the config: %w", err)
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
|
|
func (d *StreamD) ListTriggerRules(
|
|
ctx context.Context,
|
|
) (api.TriggerRules, error) {
|
|
logger.Debugf(ctx, "ListTriggerRules(ctx)")
|
|
defer logger.Debugf(ctx, "/ListTriggerRules(ctx)")
|
|
return xsync.DoR2(ctx, &d.ConfigLock, func() (api.TriggerRules, error) {
|
|
rules := make(api.TriggerRules, len(d.Config.TriggerRules))
|
|
copy(rules, d.Config.TriggerRules)
|
|
return rules, nil
|
|
})
|
|
}
|