initial implementation of watchworkflow

This commit is contained in:
rushtehrani
2019-12-16 17:17:53 -08:00
parent 07f8e0281e
commit 66d10fac40
2 changed files with 51 additions and 0 deletions

View File

@@ -10,6 +10,7 @@ import (
"github.com/onepanelio/core/util"
"github.com/spf13/viper"
"google.golang.org/grpc/codes"
"k8s.io/apimachinery/pkg/watch"
)
func (r *ResourceManager) CreateWorkflow(namespace string, workflow *model.Workflow) (*model.Workflow, error) {
@@ -82,6 +83,15 @@ func (r *ResourceManager) GetWorkflow(namespace, name string) (workflow *model.W
return
}
func (r *ResourceManager) WatchWorkflow(namespace, name string) (watcher watch.Interface, err error) {
watcher, err = r.argClient.WatchWorkflow(name, &argo.Options{Namespace: namespace})
if err != nil {
return nil, util.NewUserError(codes.NotFound, "Workflow not found.")
}
return
}
func (r *ResourceManager) ListWorkflows(namespace, workflowTemplateUID string) (workflows []*model.Workflow, err error) {
opts := &argo.Options{
Namespace: namespace,

View File

@@ -2,9 +2,12 @@ package server
import (
"context"
"encoding/json"
"errors"
"time"
"github.com/onepanelio/core/api"
"github.com/onepanelio/core/argo"
"github.com/onepanelio/core/manager"
"github.com/onepanelio/core/model"
"github.com/onepanelio/core/util"
@@ -80,6 +83,44 @@ func (s *WorkflowServer) GetWorkflow(ctx context.Context, req *api.GetWorkflowRe
return apiWorkflow(wf), nil
}
func (s *WorkflowServer) WatchWorkflow(req *api.WatchWorkflowRequest, stream api.WorkflowService_WatchWorkflowServer) error {
watcher, err := s.resourceManager.WatchWorkflow(req.Namespace, req.Name)
if errors.As(err, &userError) {
return userError.GRPCError()
}
wf := &argo.Workflow{}
ticker := time.NewTicker(time.Second)
for {
select {
case next := <-watcher.ResultChan():
wf, _ = next.Object.(*argo.Workflow)
case <-ticker.C:
}
if wf == nil {
continue
}
status, err := json.Marshal(wf.Status)
if err != nil {
return err
}
if err := stream.Send(&api.Workflow{
Name: wf.ObjectMeta.Name,
Status: string(status),
}); err != nil {
return err
}
if !wf.Status.FinishedAt.IsZero() {
break
}
}
watcher.Stop()
return err
}
func (s *WorkflowServer) ListWorkflows(ctx context.Context, req *api.ListWorkflowsRequest) (*api.ListWorkflowsResponse, error) {
workflows, err := s.resourceManager.ListWorkflows(req.Namespace, req.WorkflowTemplateUid)
if errors.As(err, &userError) {