Added pause and continue

This commit is contained in:
Quentin Renard
2018-09-30 13:14:29 +02:00
parent fc431fff93
commit 381b0d2ddf
15 changed files with 339 additions and 86 deletions

87
node.go
View File

@@ -49,12 +49,15 @@ type NodeParent interface {
// Statuses
const (
StatusStarted = "started"
StatusPaused = "paused"
StatusRunning = "running"
StatusStopped = "stopped"
)
// Starter represents an object that can start/stop
// Starter represents an object that can start/pause/continue/stop
type Starter interface {
Continue()
Pause()
Start(ctx context.Context, t CreateTaskFunc)
Status() string
Stop()
@@ -74,9 +77,11 @@ func ConnectNodes(parent, child Node) {
// BaseNode represents a base node
type BaseNode struct {
cancel context.CancelFunc
cancelPause context.CancelFunc
children map[string]Node
childrenStarted map[string]bool
ctx context.Context
ctxPause context.Context
e EmitEventFunc
m *sync.Mutex
md NodeMetadata
@@ -85,6 +90,7 @@ type BaseNode struct {
parents map[string]Node
parentsStarted map[string]bool
s *astistat.Stater
status string
}
// NewBaseNode creates a new base node
@@ -99,6 +105,7 @@ func NewBaseNode(e EmitEventFunc, m NodeMetadata) (n *BaseNode) {
oStop: &sync.Once{},
parents: make(map[string]Node),
parentsStarted: make(map[string]bool),
status: StatusStopped,
}
if e != nil {
n.s = astistat.NewStater(2*time.Second, n.statsHandleFunc)
@@ -122,11 +129,9 @@ type BaseNodeExecFunc func(t *astiworker.Task)
// Status implements the Starter interface
func (n *BaseNode) Status() string {
if n.Context() != nil && n.Context().Err() == nil {
return StatusStarted
} else {
return StatusStopped
}
n.m.Lock()
defer n.m.Unlock()
return n.status
}
// Start starts the node
@@ -157,6 +162,11 @@ func (n *BaseNode) Start(ctx context.Context, tc CreateTaskFunc, execFunc BaseNo
p.ChildIsStarted(n.md)
}
// Update status
n.m.Lock()
n.status = StatusRunning
n.m.Unlock()
// Send event
if n.e != nil {
n.e(Event{
@@ -178,6 +188,13 @@ func (n *BaseNode) Start(ctx context.Context, tc CreateTaskFunc, execFunc BaseNo
})
}
// Make sure the status is updated once everything is done
defer func() {
n.m.Lock()
defer n.m.Unlock()
n.status = StatusStopped
}()
// Let children and parents know the node is stopped
defer func() {
// Loop through children
@@ -223,6 +240,62 @@ func (n *BaseNode) Stop() {
})
}
// Pause implements the Starter interface
func (n *BaseNode) Pause() {
// Status is not running
if n.Status() != StatusRunning {
return
}
// Reset ctx
n.ctxPause, n.cancelPause = context.WithCancel(n.ctx)
// Update status
n.m.Lock()
n.status = StatusPaused
n.m.Unlock()
// Send event
if n.e != nil {
n.e(Event{
Name: EventNameNodePaused,
Payload: n.md.Name,
})
}
}
// Continue implements the Starter interface
func (n *BaseNode) Continue() {
// Cancel ctx
if n.cancelPause != nil {
n.cancelPause()
}
// Update status
n.m.Lock()
n.status = StatusRunning
n.m.Unlock()
// Send event
if n.e != nil {
n.e(Event{
Name: EventNameNodeContinued,
Payload: n.md.Name,
})
}
}
// HandlePause handles the pause
func (n *BaseNode) HandlePause() {
// Status is not paused
if n.Status() != StatusPaused {
return
}
// Wait for ctx to be done
<-n.ctxPause.Done()
}
// AddChild implements the NodeParent interface
func (n *BaseNode) AddChild(i Node) {
n.m.Lock()