mirror of
https://github.com/xaionaro-go/streamctl.git
synced 2025-09-27 03:45:52 +08:00
116 lines
2.1 KiB
Go
116 lines
2.1 KiB
Go
package streamd
|
|
|
|
import (
|
|
"context"
|
|
"time"
|
|
|
|
"github.com/facebookincubator/go-belt/tool/logger"
|
|
"github.com/xaionaro-go/observability"
|
|
"github.com/xaionaro-go/streamctl/pkg/streamd/api"
|
|
"github.com/xaionaro-go/xsync"
|
|
)
|
|
|
|
type Timer struct {
|
|
api.Timer
|
|
xsync.Mutex
|
|
StreamD *StreamD
|
|
RunningTimer *time.Timer
|
|
}
|
|
|
|
func NewTimer(
|
|
streamD *StreamD,
|
|
timerID api.TimerID,
|
|
triggerAt time.Time,
|
|
action api.Action,
|
|
) *Timer {
|
|
return &Timer{
|
|
StreamD: streamD,
|
|
Timer: api.Timer{
|
|
ID: timerID,
|
|
TriggerAt: triggerAt,
|
|
Action: action,
|
|
},
|
|
}
|
|
}
|
|
|
|
func (t *Timer) Start(ctx context.Context) {
|
|
logger.Debugf(ctx, "Start")
|
|
defer logger.Debugf(ctx, "/Start")
|
|
|
|
t.Do(ctx, func() {
|
|
t.start(ctx)
|
|
})
|
|
}
|
|
|
|
func (t *Timer) Stop(ctx context.Context) {
|
|
logger.Debugf(ctx, "Stop")
|
|
defer logger.Debugf(ctx, "/Stop")
|
|
|
|
t.Do(ctx, func() {
|
|
t.stop(ctx)
|
|
})
|
|
}
|
|
|
|
func (t *Timer) start(ctx context.Context) {
|
|
logger.Debugf(ctx, "start")
|
|
defer logger.Debugf(ctx, "/start")
|
|
|
|
if t.RunningTimer != nil {
|
|
return
|
|
}
|
|
|
|
runningTimer := time.NewTimer(time.Until(t.TriggerAt))
|
|
t.RunningTimer = runningTimer
|
|
|
|
observability.Go(ctx, func(ctx context.Context) {
|
|
select {
|
|
case <-ctx.Done():
|
|
t.stop(ctx)
|
|
return
|
|
case <-runningTimer.C:
|
|
}
|
|
t.Do(ctx, func() {
|
|
t.stop(ctx)
|
|
t.trigger(ctx)
|
|
})
|
|
})
|
|
}
|
|
|
|
func (t *Timer) stop(ctx context.Context) {
|
|
logger.Debugf(ctx, "stop")
|
|
defer logger.Debugf(ctx, "/stop")
|
|
|
|
if t.RunningTimer == nil {
|
|
return
|
|
}
|
|
|
|
t.RunningTimer.Stop()
|
|
t.RunningTimer = nil
|
|
}
|
|
|
|
func (t *Timer) Trigger(ctx context.Context) {
|
|
logger.Debugf(ctx, "Trigger")
|
|
defer logger.Debugf(ctx, "/Trigger")
|
|
|
|
t.Do(ctx, func() {
|
|
t.trigger(ctx)
|
|
})
|
|
}
|
|
|
|
func (t *Timer) trigger(ctx context.Context) {
|
|
logger.Debugf(ctx, "trigger (%T)", t.Timer.Action)
|
|
defer logger.Debugf(ctx, "/trigger (%T)", t.Timer.Action)
|
|
|
|
observability.Go(ctx, func(ctx context.Context) {
|
|
err := t.StreamD.RemoveTimer(ctx, t.Timer.ID)
|
|
if err != nil {
|
|
logger.Error(ctx, "unable to remove timer %d: %v", t.Timer.ID, err)
|
|
}
|
|
})
|
|
|
|
err := t.StreamD.doAction(ctx, t.Timer.Action, nil)
|
|
if err != nil {
|
|
logger.Errorf(ctx, "unable to perform action %#+v: %v", t.Timer.Action, err)
|
|
}
|
|
}
|