feat: add vendor

This commit is contained in:
fengcaiwen
2023-10-21 20:48:01 +08:00
parent 1c637f45b5
commit 6a6c4e3257
8701 changed files with 2400820 additions and 1 deletions

View File

@@ -0,0 +1,226 @@
package delta
import (
"context"
"errors"
"strconv"
"sync/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
// Server is a wrapper interface which is meant to hold the proper stream handler for each xDS protocol.
type Server interface {
DeltaStreamHandler(stream stream.DeltaStream, typeURL string) error
}
type Callbacks interface {
// OnDeltaStreamOpen is called once an incremental xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnDeltaStreamOpen(context.Context, int64, string) error
// OnDeltaStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnDeltaStreamClosed(int64)
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
// OnStreamDelatResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
}
var deltaErrorResponse = &cache.RawDeltaResponse{}
type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
// total stream count for counting bi-di streams
streamCount int64
ctx context.Context
}
// NewServer creates a delta xDS specific server which utilizes a ConfigWatcher and delta Callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
return &server{
cache: config,
callbacks: callbacks,
ctx: ctx,
}
}
func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.DeltaDiscoveryRequest, defaultTypeURL string) error {
streamID := atomic.AddInt64(&s.streamCount, 1)
// streamNonce holds a unique nonce for req-resp pairs per xDS stream.
var streamNonce int64
// a collection of stack allocated watches per request type
watches := newWatches()
defer func() {
watches.Cancel()
if s.callbacks != nil {
s.callbacks.OnDeltaStreamClosed(streamID)
}
}()
// Sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
}
response, err := resp.GetDeltaDiscoveryResponse()
if err != nil {
return "", err
}
streamNonce = streamNonce + 1
response.Nonce = strconv.FormatInt(streamNonce, 10)
if s.callbacks != nil {
s.callbacks.OnStreamDeltaResponse(streamID, resp.GetDeltaRequest(), response)
}
return response.Nonce, str.Send(response)
}
if s.callbacks != nil {
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
}
}
var node = &core.Node{}
for {
select {
case <-s.ctx.Done():
return nil
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}
nonce, err := send(resp)
if err != nil {
return err
}
watch := watches.deltaWatches[typ]
watch.nonce = nonce
watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
}
}
// The node information might only be set on the first incoming delta discovery request, so store it here so we can
// reset it on subsequent requests that omit it.
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
// type URL is required for ADS but is implicit for any other xDS stream
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
typeURL := req.GetTypeUrl()
// cancel existing watch to (re-)request a newer version
watch, ok := watches.deltaWatches[typeURL]
if !ok {
// Initialize the state of the stream.
// Since there was no previous state, we know we're handling the first request of this type
// so we set the initial resource versions if we have any, and also signal if this stream is in wildcard mode.
watch.state = stream.NewStreamState(len(req.GetResourceNamesSubscribe()) == 0, req.GetInitialResourceVersions())
} else {
watch.Cancel()
}
s.subscribe(req.GetResourceNamesSubscribe(), watch.state.GetResourceVersions())
s.unsubscribe(req.GetResourceNamesUnsubscribe(), watch.state.GetResourceVersions())
watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watches.deltaWatches[typeURL] = watch
go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
}
}
func (s *server) DeltaStreamHandler(str stream.DeltaStream, typeURL string) error {
// a channel for receiving incoming delta requests
reqCh := make(chan *discovery.DeltaDiscoveryRequest)
// we need to concurrently handle incoming requests since we kick off processDelta as a return
go func() {
for {
select {
case <-str.Context().Done():
close(reqCh)
return
default:
req, err := str.Recv()
if err != nil {
close(reqCh)
return
}
reqCh <- req
}
}
}()
return s.processDelta(str, reqCh, typeURL)
}
// When we subscribe, we just want to make the cache know we are subscribing to a resource.
// Providing a name with an empty version is enough to make that happen.
func (s *server) subscribe(resources []string, sv map[string]string) {
for _, resource := range resources {
sv[resource] = ""
}
}
// Unsubscriptions remove resources from the stream state to
// indicate to the cache that we don't care about the resource anymore
func (s *server) unsubscribe(resources []string, sv map[string]string) {
for _, resource := range resources {
delete(sv, resource)
}
}

View File

@@ -0,0 +1,50 @@
package delta
import (
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
// watches for all delta xDS resource types
type watches struct {
deltaWatches map[string]watch
// Opaque resources share a muxed channel
deltaMuxedResponses chan cache.DeltaResponse
}
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)),
}
}
// Cancel all watches
func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
if watch.cancel != nil {
watch.cancel()
}
}
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
state stream.StreamState
}
// Cancel calls terminate and cancel
func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
close(w.responses)
}

View File

@@ -0,0 +1,65 @@
// Copyright 2020 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package rest provides an implementation of REST-JSON part of XDS server
package rest
import (
"context"
"errors"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)
type Server interface {
Fetch(context.Context, *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error)
}
type Callbacks interface {
// OnFetchRequest is called for each Fetch request. Returning an error will end processing of the
// request and respond with an error.
OnFetchRequest(context.Context, *discovery.DiscoveryRequest) error
// OnFetchResponse is called immediately prior to sending a response.
OnFetchResponse(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}
func NewServer(config cache.ConfigFetcher, callbacks Callbacks) Server {
return &server{cache: config, callbacks: callbacks}
}
type server struct {
cache cache.ConfigFetcher
callbacks Callbacks
}
func (s *server) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if s.callbacks != nil {
if err := s.callbacks.OnFetchRequest(ctx, req); err != nil {
return nil, err
}
}
resp, err := s.cache.Fetch(ctx, req)
if err != nil {
return nil, err
}
if resp == nil {
return nil, errors.New("missing response")
}
out, err := resp.GetDiscoveryResponse()
if s.callbacks != nil {
s.callbacks.OnFetchResponse(req, out)
}
return out, err
}

View File

@@ -0,0 +1,257 @@
// Copyright 2020 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package sotw provides an implementation of GRPC SoTW (State of The World) part of XDS server
package sotw
import (
"context"
"errors"
"reflect"
"strconv"
"sync/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
)
type Server interface {
StreamHandler(stream stream.Stream, typeURL string) error
}
type Callbacks interface {
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamOpen(context.Context, int64, string) error
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
OnStreamClosed(int64)
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamRequest(int64, *discovery.DiscoveryRequest) error
// OnStreamResponse is called immediately prior to sending a response on a stream.
OnStreamResponse(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}
// NewServer creates handlers from a config watcher and callbacks.
func NewServer(ctx context.Context, config cache.ConfigWatcher, callbacks Callbacks) Server {
return &server{cache: config, callbacks: callbacks, ctx: ctx}
}
type server struct {
cache cache.ConfigWatcher
callbacks Callbacks
ctx context.Context
// streamCount for counting bi-di streams
streamCount int64
}
// Discovery response that is sent over GRPC stream
// We need to record what resource names are already sent to a client
// So if the client requests a new name we can respond back
// regardless current snapshot version (even if it is not changed yet)
type lastDiscoveryResponse struct {
nonce string
resources map[string]struct{}
}
// process handles a bi-di stream request
func (s *server) process(str stream.Stream, reqCh <-chan *discovery.DiscoveryRequest, defaultTypeURL string) error {
// increment stream count
streamID := atomic.AddInt64(&s.streamCount, 1)
// unique nonce generator for req-resp pairs per xDS stream; the server
// ignores stale nonces. nonce is only modified within send() function.
var streamNonce int64
streamState := stream.NewStreamState(false, map[string]string{})
lastDiscoveryResponses := map[string]lastDiscoveryResponse{}
// a collection of stack allocated watches per request type
watches := newWatches()
defer func() {
watches.close()
if s.callbacks != nil {
s.callbacks.OnStreamClosed(streamID)
}
}()
// sends a response by serializing to protobuf Any
send := func(resp cache.Response) (string, error) {
if resp == nil {
return "", errors.New("missing response")
}
out, err := resp.GetDiscoveryResponse()
if err != nil {
return "", err
}
// increment nonce
streamNonce = streamNonce + 1
out.Nonce = strconv.FormatInt(streamNonce, 10)
lastResponse := lastDiscoveryResponse{
nonce: out.Nonce,
resources: make(map[string]struct{}),
}
for _, r := range resp.GetRequest().ResourceNames {
lastResponse.resources[r] = struct{}{}
}
lastDiscoveryResponses[resp.GetRequest().TypeUrl] = lastResponse
if s.callbacks != nil {
s.callbacks.OnStreamResponse(resp.GetContext(), streamID, resp.GetRequest(), out)
}
return out.Nonce, str.Send(out)
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
}
}
// node may only be set on the first discovery request
var node = &core.Node{}
// recompute dynamic channels for this stream
watches.recompute(s.ctx, reqCh)
for {
// The list of select cases looks like this:
// 0: <- ctx.Done
// 1: <- reqCh
// 2...: per type watches
index, value, ok := reflect.Select(watches.cases)
switch index {
// ctx.Done() -> if we receive a value here we return as no further computation is needed
case 0:
return nil
// Case 1 handles any request inbound on the stream and handles all initialization as needed
case 1:
// input stream ended or errored out
if !ok {
return nil
}
req := value.Interface().(*discovery.DiscoveryRequest)
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// node field in discovery request is delta-compressed
if req.Node != nil {
node = req.Node
} else {
req.Node = node
}
// nonces can be reused across streams; we verify nonce only if nonce is not initialized
nonce := req.GetResponseNonce()
// type URL is required for ADS but is implicit for xDS
if defaultTypeURL == resource.AnyType {
if req.TypeUrl == "" {
return status.Errorf(codes.InvalidArgument, "type URL is required for ADS")
}
} else if req.TypeUrl == "" {
req.TypeUrl = defaultTypeURL
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamRequest(streamID, req); err != nil {
return err
}
}
if lastResponse, ok := lastDiscoveryResponses[req.TypeUrl]; ok {
if lastResponse.nonce == "" || lastResponse.nonce == nonce {
// Let's record Resource names that a client has received.
streamState.SetKnownResourceNames(req.TypeUrl, lastResponse.resources)
}
}
typeURL := req.GetTypeUrl()
responder := make(chan cache.Response, 1)
if w, ok := watches.responders[typeURL]; ok {
// We've found a pre-existing watch, lets check and update if needed.
// If these requirements aren't satisfied, leave an open watch.
if w.nonce == "" || w.nonce == nonce {
w.close()
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
} else {
// No pre-existing watch exists, let's create one.
// We need to precompute the watches first then open a watch in the cache.
watches.addWatch(typeURL, &watch{
cancel: s.cache.CreateWatch(req, streamState, responder),
response: responder,
})
}
// Recompute the dynamic select cases for this stream.
watches.recompute(s.ctx, reqCh)
default:
// Channel n -> these are the dynamic list of responders that correspond to the stream request typeURL
if !ok {
// Receiver channel was closed. TODO(jpeach): probably cancel the watch or something?
return status.Errorf(codes.Unavailable, "resource watch %d -> failed", index)
}
res := value.Interface().(cache.Response)
nonce, err := send(res)
if err != nil {
return err
}
watches.responders[res.GetRequest().TypeUrl].nonce = nonce
}
}
}
// StreamHandler converts a blocking read call to channels and initiates stream processing
func (s *server) StreamHandler(stream stream.Stream, typeURL string) error {
// a channel for receiving incoming requests
reqCh := make(chan *discovery.DiscoveryRequest)
go func() {
defer close(reqCh)
for {
req, err := stream.Recv()
if err != nil {
return
}
select {
case reqCh <- req:
case <-stream.Context().Done():
return
case <-s.ctx.Done():
return
}
}
}()
return s.process(stream, reqCh, typeURL)
}

View File

@@ -0,0 +1,75 @@
package sotw
import (
"context"
"reflect"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)
// watches for all xDS resource types
type watches struct {
responders map[string]*watch
// cases is a dynamic select case for the watched channels.
cases []reflect.SelectCase
}
// newWatches creates and initializes watches.
func newWatches() watches {
return watches{
responders: make(map[string]*watch, int(types.UnknownType)),
cases: make([]reflect.SelectCase, 0),
}
}
// addWatch creates a new watch entry in the watches map.
// Watches are sorted by typeURL.
func (w *watches) addWatch(typeURL string, watch *watch) {
w.responders[typeURL] = watch
}
// close all open watches
func (w *watches) close() {
for _, watch := range w.responders {
watch.close()
}
}
// recomputeWatches rebuilds the known list of dynamic channels if needed
func (w *watches) recompute(ctx context.Context, req <-chan *discovery.DiscoveryRequest) {
w.cases = w.cases[:0] // Clear the existing cases while retaining capacity.
w.cases = append(w.cases,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ctx.Done()),
}, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(req),
},
)
for _, watch := range w.responders {
w.cases = append(w.cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(watch.response),
})
}
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
cancel func()
nonce string
response chan cache.Response
}
// close cancels an open watch
func (w *watch) close() {
if w.cancel != nil {
w.cancel()
}
}

View File

@@ -0,0 +1,87 @@
package stream
import (
"google.golang.org/grpc"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
)
// Generic RPC stream.
type Stream interface {
grpc.ServerStream
Send(*discovery.DiscoveryResponse) error
Recv() (*discovery.DiscoveryRequest, error)
}
type DeltaStream interface {
grpc.ServerStream
Send(*discovery.DeltaDiscoveryResponse) error
Recv() (*discovery.DeltaDiscoveryRequest, error)
}
// StreamState will keep track of resource state per type on a stream.
type StreamState struct { // nolint:golint,revive
// Indicates whether the original DeltaRequest was a wildcard LDS/RDS request.
wildcard bool
// ResourceVersions contains a hash of the resource as the value and the resource name as the key.
// This field stores the last state sent to the client.
resourceVersions map[string]string
// knownResourceNames contains resource names that a client has received previously
knownResourceNames map[string]map[string]struct{}
// indicates whether the object has beed modified since its creation
first bool
}
func (s *StreamState) GetResourceVersions() map[string]string {
return s.resourceVersions
}
func (s *StreamState) SetResourceVersions(resourceVersions map[string]string) {
s.first = false
s.resourceVersions = resourceVersions
}
func (s *StreamState) IsFirst() bool {
return s.first
}
func (s *StreamState) IsWildcard() bool {
return s.wildcard
}
func (s *StreamState) SetKnownResourceNames(url string, names map[string]struct{}) {
s.knownResourceNames[url] = names
}
func (s *StreamState) SetKnownResourceNamesAsList(url string, names []string) {
m := map[string]struct{}{}
for _, name := range names {
m[name] = struct{}{}
}
s.knownResourceNames[url] = m
}
func (s *StreamState) GetKnownResourceNames(url string) map[string]struct{} {
return s.knownResourceNames[url]
}
// NewStreamState initializes a stream state.
func NewStreamState(wildcard bool, initialResourceVersions map[string]string) StreamState {
state := StreamState{
wildcard: wildcard,
resourceVersions: initialResourceVersions,
first: true,
knownResourceNames: map[string]map[string]struct{}{},
}
if initialResourceVersions == nil {
state.resourceVersions = make(map[string]string)
}
return state
}

View File

@@ -0,0 +1,98 @@
// Copyright 2018 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"errors"
"fmt"
"io/ioutil"
"net/http"
"path"
"google.golang.org/protobuf/encoding/protojson"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)
// HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
// specialized to Envoy xDS API.
type HTTPGateway struct {
// Server is the underlying gRPC server
Server Server
}
func (h *HTTPGateway) ServeHTTP(req *http.Request) ([]byte, int, error) {
p := path.Clean(req.URL.Path)
typeURL := ""
switch p {
case resource.FetchEndpoints:
typeURL = resource.EndpointType
case resource.FetchClusters:
typeURL = resource.ClusterType
case resource.FetchListeners:
typeURL = resource.ListenerType
case resource.FetchRoutes:
typeURL = resource.RouteType
case resource.FetchScopedRoutes:
typeURL = resource.ScopedRouteType
case resource.FetchSecrets:
typeURL = resource.SecretType
case resource.FetchRuntimes:
typeURL = resource.RuntimeType
case resource.FetchExtensionConfigs:
typeURL = resource.ExtensionConfigType
default:
return nil, http.StatusNotFound, fmt.Errorf("no endpoint")
}
if req.Body == nil {
return nil, http.StatusBadRequest, fmt.Errorf("empty body")
}
body, err := ioutil.ReadAll(req.Body)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("cannot read body")
}
// parse as JSON
out := &discovery.DiscoveryRequest{}
err = protojson.Unmarshal(body, out)
if err != nil {
return nil, http.StatusBadRequest, fmt.Errorf("cannot parse JSON body: " + err.Error())
}
out.TypeUrl = typeURL
// fetch results
res, err := h.Server.Fetch(req.Context(), out)
if err != nil {
// SkipFetchErrors will return a 304 which will signify to the envoy client that
// it is already at the latest version; all other errors will 500 with a message.
var skip *types.SkipFetchError
if ok := errors.As(err, &skip); ok {
return nil, http.StatusNotModified, nil
}
return nil, http.StatusInternalServerError, fmt.Errorf("fetch error: " + err.Error())
}
b, err := protojson.MarshalOptions{UseProtoNames: true}.Marshal(res)
if err != nil {
return nil, http.StatusInternalServerError, fmt.Errorf("marshal error: " + err.Error())
}
return b, http.StatusOK, nil
}

View File

@@ -0,0 +1,337 @@
// Copyright 2018 Envoyproxy Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package server provides an implementation of a streaming xDS server.
package server
import (
"context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/envoyproxy/go-control-plane/pkg/server/delta/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/rest/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/sotw/v3"
"github.com/envoyproxy/go-control-plane/pkg/server/stream/v3"
clusterservice "github.com/envoyproxy/go-control-plane/envoy/service/cluster/v3"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
discoverygrpc "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
endpointservice "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
extensionconfigservice "github.com/envoyproxy/go-control-plane/envoy/service/extension/v3"
listenerservice "github.com/envoyproxy/go-control-plane/envoy/service/listener/v3"
routeservice "github.com/envoyproxy/go-control-plane/envoy/service/route/v3"
runtimeservice "github.com/envoyproxy/go-control-plane/envoy/service/runtime/v3"
secretservice "github.com/envoyproxy/go-control-plane/envoy/service/secret/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/v3"
"github.com/envoyproxy/go-control-plane/pkg/resource/v3"
)
// Server is a collection of handlers for streaming discovery requests.
type Server interface {
endpointservice.EndpointDiscoveryServiceServer
clusterservice.ClusterDiscoveryServiceServer
routeservice.RouteDiscoveryServiceServer
routeservice.ScopedRoutesDiscoveryServiceServer
routeservice.VirtualHostDiscoveryServiceServer
listenerservice.ListenerDiscoveryServiceServer
discoverygrpc.AggregatedDiscoveryServiceServer
secretservice.SecretDiscoveryServiceServer
runtimeservice.RuntimeDiscoveryServiceServer
extensionconfigservice.ExtensionConfigDiscoveryServiceServer
rest.Server
sotw.Server
delta.Server
}
// Callbacks is a collection of callbacks inserted into the server operation.
// The callbacks are invoked synchronously.
type Callbacks interface {
rest.Callbacks
sotw.Callbacks
delta.Callbacks
}
// CallbackFuncs is a convenience type for implementing the Callbacks interface.
type CallbackFuncs struct {
StreamOpenFunc func(context.Context, int64, string) error
StreamClosedFunc func(int64)
DeltaStreamOpenFunc func(context.Context, int64, string) error
DeltaStreamClosedFunc func(int64)
StreamRequestFunc func(int64, *discovery.DiscoveryRequest) error
StreamResponseFunc func(context.Context, int64, *discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
StreamDeltaRequestFunc func(int64, *discovery.DeltaDiscoveryRequest) error
StreamDeltaResponseFunc func(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
FetchRequestFunc func(context.Context, *discovery.DiscoveryRequest) error
FetchResponseFunc func(*discovery.DiscoveryRequest, *discovery.DiscoveryResponse)
}
var _ Callbacks = CallbackFuncs{}
// OnStreamOpen invokes StreamOpenFunc.
func (c CallbackFuncs) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
if c.StreamOpenFunc != nil {
return c.StreamOpenFunc(ctx, streamID, typeURL)
}
return nil
}
// OnStreamClosed invokes StreamClosedFunc.
func (c CallbackFuncs) OnStreamClosed(streamID int64) {
if c.StreamClosedFunc != nil {
c.StreamClosedFunc(streamID)
}
}
// OnDeltaStreamOpen invokes DeltaStreamOpenFunc.
func (c CallbackFuncs) OnDeltaStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
if c.DeltaStreamOpenFunc != nil {
return c.DeltaStreamOpenFunc(ctx, streamID, typeURL)
}
return nil
}
// OnDeltaStreamClosed invokes DeltaStreamClosedFunc.
func (c CallbackFuncs) OnDeltaStreamClosed(streamID int64) {
if c.DeltaStreamClosedFunc != nil {
c.DeltaStreamClosedFunc(streamID)
}
}
// OnStreamRequest invokes StreamRequestFunc.
func (c CallbackFuncs) OnStreamRequest(streamID int64, req *discovery.DiscoveryRequest) error {
if c.StreamRequestFunc != nil {
return c.StreamRequestFunc(streamID, req)
}
return nil
}
// OnStreamResponse invokes StreamResponseFunc.
func (c CallbackFuncs) OnStreamResponse(ctx context.Context, streamID int64, req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
if c.StreamResponseFunc != nil {
c.StreamResponseFunc(ctx, streamID, req, resp)
}
}
// OnStreamDeltaRequest invokes StreamDeltaResponseFunc
func (c CallbackFuncs) OnStreamDeltaRequest(streamID int64, req *discovery.DeltaDiscoveryRequest) error {
if c.StreamDeltaRequestFunc != nil {
return c.StreamDeltaRequestFunc(streamID, req)
}
return nil
}
// OnStreamDeltaResponse invokes StreamDeltaResponseFunc.
func (c CallbackFuncs) OnStreamDeltaResponse(streamID int64, req *discovery.DeltaDiscoveryRequest, resp *discovery.DeltaDiscoveryResponse) {
if c.StreamDeltaResponseFunc != nil {
c.StreamDeltaResponseFunc(streamID, req, resp)
}
}
// OnFetchRequest invokes FetchRequestFunc.
func (c CallbackFuncs) OnFetchRequest(ctx context.Context, req *discovery.DiscoveryRequest) error {
if c.FetchRequestFunc != nil {
return c.FetchRequestFunc(ctx, req)
}
return nil
}
// OnFetchResponse invoked FetchResponseFunc.
func (c CallbackFuncs) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
if c.FetchResponseFunc != nil {
c.FetchResponseFunc(req, resp)
}
}
// NewServer creates handlers from a config watcher and callbacks.
func NewServer(ctx context.Context, config cache.Cache, callbacks Callbacks) Server {
return NewServerAdvanced(rest.NewServer(config, callbacks),
sotw.NewServer(ctx, config, callbacks),
delta.NewServer(ctx, config, callbacks),
)
}
func NewServerAdvanced(restServer rest.Server, sotwServer sotw.Server, deltaServer delta.Server) Server {
return &server{rest: restServer, sotw: sotwServer, delta: deltaServer}
}
type server struct {
rest rest.Server
sotw sotw.Server
delta delta.Server
}
func (s *server) StreamHandler(stream stream.Stream, typeURL string) error {
return s.sotw.StreamHandler(stream, typeURL)
}
func (s *server) StreamAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error {
return s.StreamHandler(stream, resource.AnyType)
}
func (s *server) StreamEndpoints(stream endpointservice.EndpointDiscoveryService_StreamEndpointsServer) error {
return s.StreamHandler(stream, resource.EndpointType)
}
func (s *server) StreamClusters(stream clusterservice.ClusterDiscoveryService_StreamClustersServer) error {
return s.StreamHandler(stream, resource.ClusterType)
}
func (s *server) StreamRoutes(stream routeservice.RouteDiscoveryService_StreamRoutesServer) error {
return s.StreamHandler(stream, resource.RouteType)
}
func (s *server) StreamScopedRoutes(stream routeservice.ScopedRoutesDiscoveryService_StreamScopedRoutesServer) error {
return s.StreamHandler(stream, resource.ScopedRouteType)
}
func (s *server) StreamListeners(stream listenerservice.ListenerDiscoveryService_StreamListenersServer) error {
return s.StreamHandler(stream, resource.ListenerType)
}
func (s *server) StreamSecrets(stream secretservice.SecretDiscoveryService_StreamSecretsServer) error {
return s.StreamHandler(stream, resource.SecretType)
}
func (s *server) StreamRuntime(stream runtimeservice.RuntimeDiscoveryService_StreamRuntimeServer) error {
return s.StreamHandler(stream, resource.RuntimeType)
}
func (s *server) StreamExtensionConfigs(stream extensionconfigservice.ExtensionConfigDiscoveryService_StreamExtensionConfigsServer) error {
return s.StreamHandler(stream, resource.ExtensionConfigType)
}
// VHDS doesn't support SOTW requests, so no handler for it exists.
// Fetch is the universal fetch method.
func (s *server) Fetch(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
return s.rest.Fetch(ctx, req)
}
func (s *server) FetchEndpoints(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.EndpointType
return s.Fetch(ctx, req)
}
func (s *server) FetchClusters(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.ClusterType
return s.Fetch(ctx, req)
}
func (s *server) FetchRoutes(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.RouteType
return s.Fetch(ctx, req)
}
func (s *server) FetchScopedRoutes(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.ScopedRouteType
return s.Fetch(ctx, req)
}
func (s *server) FetchListeners(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.ListenerType
return s.Fetch(ctx, req)
}
func (s *server) FetchSecrets(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.SecretType
return s.Fetch(ctx, req)
}
func (s *server) FetchRuntime(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.RuntimeType
return s.Fetch(ctx, req)
}
func (s *server) FetchExtensionConfigs(ctx context.Context, req *discovery.DiscoveryRequest) (*discovery.DiscoveryResponse, error) {
if req == nil {
return nil, status.Errorf(codes.Unavailable, "empty request")
}
req.TypeUrl = resource.ExtensionConfigType
return s.Fetch(ctx, req)
}
// VHDS doesn't support REST requests, so no handler exists for this.
func (s *server) DeltaStreamHandler(stream stream.DeltaStream, typeURL string) error {
return s.delta.DeltaStreamHandler(stream, typeURL)
}
func (s *server) DeltaAggregatedResources(stream discoverygrpc.AggregatedDiscoveryService_DeltaAggregatedResourcesServer) error {
return s.DeltaStreamHandler(stream, resource.AnyType)
}
func (s *server) DeltaEndpoints(stream endpointservice.EndpointDiscoveryService_DeltaEndpointsServer) error {
return s.DeltaStreamHandler(stream, resource.EndpointType)
}
func (s *server) DeltaClusters(stream clusterservice.ClusterDiscoveryService_DeltaClustersServer) error {
return s.DeltaStreamHandler(stream, resource.ClusterType)
}
func (s *server) DeltaRoutes(stream routeservice.RouteDiscoveryService_DeltaRoutesServer) error {
return s.DeltaStreamHandler(stream, resource.RouteType)
}
func (s *server) DeltaScopedRoutes(stream routeservice.ScopedRoutesDiscoveryService_DeltaScopedRoutesServer) error {
return s.DeltaStreamHandler(stream, resource.ScopedRouteType)
}
func (s *server) DeltaListeners(stream listenerservice.ListenerDiscoveryService_DeltaListenersServer) error {
return s.DeltaStreamHandler(stream, resource.ListenerType)
}
func (s *server) DeltaSecrets(stream secretservice.SecretDiscoveryService_DeltaSecretsServer) error {
return s.DeltaStreamHandler(stream, resource.SecretType)
}
func (s *server) DeltaRuntime(stream runtimeservice.RuntimeDiscoveryService_DeltaRuntimeServer) error {
return s.DeltaStreamHandler(stream, resource.RuntimeType)
}
func (s *server) DeltaExtensionConfigs(stream extensionconfigservice.ExtensionConfigDiscoveryService_DeltaExtensionConfigsServer) error {
return s.DeltaStreamHandler(stream, resource.ExtensionConfigType)
}
func (s *server) DeltaVirtualHosts(stream routeservice.VirtualHostDiscoveryService_DeltaVirtualHostsServer) error {
return s.DeltaStreamHandler(stream, resource.VirtualHostType)
}