update: reworked watching workflow execution to keep watching until the workflow is done or failed.

This commit is contained in:
Andrey Melnikov
2020-05-20 11:23:03 -07:00
parent dc1972a808
commit f3deea797b
2 changed files with 58 additions and 85 deletions

View File

@@ -637,7 +637,7 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Workflow template not found.")
}).Errorf("Workflow execution not found for namespace: %v, uid: %v).", namespace, uid)
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
@@ -655,58 +655,52 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow
}
workflowWatcher := make(chan *WorkflowExecution)
ticker := time.NewTicker(time.Second)
go func() {
var workflow *wfv1.Workflow
ok := true
var next watch.Event
done := false
for {
select {
case next = <-watcher.ResultChan():
// -------
if next.Type == "" {
workflow, err = c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{})
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Unable to get workflow.")
}
if workflow.Status.Phase == wfv1.NodeRunning {
watcher, err = c.ArgoprojV1alpha1().Workflows(namespace).Watch(metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Watch Workflow error.")
} else {
continue
}
}
for !done {
for next = range watcher.ResultChan() {
workflow, ok := next.Object.(*wfv1.Workflow)
if !ok {
done = true
break
}
if workflow == nil {
continue
}
// -------
workflow, ok = next.Object.(*wfv1.Workflow)
case <-ticker.C:
time.Sleep(time.Millisecond * 200)
continue
manifest, err := json.Marshal(workflow)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error with trying to JSON Marshal workflow.Status.")
done = true
break
}
workflowWatcher <- &WorkflowExecution{
CreatedAt: workflow.CreationTimestamp.UTC(),
StartedAt: ptr.Time(workflow.Status.StartedAt.UTC()),
FinishedAt: ptr.Time(workflow.Status.FinishedAt.UTC()),
UID: string(workflow.UID),
Name: workflow.Name,
Manifest: string(manifest),
}
if !workflow.Status.FinishedAt.IsZero() {
done = true
break
}
}
if workflow == nil && ok {
continue
}
if workflow == nil && !ok {
workflow, err = c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{})
// We want to continue to watch the workflow until it is done, or an error occurred
// If it is not done, create a new watch and continue watching.
if !done {
workflow, err := c.ArgoprojV1alpha1().Workflows(namespace).Get(uid, metav1.GetOptions{})
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
@@ -715,46 +709,32 @@ func (c *Client) WatchWorkflowExecution(namespace, uid string) (<-chan *Workflow
"Error": err.Error(),
}).Error("Unable to get workflow.")
done = true
break
}
if workflow == nil {
if workflow.Status.Phase == wfv1.NodeRunning {
watcher, err = c.ArgoprojV1alpha1().Workflows(namespace).Watch(metav1.ListOptions{
FieldSelector: fieldSelector.String(),
})
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Error": err.Error(),
}).Error("Watch Workflow error.")
done = true
break
}
} else {
done = true
break
}
}
if workflow == nil {
log.Printf("We hit a bad spot where the workflow is nil")
}
manifest, err := json.Marshal(workflow)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"Workflow": workflow,
"Error": err.Error(),
}).Error("Error with trying to JSON Marshal workflow.Status.")
break
}
workflowWatcher <- &WorkflowExecution{
CreatedAt: workflow.CreationTimestamp.UTC(),
StartedAt: ptr.Time(workflow.Status.StartedAt.UTC()),
FinishedAt: ptr.Time(workflow.Status.FinishedAt.UTC()),
UID: string(workflow.UID),
Name: workflow.Name,
Manifest: string(manifest),
}
if !workflow.Status.FinishedAt.IsZero() || !ok {
break
}
}
close(workflowWatcher)
watcher.Stop()
ticker.Stop()
close(workflowWatcher)
}()
return workflowWatcher, nil

View File

@@ -180,14 +180,7 @@ func (s *WorkflowServer) WatchWorkflowExecution(req *api.WatchWorkflowExecutionR
return err
}
wf := &v1.WorkflowExecution{}
ticker := time.NewTicker(time.Second)
for {
select {
case wf = <-watcher:
case <-ticker.C:
}
for wf := range watcher {
if wf == nil {
break
}