Files
streamctl/pkg/streamd/timer.go
2025-06-22 14:59:00 +01:00

116 lines
2.1 KiB
Go

package streamd
import (
"context"
"time"
"github.com/facebookincubator/go-belt/tool/logger"
"github.com/xaionaro-go/observability"
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
"github.com/xaionaro-go/xsync"
)
type Timer struct {
api.Timer
xsync.Mutex
StreamD *StreamD
RunningTimer *time.Timer
}
func NewTimer(
streamD *StreamD,
timerID api.TimerID,
triggerAt time.Time,
action api.Action,
) *Timer {
return &Timer{
StreamD: streamD,
Timer: api.Timer{
ID: timerID,
TriggerAt: triggerAt,
Action: action,
},
}
}
func (t *Timer) Start(ctx context.Context) {
logger.Debugf(ctx, "Start")
defer logger.Debugf(ctx, "/Start")
t.Do(ctx, func() {
t.start(ctx)
})
}
func (t *Timer) Stop(ctx context.Context) {
logger.Debugf(ctx, "Stop")
defer logger.Debugf(ctx, "/Stop")
t.Do(ctx, func() {
t.stop(ctx)
})
}
func (t *Timer) start(ctx context.Context) {
logger.Debugf(ctx, "start")
defer logger.Debugf(ctx, "/start")
if t.RunningTimer != nil {
return
}
runningTimer := time.NewTimer(time.Until(t.TriggerAt))
t.RunningTimer = runningTimer
observability.Go(ctx, func(ctx context.Context) {
select {
case <-ctx.Done():
t.stop(ctx)
return
case <-runningTimer.C:
}
t.Do(ctx, func() {
t.stop(ctx)
t.trigger(ctx)
})
})
}
func (t *Timer) stop(ctx context.Context) {
logger.Debugf(ctx, "stop")
defer logger.Debugf(ctx, "/stop")
if t.RunningTimer == nil {
return
}
t.RunningTimer.Stop()
t.RunningTimer = nil
}
func (t *Timer) Trigger(ctx context.Context) {
logger.Debugf(ctx, "Trigger")
defer logger.Debugf(ctx, "/Trigger")
t.Do(ctx, func() {
t.trigger(ctx)
})
}
func (t *Timer) trigger(ctx context.Context) {
logger.Debugf(ctx, "trigger (%T)", t.Timer.Action)
defer logger.Debugf(ctx, "/trigger (%T)", t.Timer.Action)
observability.Go(ctx, func(ctx context.Context) {
err := t.StreamD.RemoveTimer(ctx, t.Timer.ID)
if err != nil {
logger.Error(ctx, "unable to remove timer %d: %v", t.Timer.ID, err)
}
})
err := t.StreamD.doAction(ctx, t.Timer.Action, nil)
if err != nil {
logger.Errorf(ctx, "unable to perform action %#+v: %v", t.Timer.Action, err)
}
}