feat: updated workflow logs to return a response that may contain several log entries.

This commit is contained in:
Andrey Melnikov
2020-12-30 12:02:25 -08:00
parent ba776cddbd
commit 543367c36e
5 changed files with 559 additions and 474 deletions

View File

@@ -1289,13 +1289,13 @@
"type": "object",
"properties": {
"result": {
"$ref": "#/definitions/LogEntry"
"$ref": "#/definitions/LogStreamResponse"
},
"error": {
"$ref": "#/definitions/google.rpc.Status"
}
},
"title": "Stream result of LogEntry"
"title": "Stream result of LogStreamResponse"
}
},
"default": {
@@ -3543,6 +3543,17 @@
}
}
},
"LogStreamResponse": {
"type": "object",
"properties": {
"logEntries": {
"type": "array",
"items": {
"$ref": "#/definitions/LogEntry"
}
}
}
},
"Metric": {
"type": "object",
"properties": {

File diff suppressed because it is too large Load Diff

View File

@@ -140,7 +140,7 @@ func (c *workflowServiceClient) GetWorkflowExecutionLogs(ctx context.Context, in
}
type WorkflowService_GetWorkflowExecutionLogsClient interface {
Recv() (*LogEntry, error)
Recv() (*LogStreamResponse, error)
grpc.ClientStream
}
@@ -148,8 +148,8 @@ type workflowServiceGetWorkflowExecutionLogsClient struct {
grpc.ClientStream
}
func (x *workflowServiceGetWorkflowExecutionLogsClient) Recv() (*LogEntry, error) {
m := new(LogEntry)
func (x *workflowServiceGetWorkflowExecutionLogsClient) Recv() (*LogStreamResponse, error) {
m := new(LogStreamResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
@@ -460,7 +460,7 @@ func _WorkflowService_GetWorkflowExecutionLogs_Handler(srv interface{}, stream g
}
type WorkflowService_GetWorkflowExecutionLogsServer interface {
Send(*LogEntry) error
Send(*LogStreamResponse) error
grpc.ServerStream
}
@@ -468,7 +468,7 @@ type workflowServiceGetWorkflowExecutionLogsServer struct {
grpc.ServerStream
}
func (x *workflowServiceGetWorkflowExecutionLogsServer) Send(m *LogEntry) error {
func (x *workflowServiceGetWorkflowExecutionLogsServer) Send(m *LogStreamResponse) error {
return x.ServerStream.SendMsg(m)
}

View File

@@ -50,7 +50,7 @@ service WorkflowService {
};
}
rpc GetWorkflowExecutionLogs (GetWorkflowExecutionLogsRequest) returns (stream LogEntry) {
rpc GetWorkflowExecutionLogs (GetWorkflowExecutionLogsRequest) returns (stream LogStreamResponse) {
option (google.api.http) = {
get: "/apis/v1beta1/{namespace}/workflow_executions/{uid}/pods/{podName}/containers/{containerName}/logs"
};
@@ -205,6 +205,10 @@ message ListWorkflowExecutionsResponse {
int32 totalAvailableCount = 6;
}
message LogStreamResponse {
repeated LogEntry logEntries = 1;
}
message LogEntry {
string timestamp = 1;
string content = 2;

View File

@@ -249,14 +249,19 @@ func (s *WorkflowServer) GetWorkflowExecutionLogs(req *api.GetWorkflowExecutionL
break
}
for _, item := range le {
if err := stream.Send(&api.LogEntry{
apiLogEntries := make([]*api.LogEntry, len(le))
for i, item := range le {
apiLogEntries[i] = &api.LogEntry{
Timestamp: item.Timestamp.Format(time.RFC3339),
Content: item.Content,
}); err != nil {
return err
}
}
if err := stream.Send(&api.LogStreamResponse{
LogEntries: apiLogEntries,
}); err != nil {
return err
}
}
return nil