k8s: reimplement backend based on CRDs

Signed-off-by: Steffen Vogel <post@steffenvogel.de>
This commit is contained in:
Steffen Vogel
2022-02-03 17:18:53 +01:00
parent 92c7e272ff
commit 520fdf9039
35 changed files with 1860 additions and 269 deletions

42
contrib/crd.yaml Normal file
View File

@@ -0,0 +1,42 @@
---
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
name: signalingenvelopes.wice.riasc.eu
spec:
scope: Namespaced
group: wice.riasc.eu
names:
kind: SignalingEnvelope
plural: signalingenvelopes
singular: signalingenvelope
shortNames:
- env
versions:
- name: v1
served: true
storage: true
schema:
openAPIV3Schema:
type: object
properties:
contents:
type: object
properties:
nonce:
type: string
description: Base64-encoded nonce
pattern: '^[A-Za-z0-9+/]+={0,2}$'
body:
type: string
description: Base64-encoded encrypted message body
pattern: '^[A-Za-z0-9+/]+={0,2}$'
receipient:
type: string
description: Base64-encoded public key of the receipient
pattern: '^[A-Za-z0-9+/]+={0,2}$'
sender:
type: string
description: Base64-encoded public key of the sender
pattern: '^[A-Za-z0-9+/]+={0,2}$'

17
go.mod
View File

@@ -40,9 +40,9 @@ require (
google.golang.org/grpc v1.43.0
google.golang.org/protobuf v1.27.1
gopkg.in/ini.v1 v1.66.3
k8s.io/api v0.23.2
k8s.io/apimachinery v0.23.2
k8s.io/client-go v0.23.2
k8s.io/api v0.23.3 // indirect
k8s.io/apimachinery v0.23.3
k8s.io/client-go v0.23.3
k8s.io/klog/v2 v2.40.1
)
@@ -95,7 +95,7 @@ require (
github.com/marten-seemann/qtls-go1-17 v0.1.0 // indirect
github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mdlayher/genetlink v1.1.0 // indirect
github.com/mdlayher/netlink v1.5.0 // indirect
github.com/mdlayher/socket v0.1.1 // indirect
@@ -118,7 +118,7 @@ require (
github.com/multiformats/go-multistream v0.2.2 // indirect
github.com/multiformats/go-varint v0.0.6 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/onsi/ginkgo v1.16.4 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.4 // indirect
github.com/pion/mdns v0.0.5 // indirect
@@ -159,17 +159,18 @@ require (
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/tools v0.2.2 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b // indirect
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
kernel.org/pub/linux/libs/security/libcap/cap v1.2.62 // indirect
kernel.org/pub/linux/libs/security/libcap/psx v1.2.62 // indirect
lukechampine.com/blake3 v1.1.7 // indirect
sigs.k8s.io/json v0.0.0-20211020170558-c049b76a60c6 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 // indirect
sigs.k8s.io/yaml v1.2.0 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
require (
github.com/evanphx/json-patch v4.12.0+incompatible // indirect
github.com/libp2p/go-addr-util v0.1.0 // indirect
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect

29
go.sum
View File

@@ -219,6 +219,7 @@ github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.
github.com/envoyproxy/go-control-plane v0.10.1/go.mod h1:AY7fTTXNdv/aJ2O5jwpxAPOWUZ7hQAEvzN5Pf27BkQQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/envoyproxy/protoc-gen-validate v0.6.2/go.mod h1:2t7qjJNvHPx8IjnBOzl9E9/baC+qXE/TeeyBRzgJDws=
github.com/evanphx/json-patch v4.12.0+incompatible h1:4onqiflcdA9EOZ4RxV643DvftH5pOlLGNtQ5lPWQu84=
github.com/evanphx/json-patch v4.12.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fatih/color v1.9.0/go.mod h1:eQcE1qtQxscV5RaZvpXrrb8Drkc3/DdQ+uUYCNjL+zU=
@@ -889,8 +890,9 @@ github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI=
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo=
github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60 h1:tHdB+hQRHU10CfcK0furo6rSNgZ38JT8uPh70c/pFD8=
github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60/go.mod h1:aYbhishWc4Ai3I2U4Gaa2n3kHWSwzme6EsG/46HRQbE=
@@ -1055,8 +1057,9 @@ github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE=
github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042wbpU=
github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
@@ -1971,23 +1974,24 @@ honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9
honnef.co/go/tools v0.2.1/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
honnef.co/go/tools v0.2.2 h1:MNh1AVMyVX23VUHE2O27jm6lNj3vjO5DexS4A1xvnzk=
honnef.co/go/tools v0.2.2/go.mod h1:lPVVZ2BS5TfnjLyizF7o7hv7j9/L+8cZY2hLyjP9cGY=
k8s.io/api v0.23.2 h1:62cpzreV3dCuj0hqPi8r4dyWh48ogMcyh+ga9jEGij4=
k8s.io/api v0.23.2/go.mod h1:sYuDb3flCtRPI8ghn6qFrcK5ZBu2mhbElxRE95qpwlI=
k8s.io/apimachinery v0.23.2 h1:dBmjCOeYBdg2ibcQxMuUq+OopZ9fjfLIR5taP/XKeTs=
k8s.io/apimachinery v0.23.2/go.mod h1:zDqeV0AK62LbCI0CI7KbWCAYdLg+E+8UXJ0rIz5gmS8=
k8s.io/client-go v0.23.2 h1:BNbOcxa99jxHH8mM1cPKGIrrKRnCSAfAtyonYGsbFtE=
k8s.io/client-go v0.23.2/go.mod h1:k3YbsWg6GWdHF1THHTQP88X9RhB1DWPo3Dq7KfU/D1c=
k8s.io/api v0.23.3 h1:KNrME8KHGr12Ozjf8ytOewKzZh6hl/hHUZeHddT3a38=
k8s.io/api v0.23.3/go.mod h1:w258XdGyvCmnBj/vGzQMj6kzdufJZVUwEM1U2fRJwSQ=
k8s.io/apimachinery v0.23.3 h1:7IW6jxNzrXTsP0c8yXz2E5Yx/WTzVPTsHIx/2Vm0cIk=
k8s.io/apimachinery v0.23.3/go.mod h1:BEuFMMBaIbcOqVIJqNZJXGFTP4W6AycEpb5+m/97hrM=
k8s.io/client-go v0.23.3 h1:23QYUmCQ/W6hW78xIwm3XqZrrKZM+LWDqW2zfo+szJs=
k8s.io/client-go v0.23.3/go.mod h1:47oMd+YvAOqZM7pcQ6neJtBiFH7alOyfunYN48VsmwE=
k8s.io/gengo v0.0.0-20210813121822-485abfe95c7c/go.mod h1:FiNAH4ZV3gBg2Kwh89tzAEV2be7d5xI0vBa/VySYy3E=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.2.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y=
k8s.io/klog/v2 v2.30.0/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/klog/v2 v2.40.1 h1:P4RRucWk/lFOlDdkAr3mc7iWFkgKrZY9qZMAgek06S4=
k8s.io/klog/v2 v2.40.1/go.mod h1:y1WjHnz7Dj687irZUWR/WLkLc5N1YHtjLdmgWjndZn0=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 h1:E3J9oCLlaobFUqsjG9DfKbP2BmgwBL2p7pn0A3dG9W4=
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk=
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf h1:M9XBsiMslw2lb2ZzglC0TOkBPK5NQi0/noUrdnoFwUg=
k8s.io/kube-openapi v0.0.0-20220124234850-424119656bbf/go.mod h1:sX9MT8g7NVZM5lVL/j8QyCCJe8YSMW30QvGZWaCIDIk=
k8s.io/utils v0.0.0-20210802155522-efc7438f0176/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b h1:wxEMGetGMur3J1xuGLQY7GEQYg9bZxKn3tKo5k/eYcs=
k8s.io/utils v0.0.0-20210930125809-cb0fa318a74b/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20211116205334-6203023598ed h1:ck1fRPWPJWsMd8ZRFsWc6mh/zHp5fZ/shhbrgPUxDAE=
k8s.io/utils v0.0.0-20211116205334-6203023598ed/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
kernel.org/pub/linux/libs/security/libcap/cap v1.2.62 h1:9iE7a676uhvcBUqPaX1m8vsGPLxWhY6HacTae4fAZ4M=
kernel.org/pub/linux/libs/security/libcap/cap v1.2.62/go.mod h1:IZIlNB3lc6EmaOILJvTigHdE+vE5DrVDEEm6CB133Ak=
kernel.org/pub/linux/libs/security/libcap/psx v1.2.62 h1:Tko+77IH7RReA/jcHqaVgtUJ9zAaXH4I0ZE3YHluBdw=
@@ -2004,8 +2008,9 @@ sigs.k8s.io/structured-merge-diff/v4 v4.0.2/go.mod h1:bJZC9H9iH24zzfZ/41RGcq60oK
sigs.k8s.io/structured-merge-diff/v4 v4.2.1 h1:bKCqE9GvQ5tiVHn5rfn1r+yao3aLQEaLzkkmAkf+A6Y=
sigs.k8s.io/structured-merge-diff/v4 v4.2.1/go.mod h1:j/nl6xW8vLS49O8YvXW1ocPhZawJtm+Yrr7PPRQ0Vg4=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=
sigs.k8s.io/yaml v1.2.0 h1:kr/MCeFWJWTwyaHoR9c8EjH9OumOmoF9YGiZd7lFm/Q=
sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
sigs.k8s.io/yaml v1.3.0 h1:a2VclLzOGrwOHDiV8EfBGhvjHvP46CtW5j6POvhYGGo=
sigs.k8s.io/yaml v1.3.0/go.mod h1:GeOyir5tyXNByN85N/dRIT9es5UQNerPYEKK56eTBm8=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/go-diff v0.5.0/go.mod h1:kuch7UrkMzY0X+p9CRK03kfuPQ2zzQcaEFbx8wA8rck=
sourcegraph.com/sqs/pbtypes v0.0.0-20180604144634-d3ebe8f20ae4/go.mod h1:ketZ/q3QxT9HOBeFhu6RdvsftgpsbFHBF5Cas6cDKZ0=

16
hack/update-codegen.sh Normal file
View File

@@ -0,0 +1,16 @@
#!/usr/bin/env bash
set -o errexit
set -o nounset
set -o pipefail
SCRIPT_ROOT=$(dirname "${BASH_SOURCE[@]}")/..
CODEGEN_PKG="${CODEGEN_PKG:-$(cd "${SCRIPT_ROOT}"; ls -d -1 ./vendor/k8s.io/code-generator 2>/dev/null || echo ../code-generator)}"
echo "Calling ${CODEGEN_PKG}/generate-groups.sh"
"${CODEGEN_PKG}"/generate-groups.sh all \
riasc.eu/wice/pkg/signaling/k8s/client \
riasc.eu/wice/pkg/signaling/k8s/apis \
wice:v1 \
--go-header-file="${CODEGEN_PKG}"/hack/boilerplate.go.txt \
--trim-path-prefix riasc.eu/wice

View File

@@ -0,0 +1,5 @@
package wice
const (
GroupName = "wice.riasc.eu"
)

View File

@@ -0,0 +1,5 @@
// +k8s:deepcopy-gen=package
// +k8s:defaulter-gen=TypeMeta
// +groupName=wice.riasc.eu
package v1

View File

@@ -0,0 +1,38 @@
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"riasc.eu/wice/pkg/signaling/k8s/apis/wice"
)
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: wice.GroupName, Version: "v1"}
// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
return SchemeGroupVersion.WithKind(kind).GroupKind()
}
// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
return SchemeGroupVersion.WithResource(resource).GroupResource()
}
var (
// SchemeBuilder initializes a scheme builder
SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
// AddToScheme is a global function that registers this API group & version to a scheme
AddToScheme = SchemeBuilder.AddToScheme
)
// Adds the list of known types to Scheme.
func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&SignalingEnvelope{},
&SignalingEnvelopeList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil
}

View File

@@ -0,0 +1,23 @@
package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"riasc.eu/wice/pkg/pb"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SignalingEnvelope struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
pb.SignalingEnvelope
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
type SignalingEnvelopeList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata,omitempty"`
Items []SignalingEnvelope `json:"items"`
}

View File

@@ -0,0 +1,86 @@
//go:build !ignore_autogenerated
// +build !ignore_autogenerated
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by deepcopy-gen. DO NOT EDIT.
package v1
import (
runtime "k8s.io/apimachinery/pkg/runtime"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SignalingEnvelope) DeepCopyInto(out *SignalingEnvelope) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
in.SignalingEnvelope.DeepCopyInto(&out.SignalingEnvelope)
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SignalingEnvelope.
func (in *SignalingEnvelope) DeepCopy() *SignalingEnvelope {
if in == nil {
return nil
}
out := new(SignalingEnvelope)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *SignalingEnvelope) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SignalingEnvelopeList) DeepCopyInto(out *SignalingEnvelopeList) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ListMeta.DeepCopyInto(&out.ListMeta)
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]SignalingEnvelope, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SignalingEnvelopeList.
func (in *SignalingEnvelopeList) DeepCopy() *SignalingEnvelopeList {
if in == nil {
return nil
}
out := new(SignalingEnvelopeList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *SignalingEnvelopeList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}

View File

@@ -3,12 +3,9 @@ package k8s
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
@@ -16,30 +13,32 @@ import (
"riasc.eu/wice/pkg/crypto"
"riasc.eu/wice/pkg/pb"
"riasc.eu/wice/pkg/signaling"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned"
informers "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions"
)
const (
annotationPrefix string = "wice.riasc.eu"
defaultAnnotationOffers string = annotationPrefix + "/offers"
defaultAnnotationPublicKey string = annotationPrefix + "/public-key"
cleanupInterval = 1 * time.Minute
cleanupMaxAge = 10 * time.Minute
)
type Backend struct {
logger *zap.Logger
offers map[crypto.KeyPair]chan *pb.Offer
signaling.SubscriptionsRegistry
config BackendConfig
clientSet *kubernetes.Clientset
clientSet *wicev1.Clientset
informer cache.SharedInformer
term chan struct{}
updates chan NodeCallback
term chan struct{}
events chan *pb.Event
}
type OfferMap map[crypto.Key]*pb.Offer
logger *zap.Logger
}
func init() {
signaling.Backends["k8s"] = &signaling.BackendPlugin{
@@ -53,12 +52,11 @@ func NewBackend(cfg *signaling.BackendConfig, events chan *pb.Event) (signaling.
var err error
b := Backend{
offers: make(map[crypto.KeyPair]chan *pb.Offer),
logger: zap.L().Named("backend").With(zap.String("backend", uri.Scheme)),
term: make(chan struct{}),
updates: make(chan NodeCallback),
config: defaultConfig,
events: events,
SubscriptionsRegistry: signaling.NewSubscriptionsRegistry(),
term: make(chan struct{}),
config: defaultConfig,
events: events,
logger: zap.L().Named("backend").With(zap.String("backend", cfg.URI.Scheme)),
}
if err := b.config.Parse(cfg); err != nil {
@@ -88,32 +86,28 @@ func NewBackend(cfg *signaling.BackendConfig, events chan *pb.Event) (signaling.
}
// Create the clientset
b.clientSet, err = kubernetes.NewForConfig(config)
if err != nil {
if b.clientSet, err = wicev1.NewForConfig(config); err != nil {
return nil, fmt.Errorf("failed to create clientset: %w", err)
}
// Create the shared informer factory and use the client to connect to
// Kubernetes
factory := informers.NewSharedInformerFactoryWithOptions(b.clientSet, 0,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
// options.LabelSelector = b.config.AnnotationPublicKey
}))
// Create the shared informer factory and use the client to connect to Kubernetes
factory := informers.NewSharedInformerFactoryWithOptions(b.clientSet, 0, informers.WithNamespace(b.config.Namespace))
// Get the informer for the right resource, in this case a Pod
b.informer = factory.Core().V1().Nodes().Informer()
b.informer = factory.Wice().V1().SignalingEnvelopes().Informer()
b.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: b.onNodeAdd,
UpdateFunc: b.onNodeUpdate,
DeleteFunc: b.onNodeDelete,
AddFunc: b.onSignalingEnvelopeAdd,
UpdateFunc: b.onSessionDescriptionUpdate,
})
go b.informer.Run(b.term)
b.logger.Debug("Started watching node resources")
go b.applyUpdates()
b.logger.Debug("Started batched updates")
cache.WaitForNamedCacheSync("signalingenvelopes", b.term, b.informer.HasSynced)
go b.periodicCleanup()
b.logger.Debug("Started regular cleanup")
b.events <- &pb.Event{
Type: pb.Event_BACKEND_READY,
@@ -127,58 +121,51 @@ func NewBackend(cfg *signaling.BackendConfig, events chan *pb.Event) (signaling.
return &b, nil
}
func (b *Backend) SubscribeOffers(kp crypto.KeyPair) (chan *pb.Offer, error) {
b.logger.Info("Subscribe to offers from peer", zap.Any("kp", kp))
func (b *Backend) Subscribe(kp *crypto.KeyPair) (chan *pb.SignalingMessage, error) {
b.logger.Info("Subscribe to messages from peer", zap.Any("kp", kp))
ch, ok := b.offers[kp]
if !ok {
ch = make(chan *pb.Offer, 100)
b.offers[kp] = ch
sub, err := b.NewSubscription(kp)
if err != nil {
return nil, fmt.Errorf("failed create subscription: %w", err)
}
// Process the node annotation at least once before we rely on the informer
node, err := b.getNodeByPublicKey(kp.Theirs)
if err == nil {
b.processNode(node)
// Process existing envelopes in cache
if err := b.processByKeyPair(kp); err != nil {
return nil, err
}
return ch, nil
return sub.Channel, nil
}
func (b *Backend) PublishOffer(kp crypto.KeyPair, offer *pb.Offer) error {
b.updateNode(func(node *corev1.Node) error {
offerMapJson, ok := node.ObjectMeta.Annotations[b.config.AnnotationOffers]
func (b *Backend) Publish(kp *crypto.KeyPair, msg *pb.SignalingMessage) error {
var err error
// Unmarshal
var om OfferMap
if ok && offerMapJson != "" {
if err := json.Unmarshal([]byte(offerMapJson), &om); err != nil {
return err
}
} else {
om = OfferMap{}
}
// Update
om[kp.Theirs] = offer
// Marshal
offerMapJsonNew, err := json.Marshal(&om)
if err != nil {
return err
}
node.ObjectMeta.Annotations[b.config.AnnotationOffers] = string(offerMapJsonNew)
node.ObjectMeta.Annotations[b.config.AnnotationPublicKey] = kp.Ours.String()
return nil
})
b.logger.Debug("Published offer",
b.logger.Debug("Published signaling message",
zap.Any("kp", kp),
zap.Any("offer", offer),
zap.Any("msg", msg),
)
envs := b.clientSet.WiceV1().SignalingEnvelopes(b.config.Namespace)
pbEnv, err := msg.Encrypt(kp)
if err != nil {
return fmt.Errorf("failed to encrypt message: %w", err)
}
env := &v1.SignalingEnvelope{
ObjectMeta: metav1.ObjectMeta{
GenerateName: b.config.GenerateName,
},
}
pbEnv.DeepCopyInto(&env.SignalingEnvelope)
if env, err = envs.Create(context.Background(), env, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("failed to create envelope: %w", err)
}
b.logger.Debug("Created new SignalingEnvelope", zap.String("name", env.ObjectMeta.Name))
return nil
}
@@ -188,22 +175,85 @@ func (b *Backend) Close() error {
return nil // TODO
}
func (b *Backend) Tick() {
func (b *Backend) onSignalingEnvelopeAdd(obj interface{}) {
env := obj.(*v1.SignalingEnvelope)
b.logger.Debug("SignalingEnvelope added", zap.String("name", env.ObjectMeta.Name))
if err := b.process(env); err != nil {
b.logger.Error("Failed to process SignalEnvelope", zap.Error(err))
}
}
func (b *Backend) getNodeByPublicKey(pk crypto.Key) (*corev1.Node, error) {
coreV1 := b.clientSet.CoreV1()
nodes, err := coreV1.Nodes().List(context.TODO(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("%s=%s", b.config.AnnotationPublicKey, pk),
})
func (b *Backend) onSessionDescriptionUpdate(_ interface{}, new interface{}) {
newEnv := new.(*v1.SignalingEnvelope)
b.logger.Debug("SignalingEnvelope updated", zap.String("name", newEnv.ObjectMeta.Name))
if err := b.process(newEnv); err != nil {
b.logger.Error("Failed to process SignalEnvelope", zap.Error(err))
}
}
func (b *Backend) process(env *v1.SignalingEnvelope) error {
sender, err := crypto.ParseKeyBytes(env.Sender)
if err != nil {
return nil, err
return fmt.Errorf("invalid key: %w", err)
}
if len(nodes.Items) != 1 {
return nil, fmt.Errorf("could not find node with public key: %s", pk)
sub, err := b.GetSubscription(&sender)
if err != nil {
return nil // ignore envelopes not addressed to us
}
return &nodes.Items[0], nil
if err := sub.NewMessage(&env.SignalingEnvelope); err != nil {
return err
}
// Delete envelope
envs := b.clientSet.WiceV1().SignalingEnvelopes(b.config.Namespace)
if err := envs.Delete(context.Background(), env.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
b.logger.Warn("Failed to delete envelope", zap.Error(err))
} else {
b.logger.Debug("Deleted envelope", zap.String("envelope", env.ObjectMeta.Name))
}
return nil
}
func (b *Backend) processByKeyPair(kp *crypto.KeyPair) error {
store := b.informer.GetStore()
for _, obj := range store.List() {
if env, ok := obj.(*v1.SignalingEnvelope); ok {
if err := b.process(env); err != nil {
return err
}
}
}
return nil
}
func (b *Backend) periodicCleanup() {
ticker := time.NewTicker(cleanupInterval)
b.cleanup()
for range ticker.C {
b.cleanup()
}
}
func (b *Backend) cleanup() {
store := b.informer.GetStore()
envs := b.clientSet.WiceV1().SignalingEnvelopes(b.config.Namespace)
for _, obj := range store.List() {
if env, ok := obj.(*v1.SignalingEnvelope); ok {
if time.Since(env.ObjectMeta.CreationTimestamp.Time) > cleanupMaxAge {
if err := envs.Delete(context.Background(), env.ObjectMeta.Name, metav1.DeleteOptions{}); err != nil {
b.logger.Error("Failed to delete envelope", zap.Any("name", env.ObjectMeta.Name), zap.Error(err))
} else {
b.logger.Debug("Deleted stale SignalingEnvelope", zap.String("name", env.ObjectMeta.Name))
}
}
}
}
}

View File

@@ -0,0 +1,117 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package versioned
import (
"fmt"
"net/http"
discovery "k8s.io/client-go/discovery"
rest "k8s.io/client-go/rest"
flowcontrol "k8s.io/client-go/util/flowcontrol"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/typed/wice/v1"
)
type Interface interface {
Discovery() discovery.DiscoveryInterface
WiceV1() wicev1.WiceV1Interface
}
// Clientset contains the clients for groups. Each group has exactly one
// version included in a Clientset.
type Clientset struct {
*discovery.DiscoveryClient
wiceV1 *wicev1.WiceV1Client
}
// WiceV1 retrieves the WiceV1Client
func (c *Clientset) WiceV1() wicev1.WiceV1Interface {
return c.wiceV1
}
// Discovery retrieves the DiscoveryClient
func (c *Clientset) Discovery() discovery.DiscoveryInterface {
if c == nil {
return nil
}
return c.DiscoveryClient
}
// NewForConfig creates a new Clientset for the given config.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfig will generate a rate-limiter in configShallowCopy.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*Clientset, error) {
configShallowCopy := *c
// share the transport between all clients
httpClient, err := rest.HTTPClientFor(&configShallowCopy)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&configShallowCopy, httpClient)
}
// NewForConfigAndClient creates a new Clientset for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
// If config's RateLimiter is not set and QPS and Burst are acceptable,
// NewForConfigAndClient will generate a rate-limiter in configShallowCopy.
func NewForConfigAndClient(c *rest.Config, httpClient *http.Client) (*Clientset, error) {
configShallowCopy := *c
if configShallowCopy.RateLimiter == nil && configShallowCopy.QPS > 0 {
if configShallowCopy.Burst <= 0 {
return nil, fmt.Errorf("burst is required to be greater than 0 when RateLimiter is not set and QPS is set to greater than 0")
}
configShallowCopy.RateLimiter = flowcontrol.NewTokenBucketRateLimiter(configShallowCopy.QPS, configShallowCopy.Burst)
}
var cs Clientset
var err error
cs.wiceV1, err = wicev1.NewForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
cs.DiscoveryClient, err = discovery.NewDiscoveryClientForConfigAndClient(&configShallowCopy, httpClient)
if err != nil {
return nil, err
}
return &cs, nil
}
// NewForConfigOrDie creates a new Clientset for the given config and
// panics if there is an error in the config.
func NewForConfigOrDie(c *rest.Config) *Clientset {
cs, err := NewForConfig(c)
if err != nil {
panic(err)
}
return cs
}
// New creates a new Clientset for the given RESTClient.
func New(c rest.Interface) *Clientset {
var cs Clientset
cs.wiceV1 = wicev1.New(c)
cs.DiscoveryClient = discovery.NewDiscoveryClient(c)
return &cs
}

View File

@@ -0,0 +1,20 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
// This package has the automatically generated clientset.
package versioned

View File

@@ -0,0 +1,85 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
fakediscovery "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/testing"
clientset "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/typed/wice/v1"
fakewicev1 "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/typed/wice/v1/fake"
)
// NewSimpleClientset returns a clientset that will respond with the provided objects.
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
func NewSimpleClientset(objects ...runtime.Object) *Clientset {
o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
for _, obj := range objects {
if err := o.Add(obj); err != nil {
panic(err)
}
}
cs := &Clientset{tracker: o}
cs.discovery = &fakediscovery.FakeDiscovery{Fake: &cs.Fake}
cs.AddReactor("*", "*", testing.ObjectReaction(o))
cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := o.Watch(gvr, ns)
if err != nil {
return false, nil, err
}
return true, watch, nil
})
return cs
}
// Clientset implements clientset.Interface. Meant to be embedded into a
// struct to get a default implementation. This makes faking out just the method
// you want to test easier.
type Clientset struct {
testing.Fake
discovery *fakediscovery.FakeDiscovery
tracker testing.ObjectTracker
}
func (c *Clientset) Discovery() discovery.DiscoveryInterface {
return c.discovery
}
func (c *Clientset) Tracker() testing.ObjectTracker {
return c.tracker
}
var (
_ clientset.Interface = &Clientset{}
_ testing.FakeClient = &Clientset{}
)
// WiceV1 retrieves the WiceV1Client
func (c *Clientset) WiceV1() wicev1.WiceV1Interface {
return &fakewicev1.FakeWiceV1{Fake: &c.Fake}
}

View File

@@ -0,0 +1,20 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
// This package has the automatically generated fake clientset.
package fake

View File

@@ -0,0 +1,56 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
serializer "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
)
var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)
var localSchemeBuilder = runtime.SchemeBuilder{
wicev1.AddToScheme,
}
// AddToScheme adds all types of this clientset into the given scheme. This allows composition
// of clientsets, like in:
//
// import (
// "k8s.io/client-go/kubernetes"
// clientsetscheme "k8s.io/client-go/kubernetes/scheme"
// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
// )
//
// kclientset, _ := kubernetes.NewForConfig(c)
// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme)
//
// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types
// correctly.
var AddToScheme = localSchemeBuilder.AddToScheme
func init() {
v1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1"})
utilruntime.Must(AddToScheme(scheme))
}

View File

@@ -0,0 +1,20 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
// This package contains the scheme of the automatically generated clientset.
package scheme

View File

@@ -0,0 +1,56 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package scheme
import (
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
serializer "k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
)
var Scheme = runtime.NewScheme()
var Codecs = serializer.NewCodecFactory(Scheme)
var ParameterCodec = runtime.NewParameterCodec(Scheme)
var localSchemeBuilder = runtime.SchemeBuilder{
wicev1.AddToScheme,
}
// AddToScheme adds all types of this clientset into the given scheme. This allows composition
// of clientsets, like in:
//
// import (
// "k8s.io/client-go/kubernetes"
// clientsetscheme "k8s.io/client-go/kubernetes/scheme"
// aggregatorclientsetscheme "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/scheme"
// )
//
// kclientset, _ := kubernetes.NewForConfig(c)
// _ = aggregatorclientsetscheme.AddToScheme(clientsetscheme.Scheme)
//
// After this, RawExtensions in Kubernetes types will serialize kube-aggregator types
// correctly.
var AddToScheme = localSchemeBuilder.AddToScheme
func init() {
v1.AddToGroupVersion(Scheme, schema.GroupVersion{Version: "v1"})
utilruntime.Must(AddToScheme(Scheme))
}

View File

@@ -0,0 +1,20 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
// This package has the automatically generated typed clients.
package v1

View File

@@ -0,0 +1,20 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
// Package fake has the automatically generated clients.
package fake

View File

@@ -0,0 +1,130 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
"context"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
)
// FakeSignalingEnvelopes implements SignalingEnvelopeInterface
type FakeSignalingEnvelopes struct {
Fake *FakeWiceV1
ns string
}
var signalingenvelopesResource = schema.GroupVersionResource{Group: "wice.riasc.eu", Version: "v1", Resource: "signalingenvelopes"}
var signalingenvelopesKind = schema.GroupVersionKind{Group: "wice.riasc.eu", Version: "v1", Kind: "SignalingEnvelope"}
// Get takes name of the signalingEnvelope, and returns the corresponding signalingEnvelope object, and an error if there is any.
func (c *FakeSignalingEnvelopes) Get(ctx context.Context, name string, options v1.GetOptions) (result *wicev1.SignalingEnvelope, err error) {
obj, err := c.Fake.
Invokes(testing.NewGetAction(signalingenvelopesResource, c.ns, name), &wicev1.SignalingEnvelope{})
if obj == nil {
return nil, err
}
return obj.(*wicev1.SignalingEnvelope), err
}
// List takes label and field selectors, and returns the list of SignalingEnvelopes that match those selectors.
func (c *FakeSignalingEnvelopes) List(ctx context.Context, opts v1.ListOptions) (result *wicev1.SignalingEnvelopeList, err error) {
obj, err := c.Fake.
Invokes(testing.NewListAction(signalingenvelopesResource, signalingenvelopesKind, c.ns, opts), &wicev1.SignalingEnvelopeList{})
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &wicev1.SignalingEnvelopeList{ListMeta: obj.(*wicev1.SignalingEnvelopeList).ListMeta}
for _, item := range obj.(*wicev1.SignalingEnvelopeList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested signalingEnvelopes.
func (c *FakeSignalingEnvelopes) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchAction(signalingenvelopesResource, c.ns, opts))
}
// Create takes the representation of a signalingEnvelope and creates it. Returns the server's representation of the signalingEnvelope, and an error, if there is any.
func (c *FakeSignalingEnvelopes) Create(ctx context.Context, signalingEnvelope *wicev1.SignalingEnvelope, opts v1.CreateOptions) (result *wicev1.SignalingEnvelope, err error) {
obj, err := c.Fake.
Invokes(testing.NewCreateAction(signalingenvelopesResource, c.ns, signalingEnvelope), &wicev1.SignalingEnvelope{})
if obj == nil {
return nil, err
}
return obj.(*wicev1.SignalingEnvelope), err
}
// Update takes the representation of a signalingEnvelope and updates it. Returns the server's representation of the signalingEnvelope, and an error, if there is any.
func (c *FakeSignalingEnvelopes) Update(ctx context.Context, signalingEnvelope *wicev1.SignalingEnvelope, opts v1.UpdateOptions) (result *wicev1.SignalingEnvelope, err error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateAction(signalingenvelopesResource, c.ns, signalingEnvelope), &wicev1.SignalingEnvelope{})
if obj == nil {
return nil, err
}
return obj.(*wicev1.SignalingEnvelope), err
}
// Delete takes name of the signalingEnvelope and deletes it. Returns an error if one occurs.
func (c *FakeSignalingEnvelopes) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error {
_, err := c.Fake.
Invokes(testing.NewDeleteActionWithOptions(signalingenvelopesResource, c.ns, name, opts), &wicev1.SignalingEnvelope{})
return err
}
// DeleteCollection deletes a collection of objects.
func (c *FakeSignalingEnvelopes) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(signalingenvelopesResource, c.ns, listOpts)
_, err := c.Fake.Invokes(action, &wicev1.SignalingEnvelopeList{})
return err
}
// Patch applies the patch and returns the patched signalingEnvelope.
func (c *FakeSignalingEnvelopes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *wicev1.SignalingEnvelope, err error) {
obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(signalingenvelopesResource, c.ns, name, pt, data, subresources...), &wicev1.SignalingEnvelope{})
if obj == nil {
return nil, err
}
return obj.(*wicev1.SignalingEnvelope), err
}

View File

@@ -0,0 +1,40 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
rest "k8s.io/client-go/rest"
testing "k8s.io/client-go/testing"
v1 "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/typed/wice/v1"
)
type FakeWiceV1 struct {
*testing.Fake
}
func (c *FakeWiceV1) SignalingEnvelopes(namespace string) v1.SignalingEnvelopeInterface {
return &FakeSignalingEnvelopes{c, namespace}
}
// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *FakeWiceV1) RESTClient() rest.Interface {
var ret *rest.RESTClient
return ret
}

View File

@@ -0,0 +1,21 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1
type SignalingEnvelopeExpansion interface{}

View File

@@ -0,0 +1,178 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1
import (
"context"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
rest "k8s.io/client-go/rest"
v1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
scheme "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/scheme"
)
// SignalingEnvelopesGetter has a method to return a SignalingEnvelopeInterface.
// A group's client should implement this interface.
type SignalingEnvelopesGetter interface {
SignalingEnvelopes(namespace string) SignalingEnvelopeInterface
}
// SignalingEnvelopeInterface has methods to work with SignalingEnvelope resources.
type SignalingEnvelopeInterface interface {
Create(ctx context.Context, signalingEnvelope *v1.SignalingEnvelope, opts metav1.CreateOptions) (*v1.SignalingEnvelope, error)
Update(ctx context.Context, signalingEnvelope *v1.SignalingEnvelope, opts metav1.UpdateOptions) (*v1.SignalingEnvelope, error)
Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error
DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error
Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.SignalingEnvelope, error)
List(ctx context.Context, opts metav1.ListOptions) (*v1.SignalingEnvelopeList, error)
Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error)
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.SignalingEnvelope, err error)
SignalingEnvelopeExpansion
}
// signalingEnvelopes implements SignalingEnvelopeInterface
type signalingEnvelopes struct {
client rest.Interface
ns string
}
// newSignalingEnvelopes returns a SignalingEnvelopes
func newSignalingEnvelopes(c *WiceV1Client, namespace string) *signalingEnvelopes {
return &signalingEnvelopes{
client: c.RESTClient(),
ns: namespace,
}
}
// Get takes name of the signalingEnvelope, and returns the corresponding signalingEnvelope object, and an error if there is any.
func (c *signalingEnvelopes) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.SignalingEnvelope, err error) {
result = &v1.SignalingEnvelope{}
err = c.client.Get().
Namespace(c.ns).
Resource("signalingenvelopes").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do(ctx).
Into(result)
return
}
// List takes label and field selectors, and returns the list of SignalingEnvelopes that match those selectors.
func (c *signalingEnvelopes) List(ctx context.Context, opts metav1.ListOptions) (result *v1.SignalingEnvelopeList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1.SignalingEnvelopeList{}
err = c.client.Get().
Namespace(c.ns).
Resource("signalingenvelopes").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do(ctx).
Into(result)
return
}
// Watch returns a watch.Interface that watches the requested signalingEnvelopes.
func (c *signalingEnvelopes) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("signalingenvelopes").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch(ctx)
}
// Create takes the representation of a signalingEnvelope and creates it. Returns the server's representation of the signalingEnvelope, and an error, if there is any.
func (c *signalingEnvelopes) Create(ctx context.Context, signalingEnvelope *v1.SignalingEnvelope, opts metav1.CreateOptions) (result *v1.SignalingEnvelope, err error) {
result = &v1.SignalingEnvelope{}
err = c.client.Post().
Namespace(c.ns).
Resource("signalingenvelopes").
VersionedParams(&opts, scheme.ParameterCodec).
Body(signalingEnvelope).
Do(ctx).
Into(result)
return
}
// Update takes the representation of a signalingEnvelope and updates it. Returns the server's representation of the signalingEnvelope, and an error, if there is any.
func (c *signalingEnvelopes) Update(ctx context.Context, signalingEnvelope *v1.SignalingEnvelope, opts metav1.UpdateOptions) (result *v1.SignalingEnvelope, err error) {
result = &v1.SignalingEnvelope{}
err = c.client.Put().
Namespace(c.ns).
Resource("signalingenvelopes").
Name(signalingEnvelope.Name).
VersionedParams(&opts, scheme.ParameterCodec).
Body(signalingEnvelope).
Do(ctx).
Into(result)
return
}
// Delete takes name of the signalingEnvelope and deletes it. Returns an error if one occurs.
func (c *signalingEnvelopes) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error {
return c.client.Delete().
Namespace(c.ns).
Resource("signalingenvelopes").
Name(name).
Body(&opts).
Do(ctx).
Error()
}
// DeleteCollection deletes a collection of objects.
func (c *signalingEnvelopes) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error {
var timeout time.Duration
if listOpts.TimeoutSeconds != nil {
timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second
}
return c.client.Delete().
Namespace(c.ns).
Resource("signalingenvelopes").
VersionedParams(&listOpts, scheme.ParameterCodec).
Timeout(timeout).
Body(&opts).
Do(ctx).
Error()
}
// Patch applies the patch and returns the patched signalingEnvelope.
func (c *signalingEnvelopes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.SignalingEnvelope, err error) {
result = &v1.SignalingEnvelope{}
err = c.client.Patch(pt).
Namespace(c.ns).
Resource("signalingenvelopes").
Name(name).
SubResource(subresources...).
VersionedParams(&opts, scheme.ParameterCodec).
Body(data).
Do(ctx).
Into(result)
return
}

View File

@@ -0,0 +1,107 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1
import (
"net/http"
rest "k8s.io/client-go/rest"
v1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
"riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned/scheme"
)
type WiceV1Interface interface {
RESTClient() rest.Interface
SignalingEnvelopesGetter
}
// WiceV1Client is used to interact with features provided by the wice.riasc.eu group.
type WiceV1Client struct {
restClient rest.Interface
}
func (c *WiceV1Client) SignalingEnvelopes(namespace string) SignalingEnvelopeInterface {
return newSignalingEnvelopes(c, namespace)
}
// NewForConfig creates a new WiceV1Client for the given config.
// NewForConfig is equivalent to NewForConfigAndClient(c, httpClient),
// where httpClient was generated with rest.HTTPClientFor(c).
func NewForConfig(c *rest.Config) (*WiceV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
httpClient, err := rest.HTTPClientFor(&config)
if err != nil {
return nil, err
}
return NewForConfigAndClient(&config, httpClient)
}
// NewForConfigAndClient creates a new WiceV1Client for the given config and http client.
// Note the http client provided takes precedence over the configured transport values.
func NewForConfigAndClient(c *rest.Config, h *http.Client) (*WiceV1Client, error) {
config := *c
if err := setConfigDefaults(&config); err != nil {
return nil, err
}
client, err := rest.RESTClientForConfigAndClient(&config, h)
if err != nil {
return nil, err
}
return &WiceV1Client{client}, nil
}
// NewForConfigOrDie creates a new WiceV1Client for the given config and
// panics if there is an error in the config.
func NewForConfigOrDie(c *rest.Config) *WiceV1Client {
client, err := NewForConfig(c)
if err != nil {
panic(err)
}
return client
}
// New creates a new WiceV1Client for the given RESTClient.
func New(c rest.Interface) *WiceV1Client {
return &WiceV1Client{c}
}
func setConfigDefaults(config *rest.Config) error {
gv := v1.SchemeGroupVersion
config.GroupVersion = &gv
config.APIPath = "/apis"
config.NegotiatedSerializer = scheme.Codecs.WithoutConversion()
if config.UserAgent == "" {
config.UserAgent = rest.DefaultKubernetesUserAgent()
}
return nil
}
// RESTClient returns a RESTClient that is used to communicate
// with API server by this client implementation.
func (c *WiceV1Client) RESTClient() rest.Interface {
if c == nil {
return nil
}
return c.restClient
}

View File

@@ -0,0 +1,180 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package externalversions
import (
reflect "reflect"
sync "sync"
time "time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
schema "k8s.io/apimachinery/pkg/runtime/schema"
cache "k8s.io/client-go/tools/cache"
versioned "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned"
internalinterfaces "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/internalinterfaces"
wice "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/wice"
)
// SharedInformerOption defines the functional option type for SharedInformerFactory.
type SharedInformerOption func(*sharedInformerFactory) *sharedInformerFactory
type sharedInformerFactory struct {
client versioned.Interface
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
lock sync.Mutex
defaultResync time.Duration
customResync map[reflect.Type]time.Duration
informers map[reflect.Type]cache.SharedIndexInformer
// startedInformers is used for tracking which informers have been started.
// This allows Start() to be called multiple times safely.
startedInformers map[reflect.Type]bool
}
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
for k, v := range resyncConfig {
factory.customResync[reflect.TypeOf(k)] = v
}
return factory
}
}
// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory.
func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.tweakListOptions = tweakListOptions
return factory
}
}
// WithNamespace limits the SharedInformerFactory to the specified namespace.
func WithNamespace(namespace string) SharedInformerOption {
return func(factory *sharedInformerFactory) *sharedInformerFactory {
factory.namespace = namespace
return factory
}
}
// NewSharedInformerFactory constructs a new instance of sharedInformerFactory for all namespaces.
func NewSharedInformerFactory(client versioned.Interface, defaultResync time.Duration) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync)
}
// NewFilteredSharedInformerFactory constructs a new instance of sharedInformerFactory.
// Listers obtained via this SharedInformerFactory will be subject to the same filters
// as specified here.
// Deprecated: Please use NewSharedInformerFactoryWithOptions instead
func NewFilteredSharedInformerFactory(client versioned.Interface, defaultResync time.Duration, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerFactory {
return NewSharedInformerFactoryWithOptions(client, defaultResync, WithNamespace(namespace), WithTweakListOptions(tweakListOptions))
}
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
factory := &sharedInformerFactory{
client: client,
namespace: v1.NamespaceAll,
defaultResync: defaultResync,
informers: make(map[reflect.Type]cache.SharedIndexInformer),
startedInformers: make(map[reflect.Type]bool),
customResync: make(map[reflect.Type]time.Duration),
}
// Apply all options
for _, opt := range options {
factory = opt(factory)
}
return factory
}
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
f.lock.Lock()
defer f.lock.Unlock()
for informerType, informer := range f.informers {
if !f.startedInformers[informerType] {
go informer.Run(stopCh)
f.startedInformers[informerType] = true
}
}
}
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
informers := func() map[reflect.Type]cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informers := map[reflect.Type]cache.SharedIndexInformer{}
for informerType, informer := range f.informers {
if f.startedInformers[informerType] {
informers[informerType] = informer
}
}
return informers
}()
res := map[reflect.Type]bool{}
for informType, informer := range informers {
res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
}
return res
}
// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
f.lock.Lock()
defer f.lock.Unlock()
informerType := reflect.TypeOf(obj)
informer, exists := f.informers[informerType]
if exists {
return informer
}
resyncPeriod, exists := f.customResync[informerType]
if !exists {
resyncPeriod = f.defaultResync
}
informer = newFunc(f.client, resyncPeriod)
f.informers[informerType] = informer
return informer
}
// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
internalinterfaces.SharedInformerFactory
ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
Wice() wice.Interface
}
func (f *sharedInformerFactory) Wice() wice.Interface {
return wice.New(f, f.namespace, f.tweakListOptions)
}

View File

@@ -0,0 +1,62 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package externalversions
import (
"fmt"
schema "k8s.io/apimachinery/pkg/runtime/schema"
cache "k8s.io/client-go/tools/cache"
v1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
)
// GenericInformer is type of SharedIndexInformer which will locate and delegate to other
// sharedInformers based on type
type GenericInformer interface {
Informer() cache.SharedIndexInformer
Lister() cache.GenericLister
}
type genericInformer struct {
informer cache.SharedIndexInformer
resource schema.GroupResource
}
// Informer returns the SharedIndexInformer.
func (f *genericInformer) Informer() cache.SharedIndexInformer {
return f.informer
}
// Lister returns the GenericLister.
func (f *genericInformer) Lister() cache.GenericLister {
return cache.NewGenericLister(f.Informer().GetIndexer(), f.resource)
}
// ForResource gives generic access to a shared informer of the matching type
// TODO extend this to unknown resources with a client pool
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
switch resource {
// Group=wice.riasc.eu, Version=v1
case v1.SchemeGroupVersion.WithResource("signalingenvelopes"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Wice().V1().SignalingEnvelopes().Informer()}, nil
}
return nil, fmt.Errorf("no informer found for %v", resource)
}

View File

@@ -0,0 +1,40 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package internalinterfaces
import (
time "time"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
cache "k8s.io/client-go/tools/cache"
versioned "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned"
)
// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer
// SharedInformerFactory a small interface to allow for adding an informer without an import cycle
type SharedInformerFactory interface {
Start(stopCh <-chan struct{})
InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
// TweakListOptionsFunc is a function that transforms a v1.ListOptions.
type TweakListOptionsFunc func(*v1.ListOptions)

View File

@@ -0,0 +1,46 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package wice
import (
internalinterfaces "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/internalinterfaces"
v1 "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/wice/v1"
)
// Interface provides access to each of this group's versions.
type Interface interface {
// V1 provides access to shared informers for resources in V1.
V1() v1.Interface
}
type group struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &group{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// V1 returns a new v1.Interface.
func (g *group) V1() v1.Interface {
return v1.New(g.factory, g.namespace, g.tweakListOptions)
}

View File

@@ -0,0 +1,45 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package v1
import (
internalinterfaces "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/internalinterfaces"
)
// Interface provides access to all the informers in this group version.
type Interface interface {
// SignalingEnvelopes returns a SignalingEnvelopeInformer.
SignalingEnvelopes() SignalingEnvelopeInformer
}
type version struct {
factory internalinterfaces.SharedInformerFactory
namespace string
tweakListOptions internalinterfaces.TweakListOptionsFunc
}
// New returns a new Interface.
func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakListOptions internalinterfaces.TweakListOptionsFunc) Interface {
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// SignalingEnvelopes returns a SignalingEnvelopeInformer.
func (v *version) SignalingEnvelopes() SignalingEnvelopeInformer {
return &signalingEnvelopeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}

View File

@@ -0,0 +1,90 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package v1
import (
"context"
time "time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
cache "k8s.io/client-go/tools/cache"
wicev1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
versioned "riasc.eu/wice/pkg/signaling/k8s/client/clientset/versioned"
internalinterfaces "riasc.eu/wice/pkg/signaling/k8s/client/informers/externalversions/internalinterfaces"
v1 "riasc.eu/wice/pkg/signaling/k8s/client/listers/wice/v1"
)
// SignalingEnvelopeInformer provides access to a shared informer and lister for
// SignalingEnvelopes.
type SignalingEnvelopeInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1.SignalingEnvelopeLister
}
type signalingEnvelopeInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// NewSignalingEnvelopeInformer constructs a new informer for SignalingEnvelope type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewSignalingEnvelopeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredSignalingEnvelopeInformer(client, namespace, resyncPeriod, indexers, nil)
}
// NewFilteredSignalingEnvelopeInformer constructs a new informer for SignalingEnvelope type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredSignalingEnvelopeInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.WiceV1().SignalingEnvelopes(namespace).List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.WiceV1().SignalingEnvelopes(namespace).Watch(context.TODO(), options)
},
},
&wicev1.SignalingEnvelope{},
resyncPeriod,
indexers,
)
}
func (f *signalingEnvelopeInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredSignalingEnvelopeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *signalingEnvelopeInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&wicev1.SignalingEnvelope{}, f.defaultInformer)
}
func (f *signalingEnvelopeInformer) Lister() v1.SignalingEnvelopeLister {
return v1.NewSignalingEnvelopeLister(f.Informer().GetIndexer())
}

View File

@@ -0,0 +1,27 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by lister-gen. DO NOT EDIT.
package v1
// SignalingEnvelopeListerExpansion allows custom methods to be added to
// SignalingEnvelopeLister.
type SignalingEnvelopeListerExpansion interface{}
// SignalingEnvelopeNamespaceListerExpansion allows custom methods to be added to
// SignalingEnvelopeNamespaceLister.
type SignalingEnvelopeNamespaceListerExpansion interface{}

View File

@@ -0,0 +1,99 @@
/*
Copyright The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by lister-gen. DO NOT EDIT.
package v1
import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
v1 "riasc.eu/wice/pkg/signaling/k8s/apis/wice/v1"
)
// SignalingEnvelopeLister helps list SignalingEnvelopes.
// All objects returned here must be treated as read-only.
type SignalingEnvelopeLister interface {
// List lists all SignalingEnvelopes in the indexer.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.SignalingEnvelope, err error)
// SignalingEnvelopes returns an object that can list and get SignalingEnvelopes.
SignalingEnvelopes(namespace string) SignalingEnvelopeNamespaceLister
SignalingEnvelopeListerExpansion
}
// signalingEnvelopeLister implements the SignalingEnvelopeLister interface.
type signalingEnvelopeLister struct {
indexer cache.Indexer
}
// NewSignalingEnvelopeLister returns a new SignalingEnvelopeLister.
func NewSignalingEnvelopeLister(indexer cache.Indexer) SignalingEnvelopeLister {
return &signalingEnvelopeLister{indexer: indexer}
}
// List lists all SignalingEnvelopes in the indexer.
func (s *signalingEnvelopeLister) List(selector labels.Selector) (ret []*v1.SignalingEnvelope, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1.SignalingEnvelope))
})
return ret, err
}
// SignalingEnvelopes returns an object that can list and get SignalingEnvelopes.
func (s *signalingEnvelopeLister) SignalingEnvelopes(namespace string) SignalingEnvelopeNamespaceLister {
return signalingEnvelopeNamespaceLister{indexer: s.indexer, namespace: namespace}
}
// SignalingEnvelopeNamespaceLister helps list and get SignalingEnvelopes.
// All objects returned here must be treated as read-only.
type SignalingEnvelopeNamespaceLister interface {
// List lists all SignalingEnvelopes in the indexer for a given namespace.
// Objects returned here must be treated as read-only.
List(selector labels.Selector) (ret []*v1.SignalingEnvelope, err error)
// Get retrieves the SignalingEnvelope from the indexer for a given namespace and name.
// Objects returned here must be treated as read-only.
Get(name string) (*v1.SignalingEnvelope, error)
SignalingEnvelopeNamespaceListerExpansion
}
// signalingEnvelopeNamespaceLister implements the SignalingEnvelopeNamespaceLister
// interface.
type signalingEnvelopeNamespaceLister struct {
indexer cache.Indexer
namespace string
}
// List lists all SignalingEnvelopes in the indexer for a given namespace.
func (s signalingEnvelopeNamespaceLister) List(selector labels.Selector) (ret []*v1.SignalingEnvelope, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1.SignalingEnvelope))
})
return ret, err
}
// Get retrieves the SignalingEnvelope from the indexer for a given namespace and name.
func (s signalingEnvelopeNamespaceLister) Get(name string) (*v1.SignalingEnvelope, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1.Resource("signalingenvelope"), name)
}
return obj.(*v1.SignalingEnvelope), nil
}

View File

@@ -1,41 +1,25 @@
package k8s
import (
"errors"
"net/url"
"riasc.eu/wice/pkg/signaling"
)
type BackendConfig struct {
signaling.BackendConfig
Kubeconfig string
NodeName string
AnnotationOffers string
AnnotationPublicKey string
Namespace string
GenerateName string
}
var defaultConfig = BackendConfig{
AnnotationOffers: defaultAnnotationOffers,
AnnotationPublicKey: defaultAnnotationPublicKey,
GenerateName: "wice-",
Namespace: "riasc-system",
}
func (c *BackendConfig) Parse(cfg *signaling.BackendConfig) error {
c.BackendConfig = *cfg
options := uri.Query()
if str := options.Get("node"); str == "" {
return errors.New("missing backend option: node")
}
if str := options.Get("annotation-offers"); str != "" {
c.AnnotationOffers = str
}
if str := options.Get("annotation-public-key"); str != "" {
c.AnnotationPublicKey = str
}
c.Kubeconfig = c.URI.Path
return nil

View File

@@ -1,81 +0,0 @@
package k8s
import (
"encoding/json"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"riasc.eu/wice/pkg/crypto"
)
func (b *Backend) onNodeAdd(obj interface{}) {
node := obj.(*corev1.Node)
b.logger.Debug("Node added", zap.String("name", node.ObjectMeta.Name))
b.processNode(node)
}
func (b *Backend) onNodeUpdate(_ interface{}, new interface{}) {
newNode := new.(*corev1.Node)
b.logger.Debug("Node updated", zap.String("name", newNode.ObjectMeta.Name))
b.processNode(newNode)
}
func (b *Backend) onNodeDelete(obj interface{}) {
node := obj.(*corev1.Node)
b.logger.Debug("Node deleted", zap.String("name", node.ObjectMeta.Name))
b.processNode(node)
}
func (b *Backend) processNode(node *corev1.Node) {
// Ignore ourself
if node.ObjectMeta.Name == b.config.NodeName {
b.logger.Debug("Ignoring ourself", zap.String("node", node.ObjectMeta.Name))
return
}
// Check if required annotations are present
offersJson, ok := node.ObjectMeta.Annotations[b.config.AnnotationOffers]
if !ok {
b.logger.Debug("Missing candidate annotation", zap.String("node", node.ObjectMeta.Name))
return
}
keyString, ok := node.ObjectMeta.Annotations[b.config.AnnotationPublicKey]
if !ok {
b.logger.Debug("Missing public key annotation", zap.String("node", node.ObjectMeta.Name))
return
}
var err error
var theirPK crypto.Key
theirPK, err = crypto.ParseKey(keyString)
if err != nil {
b.logger.Warn("Failed to parse public key", zap.Error(err))
}
var om OfferMap
if err := json.Unmarshal([]byte(offersJson), &om); err != nil {
b.logger.Warn("Failed to parse candidate annotation", zap.Error(err))
return
}
for ourPK, o := range om {
kp := crypto.KeyPair{
Ours: ourPK,
Theirs: theirPK,
}
ch, ok := b.offers[kp]
if !ok {
b.logger.Warn("Found candidates for unknown peer", zap.Any("kp", kp))
continue
}
ch <- o
}
}

View File

@@ -1,62 +0,0 @@
package k8s
import (
"context"
"fmt"
"time"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/util/retry"
)
type NodeCallback func(*corev1.Node) error
type AnnotationCallback func(string) (string, error)
type AnnotationCallbackMap map[string]AnnotationCallback
func (b *Backend) applyUpdates() {
var cbs []NodeCallback
var timer time.Timer
for {
select {
case cb := <-b.updates:
cbs = append(cbs, cb)
timer = *time.NewTimer(50 * time.Millisecond)
case <-timer.C:
if len(cbs) > 0 {
b.logger.Debug("Applying batched updates", zap.Int("count", len(cbs)))
nodes := b.clientSet.CoreV1().Nodes()
if err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
node, err := nodes.Get(context.TODO(), b.config.NodeName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get latest version of node %s: %w", b.config.NodeName, err)
}
for _, cb := range cbs {
if err := cb(node); err != nil {
return err
}
}
_, err = nodes.Update(context.TODO(), node, metav1.UpdateOptions{})
return err
}); err != nil {
b.logger.Error("Failed to update node", zap.Error(err))
}
cbs = nil
}
}
}
}
func (b *Backend) updateNode(cb NodeCallback) {
b.updates <- cb
}