
kubernetes - How to handle K8s Go client informer error events

Reposted. Author: 行者123. Updated: 2023-12-05 06:15:48

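I have written a Go-based K8s client application that connects to a K8s cluster. To handle real-time notifications (add, delete, update) for Pods, Namespaces, and Nodes from the cluster, I wrote an informer. The code snippet is below.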

I want to draw particular attention to the "runtime.HandleCrash()" function, which (I guess) helps redirect runtime panics/errors to the panic file.

// Redirect stderr to a panic file so that anything written there is captured.
panicFile, _ := os.OpenFile("/var/log/panicfile", os.O_WRONLY|os.O_CREATE|os.O_SYNC, 0644)
syscall.Dup2(int(panicFile.Fd()), int(os.Stderr.Fd()))

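See below for some of the errors that were reported/collected in the panic file.

My question is: is there any way to have the informer report specific errors to my application instead of writing them to the panic file? That way, my application could handle this expected event more gracefully.

Is there any way I can register a callback function (similar to Informer.AddEventHandler())?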

func (kcv *K8sWorker) armK8sPodListeners() error {

	// Kubernetes provides a utility to handle API crashes
	defer runtime.HandleCrash()

	var sharedInformer = informers.NewSharedInformerFactory(kcv.kubeClient.K8sClient, 0)

	// Add watcher for the Pod.
	kcv.podInformer = sharedInformer.Core().V1().Pods().Informer()
	kcv.podInformerChan = make(chan struct{})

	// Pod informer state change handler
	kcv.podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// When a new pod gets created
		AddFunc: func(obj interface{}) {
			kcv.handleAddPod(obj)
		},
		// When a pod gets updated
		UpdateFunc: func(oldObj interface{}, newObj interface{}) {
			kcv.handleUpdatePod(oldObj, newObj)
		},
		// When a pod gets deleted
		DeleteFunc: func(obj interface{}) {
			kcv.handleDeletePod(obj)
		},
	})

	// Add watcher for the Namespace.
	kcv.nsInformer = sharedInformer.Core().V1().Namespaces().Informer()
	kcv.nsInformerChan = make(chan struct{})

	// Namespace informer state change handler
	kcv.nsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// When a new namespace gets created
		AddFunc: func(obj interface{}) {
			kcv.handleAddNamespace(obj)
		},
		// When a namespace gets updated
		//UpdateFunc: func(oldObj interface{}, newObj interface{}) {
		//	kcv.handleUpdateNamespace(oldObj, newObj)
		//},
		// When a namespace gets deleted
		DeleteFunc: func(obj interface{}) {
			kcv.handleDeleteNamespace(obj)
		},
	})

	// Add watcher for the Node.
	kcv.nodeInformer = sharedInformer.Core().V1().Nodes().Informer()
	kcv.nodeInformerChan = make(chan struct{})

	// Node informer state change handler
	kcv.nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		// When a new node gets created
		AddFunc: func(obj interface{}) {
			kcv.handleAddNode(obj)
		},
		// When a node gets updated
		UpdateFunc: func(oldObj interface{}, newObj interface{}) {
			kcv.handleUpdateNode(oldObj, newObj)
		},
		// When a node gets deleted
		DeleteFunc: func(obj interface{}) {
			kcv.handleDeleteNode(obj)
		},
	})

	// Start the shared informer.
	kcv.sharedInformerChan = make(chan struct{})
	sharedInformer.Start(kcv.sharedInformerChan)
	log.Debug("Shared informer started")

	return nil
}

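In one specific use case, I shut down the K8s cluster, which caused the informer to write error messages to the panic file, as shown below.

When I brought the K8s cluster nodes back up, it stopped reporting these errors.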

==== output from "/var/log/panicfile" ====== 

E0611 16:13:03.558214 10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Pod: Get https://10.30.8.75:6443/api/v1/pods?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host
E0611 16:13:03.558224 10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Namespace: Get https://10.30.8.75:6443/api/v1/namespaces?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host
E0611 16:13:03.558246 10 reflector.go:125] k8s.io/client-go/informers/factory.go:133: Failed to list *v1.Node: Get https://10.30.8.75:6443/api/v1/nodes?limit=500&resourceVersion=0: dial tcp 10.30.8.75:6443: connect: no route to host

Best answer

Your question was:

Is there any way I can register a callback function (similar to Informer.AddEventHandler()).

I believe what you are looking for is SetWatchErrorHandler().

From the source code:

type SharedInformer interface {
	...

	// The WatchErrorHandler is called whenever ListAndWatch drops the
	// connection with an error. After calling this handler, the informer
	// will backoff and retry.
	//
	// The default implementation looks at the error type and tries to log
	// the error message at an appropriate level.
	//
	// There's only one handler, so if you call this multiple times, last one
	// wins; calling after the informer has been started returns an error.
	//
	// The handler is intended for visibility, not to e.g. pause the consumers.
	// The handler should return quickly - any expensive processing should be
	// offloaded.
	SetWatchErrorHandler(handler WatchErrorHandler) error
}

You call this function on the informer:

// Must be called before the informer is started; calling it afterwards returns an error.
kcv.podInformer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
	// your code goes here
})
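Since the interface comment asks that the handler return quickly and offload anything expensive, one way to get these errors into the application is to have the handler push them onto a buffered channel without ever blocking. The following is only a sketch of that pattern using the standard library: errorForwarder, newErrorForwarder, Handle, Errors and errDemo are hypothetical names, and the informer callback is simulated rather than wired to a real cluster.

```go
package main

import (
	"errors"
	"fmt"
)

// errDemo is a hypothetical stand-in for an error reported by the informer.
var errDemo = errors.New("dial tcp: connect: no route to host")

// errorForwarder (hypothetical) hands errors from a callback to the
// application over a buffered channel.
type errorForwarder struct {
	errs chan error
}

func newErrorForwarder(buffer int) *errorForwarder {
	return &errorForwarder{errs: make(chan error, buffer)}
}

// Handle never blocks: if the buffer is full the error is dropped and
// Handle reports false, so the calling callback returns quickly.
func (f *errorForwarder) Handle(err error) bool {
	select {
	case f.errs <- err:
		return true
	default:
		return false
	}
}

// Errors exposes the receive side for the application to consume.
func (f *errorForwarder) Errors() <-chan error { return f.errs }

func main() {
	fwd := newErrorForwarder(1)

	// With client-go, the wiring would happen before the informer starts,
	// roughly like this (not compiled here):
	//   informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
	//       fwd.Handle(err)
	//   })
	// Below, the callback is simulated by calling Handle directly.
	fmt.Println(fwd.Handle(errDemo)) // queued
	fmt.Println(fwd.Handle(errDemo)) // dropped, buffer of 1 is full
	fmt.Println(<-fwd.Errors())
}
```

The application would then read from Errors() in its own goroutine and decide how to react, for example by marking the cluster as unreachable until events start arriving again. Dropping errors when the buffer is full is a deliberate choice here, because the informer retries with backoff and will report the failure again.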

For reference, this is the DefaultWatchErrorHandler.
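The default handler is described as looking at the error type. A custom handler could do something similar to recognise the "cluster is unreachable" case from the question. The sketch below uses only the standard library and rests on an assumption: that the error passed to the handler still wraps the underlying *net.OpError, which may depend on the client-go version in use. isDialError, simulatedListError and errNoRoute are hypothetical names, and the error is constructed by hand rather than produced by a real informer.

```go
package main

import (
	"errors"
	"fmt"
	"net"
)

// errNoRoute is a hypothetical stand-in for the low-level connect failure.
var errNoRoute = errors.New("connect: no route to host")

// simulatedListError builds an error shaped like a failed list call.
// Assumption: the real error keeps the *net.OpError in its wrap chain.
func simulatedListError() error {
	opErr := &net.OpError{Op: "dial", Net: "tcp", Err: errNoRoute}
	return fmt.Errorf("failed to list *v1.Pod: %w", opErr)
}

// isDialError reports whether err wraps a *net.OpError from a failed dial,
// which is how an unreachable API server typically surfaces in Go.
func isDialError(err error) bool {
	var opErr *net.OpError
	return errors.As(err, &opErr) && opErr.Op == "dial"
}

func main() {
	fmt.Println(isDialError(simulatedListError()))
	fmt.Println(isDialError(errors.New("some other failure")))
}
```

If the wrap chain is not preserved in a given client-go version, errors.As will simply return false, so a check like this should be treated as a best-effort classification rather than a guarantee.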

Regarding "kubernetes - How to handle K8s Go client informer error events", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62335565/
