Files
streamctl/pkg/streamd/variables.go
Dmitrii Okunev 1004082fe4
Some checks failed
rolling-release / build (push) Has been cancelled
rolling-release / rolling-release (push) Has been cancelled
Multiple updates
2025-07-12 23:11:42 +01:00

81 lines
1.6 KiB
Go

package streamd
import (
"context"
"crypto"
"fmt"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/eventbus"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/streamctl/pkg/streamd/consts"
)
// GetVariable looks up key in d.Variables and returns the stored value.
//
// It returns ErrNoVariable when nothing is stored under key, and
// ErrVariableWrongType when the stored entry is not an api.VariableValue.
// ctx is accepted but not used by this method.
func (d *StreamD) GetVariable(
	ctx context.Context,
	key consts.VarKey,
) (api.VariableValue, error) {
	stored, found := d.Variables.Load(key)
	if !found {
		return nil, ErrNoVariable{}
	}

	// The store hands back an untyped entry, so confirm its type before use.
	if value, isValue := stored.(api.VariableValue); isValue {
		return value, nil
	}
	return nil, ErrVariableWrongType{}
}
// GetVariableHash returns the digest of the variable stored under key,
// computed with the hash function identified by hashType.
//
// Any error from GetVariable is returned unchanged. An error is also returned
// when hashType is not available in this binary: crypto.Hash.New panics in
// that case, so availability is checked before it is called.
func (d *StreamD) GetVariableHash(
	ctx context.Context,
	key consts.VarKey,
	hashType crypto.Hash,
) ([]byte, error) {
	b, err := d.GetVariable(ctx, key)
	if err != nil {
		return nil, err
	}

	// Checked after the lookup so that lookup errors keep taking precedence,
	// exactly as they did before this guard existed.
	if !hashType.Available() {
		return nil, fmt.Errorf("hash function %v is not available", hashType)
	}

	hasher := hashType.New()
	// hash.Hash documents that Write never returns an error.
	hasher.Write(b)
	return hasher.Sum(nil), nil
}
// subscriptionTopic is the string type of the event-bus topics built in this
// file (see topicForVariable).
type subscriptionTopic string
// topicForVariable builds the event-bus topic for the variable key: the key
// formatted with %s and prefixed with "var:".
func topicForVariable(key consts.VarKey) subscriptionTopic {
	name := fmt.Sprintf("var:%s", key)
	return subscriptionTopic(name)
}
// SetVariable stores value under key in d.Variables and then sends value to
// d.EventBus on the topic derived from key (see topicForVariable).
//
// Entry and exit are trace-logged with the key and the length of value.
// As written, the method always returns nil.
func (d *StreamD) SetVariable(
	ctx context.Context,
	key consts.VarKey,
	value api.VariableValue,
) error {
	size := len(value)
	logger.Tracef(ctx, "SetVariable(ctx, '%s', value [len == %d])", key, size)
	defer logger.Tracef(ctx, "/SetVariable(ctx, '%s', value [len == %d])", key, size)

	// Store first, then notify, so the event is sent only after the new value
	// has been written to d.Variables.
	d.Variables.Store(key, value)

	topic := topicForVariable(key)
	eventbus.SendEventWithCustomTopic(ctx, d.EventBus, topic, value)
	return nil
}
// SubscribeToVariable returns the channel produced by
// eventSubToChanUsingTopic for the topic of varKey, which is the same topic
// SetVariable sends to on d.EventBus.
//
// NOTE(review): the literal 10 and the nil argument are passed through as-is;
// presumably a queue size and an optional callback — confirm against
// eventSubToChanUsingTopic.
func (d *StreamD) SubscribeToVariable(
	ctx context.Context,
	varKey consts.VarKey,
) (<-chan api.VariableValue, error) {
	topic := topicForVariable(varKey)
	return eventSubToChanUsingTopic[subscriptionTopic, api.VariableValue](
		ctx,
		d.EventBus,
		10,
		nil,
		topic,
	)
}