Added code to get the log object from GCS.

Added switch statement for GCS versus S3.
This commit is contained in:
Aleksandr Melnikov
2020-07-08 13:17:07 -07:00
parent 8d662d6ce0
commit 66431a21eb

View File

@@ -2,6 +2,7 @@ package v1
import (
"bufio"
"cloud.google.com/go/storage"
"encoding/json"
"errors"
"fmt"
@@ -10,6 +11,7 @@ import (
"github.com/onepanelio/core/pkg/util/pagination"
"github.com/onepanelio/core/pkg/util/ptr"
uid2 "github.com/onepanelio/core/pkg/util/uid"
"golang.org/x/net/context"
"gopkg.in/yaml.v2"
"io"
"io/ioutil"
@@ -792,6 +794,7 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
var (
stream io.ReadCloser
s3Client *s3.Client
gcsClient *storage.Client
config *NamespaceConfig
endOffset int
)
@@ -809,37 +812,59 @@ func (c *Client) GetWorkflowExecutionLogs(namespace, uid, podName, containerName
return nil, util.NewUserError(codes.NotFound, "Can't get configuration.")
}
s3Client, err = c.GetS3Client(namespace, config.ArtifactRepository.S3)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't connect to S3 storage.")
return nil, util.NewUserError(codes.NotFound, "Can't connect to S3 storage.")
}
switch {
case config.ArtifactRepository.S3 != nil:
{
s3Client, err = c.GetS3Client(namespace, config.ArtifactRepository.S3)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't connect to S3 storage.")
return nil, util.NewUserError(codes.NotFound, "Can't connect to S3 storage.")
}
opts := s3.GetObjectOptions{}
endOffset, err = strconv.Atoi(readEndOffset)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, "Invalid range.")
}
err = opts.SetRange(0, int64(endOffset))
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't set range.")
return nil, util.NewUserError(codes.NotFound, "Can't connect to S3 storage.")
}
opts := s3.GetObjectOptions{}
endOffset, err = strconv.Atoi(readEndOffset)
if err != nil {
return nil, util.NewUserError(codes.InvalidArgument, "Invalid range.")
}
err = opts.SetRange(0, int64(endOffset))
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't set range.")
return nil, util.NewUserError(codes.NotFound, "Can't connect to S3 storage.")
}
key := config.ArtifactRepository.S3.FormatKey(namespace, uid, podName) + "/" + containerName + ".log"
stream, err = s3Client.GetObject(config.ArtifactRepository.S3.Bucket, key, opts)
key := config.ArtifactRepository.S3.FormatKey(namespace, uid, podName) + "/" + containerName + ".log"
stream, err = s3Client.GetObject(config.ArtifactRepository.S3.Bucket, key, opts)
}
case config.ArtifactRepository.GCS != nil:
{
ctx := context.Background()
gcsClient, err = c.GetGCSClient(namespace, config.ArtifactRepository.GCS)
if err != nil {
log.WithFields(log.Fields{
"Namespace": namespace,
"UID": uid,
"PodName": podName,
"ContainerName": containerName,
"Error": err.Error(),
}).Error("Can't connect to GCS storage.")
return nil, util.NewUserError(codes.NotFound, "Can't connect to GCS storage.")
}
key := config.ArtifactRepository.GCS.FormatKey(namespace, uid, podName) + "/" + containerName + ".log"
stream, err = gcsClient.Bucket(config.ArtifactRepository.GCS.Bucket).Object(key).NewReader(ctx)
}
}
} else {
stream, err = c.CoreV1().Pods(namespace).GetLogs(podName, &corev1.PodLogOptions{
Container: containerName,