- r - 以节省内存的方式增长 data.frame
- ruby-on-rails - ruby/ruby on rails 内存泄漏检测
- android - 无法解析导入android.support.v7.app
- UNIX 域套接字与共享内存(映射文件)
我正在编写一个使用 k8s.io/client-go 的脚本库 ( godocs here ) 来操作部署。特别是,我想为集群中的每个 Deployment 添加一个标签选择器。部署标签选择器是 immutable .所以我的方法是:
我不能只使用 client-go 库按顺序执行步骤 1-4,因为我只想在 API 服务器认为上一步已完成时继续下一步。例如,在 API 服务器说原始 Deployments 已被删除之前,我不想执行第 3 步。否则,我会得到同名 Deployment 已存在的错误。
使用 client-go 库检测 Deployment 何时创建和删除以及附加回调函数的最佳方法是什么?我遇到了以下软件包。
但我不确定它们之间有什么区别以及使用哪一个。
我阅读了 watch here 的示例和 informer here .这是 two related所以问题。
好像watch提供了一种较低级别的方法来监视资源的更改并接收有关更改的事件。好像在使用 SharedInformerFactory创建一个 SharedInformer 是可行的方法。
目前为止
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
typedv1 "k8s.io/client-go/kubernetes/typed/apps/v1"
"k8s.io/client-go/tools/cache"
"path/filepath"
"strings"
// We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"log"
"os"
)
func main() {
...
factory := informers.NewSharedInformerFactory(kubeclient, 0)
informer := factory.Apps().V1().Deployments().Informer()
stopper := make(chan struct{})
defer close(stopper)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d := obj.(v1.Deployment)
fmt.Printf("Created deployment in namespace %s, name %s.\n", d.GetNamespace(), d.GetName())
if _, ok := d.GetLabels()[tempLabelKey]; ok {
fmt.Printf("Detected temporary deployment created in namespace %s, name %s.\n", d.GetNamespace(), d.GetName())
deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
fmt.Printf("Now deleting previous deployment in namespace %s, name %s.\n", d.GetNamespace(), deploymentToDelete)
deleteDeployment(deploymentToDelete, d.GetNamespace(), kubeclient)
}
},
DeleteFunc: func(obj interface{}) {
d := obj.(v1.Deployment)
fmt.Printf("Deleted deployment in namespace %s, name %s.\n", d.GetNamespace(), d.GetName())
if _, ok := d.GetLabels()[stageLabelKey]; !ok {
fmt.Printf("Detected deployment without stage label was deleted in namespace %s, name %s.\n", d.GetNamespace(), d.GetName())
fmt.Printf("Now creating normal deployment with stage label in namespace %s, name %s.\n", d.GetNamespace(), d.GetName())
deployment := createDeploymentWithNewLabel(stageLabelKey, "production", d)
createDeploymentsOnApi(deployment, kubeclient)
}
},
})
informer.Run(stopper)
}
最佳答案
我最终使用了 SharedInformer .
这些资源很有帮助。
.
package main
import (
"encoding/json"
"errors"
"flag"
"fmt"
"io/ioutil"
"k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"path/filepath"
"strings"
// We need this import to load the GCP auth plugin which is required to authenticate against GKE clusters.
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"k8s.io/client-go/tools/clientcmd"
"log"
"os"
)
const manifestsDir = "manifests"
// Use an empty string to run on all namespaces
const namespace = ""
const newLabelKey = "new-label-to-add"
const tempLabelKey = "temporary"
const tempSuffix = "-temp"
const componentLabelKey = "component"
func main() {
var kubeconfig *string
if home := homeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
// use the current context in kubeconfig
// TODO (dxia) How can I specify a masterUrl or even better a kubectl context?
cfg, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
exitOnErr(err)
kubeclient, err := kubernetes.NewForConfig(cfg)
exitOnErr(err)
fmt.Printf("Getting deployments with '%s' label.\n", componentLabelKey)
deployments, err := kubeclient.AppsV1().Deployments(namespace).List(metav1.ListOptions{
LabelSelector: componentLabelKey,
})
fmt.Printf("Got %d deployments.\n", len(deployments.Items))
exitOnErr(err)
deployments = processDeployments(deployments)
fmt.Println("Saving deployment manifests to disk as backup.")
err = saveDeployments(deployments)
exitOnErr(err)
tempDeployments := appendToDeploymentName(deployments, tempSuffix)
tempDeployments = createDeploymentsWithNewLabel(tempLabelKey, "true", tempDeployments)
factory := informers.NewSharedInformerFactory(kubeclient, 0)
informer := factory.Apps().V1().Deployments().Informer()
stopper := make(chan struct{})
defer close(stopper)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
d := obj.(*v1.Deployment)
labels := d.GetLabels()
if _, ok := labels[tempLabelKey]; ok {
labelsStr := joinLabelKeyVals(labels)
fmt.Printf("2: Temporary deployment created in namespace %s, name %s, labels '%s'.\n", d.GetNamespace(), d.GetName(), labelsStr)
deploymentToDelete := strings.Replace(d.GetName(), tempSuffix, "", -1)
deployment := getDeployment(d.GetNamespace(), deploymentToDelete, componentLabelKey, kubeclient)
if deployment != nil {
fmt.Printf("3: Now deleting previous deployment in namespace %s, name %s.\n", d.GetNamespace(), deploymentToDelete)
if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
exitOnErr(err)
}
} else {
fmt.Printf("4: Didn't find deployment in namespace %s, name %s, label %s. Skipping.\n", d.GetNamespace(), deploymentToDelete, componentLabelKey)
}
} else if labelVal, ok := labels[newLabelKey]; ok && labelVal == "production" {
fmt.Printf("Normal deployment with '%s' label created in namespace %s, name %s.\n", newLabelKey, d.GetNamespace(), d.GetName())
deploymentToDelete := d.GetName() + tempSuffix
fmt.Printf("6: Now deleting temporary deployment in namespace %s, name %s.\n", d.GetNamespace(), deploymentToDelete)
if err := deleteDeployment(d.GetNamespace(), deploymentToDelete, kubeclient); err != nil {
exitOnErr(err)
}
}
},
DeleteFunc: func(obj interface{}) {
d := obj.(*v1.Deployment)
labels := d.GetLabels()
if _, ok := labels[newLabelKey]; !ok {
if _, ok := labels[tempLabelKey]; !ok {
fmt.Printf("Deployment without '%s' or '%s' label deleted in namespace %s, name %s.\n", newLabelKey, tempLabelKey, d.GetNamespace(), d.GetName())
fmt.Printf("5: Now creating normal deployment with '%s' label in namespace %s, name %s.\n", newLabelKey, d.GetNamespace(), d.GetName())
deploymentToCreate := createDeploymentWithNewLabel(newLabelKey, "production", *d)
if err := createDeploymentOnApi(deploymentToCreate, kubeclient); err != nil {
exitOnErr(err)
}
}
}
},
})
fmt.Println("1: Creating temporary Deployments.")
err = createDeploymentsOnApi(tempDeployments, kubeclient)
exitOnErr(err)
informer.Run(stopper)
}
func getDeployment(namespace string, name string, labelKey string, client *kubernetes.Clientset) *v1.Deployment {
d, err := client.AppsV1().Deployments(namespace).Get(name, metav1.GetOptions{})
if err != nil {
return nil
}
if _, ok := d.GetLabels()[labelKey]; !ok {
return nil
}
return d
}
func createDeploymentWithNewLabel(key string, val string, deployment v1.Deployment) v1.Deployment {
newDeployment := deployment.DeepCopy()
labels := newDeployment.GetLabels()
if labels == nil {
labels = make(map[string]string)
newDeployment.SetLabels(labels)
}
labels[key] = val
podTemplateSpecLabels := newDeployment.Spec.Template.GetLabels()
if podTemplateSpecLabels == nil {
podTemplateSpecLabels = make(map[string]string)
newDeployment.Spec.Template.SetLabels(podTemplateSpecLabels)
}
podTemplateSpecLabels[key] = val
labelSelectors := newDeployment.Spec.Selector.MatchLabels
if labelSelectors == nil {
labelSelectors = make(map[string]string)
newDeployment.Spec.Selector.MatchLabels = labelSelectors
}
labelSelectors[key] = val
return *newDeployment
}
func createDeploymentsWithNewLabel(key string, val string, deployments *v1.DeploymentList) *v1.DeploymentList {
newDeployments := &v1.DeploymentList{}
for _, d := range deployments.Items {
newDeployment := createDeploymentWithNewLabel(key, val, d)
newDeployments.Items = append(newDeployments.Items, newDeployment)
}
return newDeployments
}
func setAPIVersionAndKindForDeployment(d v1.Deployment, apiVersion string, kind string) {
// These fields are empty strings.
// Looks like an open issue: https://github.com/kubernetes/kubernetes/issues/3030.
d.APIVersion = apiVersion
d.Kind = kind
}
func processDeployments(deployments *v1.DeploymentList) *v1.DeploymentList {
newDeployments := &v1.DeploymentList{}
for _, d := range deployments.Items {
// Set APIVersion and Kind until https://github.com/kubernetes/kubernetes/issues/3030 is fixed
setAPIVersionAndKindForDeployment(d, "apps/v1", "Deployment")
d.Status = v1.DeploymentStatus{}
d.SetUID(types.UID(""))
d.SetSelfLink("")
d.SetGeneration(0)
d.SetCreationTimestamp(metav1.Now())
newDeployments.Items = append(newDeployments.Items, d)
}
return newDeployments
}
func saveDeployments(deployments *v1.DeploymentList) error {
for _, d := range deployments.Items {
if err := saveManifest(d); err != nil {
return err
}
}
return nil
}
func saveManifest(resource interface{}) error {
var path = manifestsDir
var name string
var err error
switch v := resource.(type) {
case v1.Deployment:
path = fmt.Sprintf("%s%s/%s/%s", path, v.GetClusterName(), v.GetNamespace(), "deployments")
name = v.GetName()
default:
return errors.New(fmt.Sprintf("Got an unknown resource kind: %v", resource))
}
bytes, err := json.MarshalIndent(resource, "", " ")
if err != nil {
return err
}
err = os.MkdirAll(path, 0755)
if err != nil {
return err
}
err = ioutil.WriteFile(fmt.Sprintf("%s/%s", path, name), bytes, 0644)
if err != nil {
return err
}
return nil
}
func deleteDeployment(namespace string, name string, client *kubernetes.Clientset) error {
if err := client.AppsV1().Deployments(namespace).Delete(name, &metav1.DeleteOptions{}); err != nil {
return err
}
return nil
}
func appendToDeploymentName(deployments *v1.DeploymentList, suffix string) *v1.DeploymentList {
newDeployments := &v1.DeploymentList{}
for _, d := range deployments.Items {
d.SetName(fmt.Sprintf("%s%s", d.GetName(), suffix))
newDeployments.Items = append(newDeployments.Items, d)
}
return newDeployments
}
func createDeploymentOnApi(d v1.Deployment, client *kubernetes.Clientset) error {
d.SetResourceVersion("")
if _, err := client.AppsV1().Deployments(d.GetNamespace()).Create(&d); err != nil {
return err
}
return nil
}
func createDeploymentsOnApi(deployments *v1.DeploymentList, client *kubernetes.Clientset) error {
for _, d := range deployments.Items {
if err := createDeploymentOnApi(d, client); err != nil {
return err
}
}
return nil
}
func joinLabelKeyVals(labels map[string]string) string {
labelKeyVals := make([]string, 0, len(labels))
for k, v := range labels {
labelKeyVals = append(labelKeyVals, fmt.Sprintf("%v=%v", k, v))
}
return strings.Join(labelKeyVals, ", ")
}
func homeDir() string {
if h := os.Getenv("HOME"); h != "" {
return h
}
return os.Getenv("USERPROFILE") // windows
}
func exitOnErr(err error) {
if err != nil {
log.Fatal(err)
}
}
关于go - 当使用 k8s.io/client-go 库更改 kubernetes 部署时,获得通知的最佳方式是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53200785/
我遇到了一个错误,我不知道如何解决。我有以下代码(来自 Eliom Graffiti 教程),我正在尝试使用 make test.byte 进行测试。 open Eliom_content.Html5
我阅读文档的理解是这样的: 客户端是测试用例的子类。当我们运行 manage.py test 时,会为每个以“test_”开头的方法创建一个 SimpleTest 类的实例(它继承自 TestCase
我已经编写了一个用于接收多个客户端的服务器,它可以分别与客户端通信。在这里,我可以列出服务器中已连接的客户端,但是当客户端断开连接时,它不会从服务器中删除客户端。 Server.py import s
我正在制作一个社交网站。当任何用户在站点上更新或创建新内容时,我需要查看站点的任何其他用户来查看更改更新。 我有一些需要低延迟的评论,因此建议为此订阅。 我也有事件,但这些不需要这么低的延迟。每 10
我想在突变后使用乐观 UI 更新:https://www.apollographql.com/docs/react/basics/mutations.html 我对“乐观响应”和“更新”之间的关系感到
我想了解 Dask 在本地机器上的使用模式。 具体而言, 我有一个适合内存的数据集 我想做一些 pandas 操作 分组依据... 日期解析 等等 Pandas 通过单核执行这些操作,这些操作对我来说
我使用 Apollo、React 和 Graphcool。我有一个查询来获取登录的用户 ID: const LoginServerQuery = gql` query LoginServerQ
在本指南的帮助下,我最近在几个设备的应用程序中设置了 P2P 通信:http://developer.android.com/training/connect-devices-wirelessly/n
注意:我在节点项目中使用@twilio/conversations 1.1.0 版。我正在从使用可编程聊天过渡到对话。 我看到对 Client.getConversationByUniqueName
我对服务客户端和设备客户端库有点困惑。谁能解答我对此的疑问。 问题:当我通过 deviceClient 发送数据时,我无法接收数据,但当我使用服务客户端发送数据时,相同的代码可以工作。现在,xamar
我对服务客户端和设备客户端库有点困惑。谁能解答我对此的疑问。 问题:当我通过 deviceClient 发送数据时,我无法接收数据,但当我使用服务客户端发送数据时,相同的代码可以工作。现在,xamar
假设我有一个简单的应用程序。 如何设置 OAuth2 以允许其他应用程序访问我的应用程序的某些部分。 例如,当开发人员想要使用 Facebook API 时,他们会使用 Facebook API 用户
我有两个模块: 在一个模块中,我从另一个模块run 中引用了一个函数: @myorg/server import { Client } from '.' import { Middleware } f
我在通过服务器从客户端向客户端发送数据时遇到了一些问题(以避免监听客户端上的端口)。 我有一个这样的服务器: var net = require("net"); var server = net.cr
我正在使用 django.test.client.Client 来测试用户登录时是否显示某些文本。但是,我的 Client 对象似乎并没有让我保持登录状态。 如果使用 Firefox 手动完成,则此测
有两个我制作的程序无法运行。有服务器和客户端。服务器通过给用户一个 ID(从 0 开始)来接受许多客户端。服务器根据服务器的 ID 将命令发送到特定的客户端。 (示例:200 个客户端连接到 1 个服
今天,我在 Windows 10 的“程序和功能”列表中看到了 2 个不同版本的 ARC,因此我选择卸载旧版本,因为我需要一些空间。在卸载结束时,它们都消失了! 所以,我从 https://insta
在每个新的客户端连接上 fork 服务器进程 不同的进程(服务器的其他子进程,即 exec)无法识别在 fork 子进程中使用相同 fd 的客户端。 如何在其他进程上区分客户端? 如果文件描述符为新
a和b有什么区别? >>> import boto3 >>> a = boto3.Session().client("s3") >>> b = boto3.client("s3") >>> a ==
a和b有什么区别? >>> import boto3 >>> a = boto3.Session().client("s3") >>> b = boto3.client("s3") >>> a ==
我是一名优秀的程序员,十分优秀!