- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
k8s client-go k8s informers实现了持续获取集群的所有资源对象、监听集群的资源对象变化功能,并在本地维护了全量资源对象的内存缓存,以减少对apiserver、对etcd的请求压力。Informers在启动的时候会首先在客户端调用List接口来获取全量的对象集合,然后通过Watch接口来获取增量的对象,然后更新本地缓存。
Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生。
Processor根据Controller的通知,即根据对象的变化事件类型,调用相应的ResourceEventHandler来处理对象的变化。
先通过一张informer概要架构图看一下Controller&Processor所处位置与概要功能。
New用于初始化Controller,方法比较简单。
// staging/src/k8s.io/client-go/tools/cache/controller.go
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
controller.Run为controller的启动方法,这里主要看到几个点:
(1)调用NewReflector,初始化Reflector;
(2)调用r.Run,实际上是调用了Reflector的启动方法来启动Reflector(Reflector相关的分析前面的博客已经分析过了,这里不再重复);
(3)调用c.processLoop,开始controller的核心处理;
// staging/src/k8s.io/client-go/tools/cache/controller.go
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
wg.StartWithChannel(stopCh, r.Run)
wait.Until(c.processLoop, time.Second, stopCh)
}
controller.processLoop即为controller的核心处理方法。
controller的核心处理方法processLoop中,最重要的逻辑是循环调用c.config.Queue.Pop将DeltaFIFO中的队头元素给pop出来(实际上pop出来的是Deltas,是Delta的切片类型),然后调用c.config.Process
方法来做处理,当处理出错时,再调用c.config.Queue.AddIfNotPresent
将对象重新加入到DeltaFIFO中去。
func (c *controller) processLoop() {
for {
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
根据前面sharedIndexInformer的初始化与启动分析(sharedIndexInformer.Run)可以得知,c.config.Process即为s.HandleDeltas方法,所以接下来看到s.HandleDeltas方法的分析。
根据前面分析知道HandleDeltas要处理的是Deltas,是Delta的切片类型。
再来看到HandleDeltas方法的主要逻辑:
(1)循环遍历Deltas,拿到单个Delta;
(2)判断Delta的类型;
(3)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并调用s.processor.distribute方法;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并调用s.processor.distribute方法;
(4)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并调用s.processor.distribute方法;
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
type updateNotification struct {
oldObj interface{}
newObj interface{}
}
type addNotification struct {
newObj interface{}
}
type deleteNotification struct {
oldObj interface{}
}
至此,Controller的分析就结束了,用一张图来回忆一下Controller的功能与架构。
接下来分析一下前面提到的s.processor.distribute方法。
可以看到distribute方法最终是将构造好的addNotification、updateNotification、deleteNotification对象写入到p.addCh中。
sync类型的对象写入到p.syncingListeners中,但informer中貌似没有启动p.syncingListeners或对p.syncingListeners做处理,所以sync类型的对象变化(也即list操作得到的对象所生成的对象变化)会被忽略?有待验证。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
s.processor.run启动了processor,其中注意到listener.run与listener.pop两个核心方法。
这里可以看到processor的run方法中只启动了p.listeners,没有启动p.syncingListeners。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
分析processorListener的pop方法可以得知,其逻辑实际上就是将p.addCh中的对象给拿出来,然后丢进了p.nextCh中。那么谁来处理p.nextCh呢?继续往下看。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
在processorListener的run方法中,将循环读取p.nextCh,判断对象类型,是updateNotification则调用p.handler.OnUpdate方法,是addNotification则调用p.handler.OnAdd方法,是deleteNotification则调用p.handler.OnDelete方法做处理。
// staging/src/k8s.io/client-go/tools/cache/shared_informer.go
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
而p.handler.OnUpdate、p.handler.OnAdd、p.handler.OnDelete方法实际上就是自定义的的ResourceEventHandlerFuncs了。
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: onUpdate,
DeleteFunc: onDelete,
})
// staging/src/k8s.io/client-go/tools/cache/controller.go
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
func (r ResourceEventHandlerFuncs) OnAdd(obj interface{}) {
if r.AddFunc != nil {
r.AddFunc(obj)
}
}
func (r ResourceEventHandlerFuncs) OnUpdate(oldObj, newObj interface{}) {
if r.UpdateFunc != nil {
r.UpdateFunc(oldObj, newObj)
}
}
func (r ResourceEventHandlerFuncs) OnDelete(obj interface{}) {
if r.DeleteFunc != nil {
r.DeleteFunc(obj)
}
}
至此,Processor的分析也结束了,用一张图来回忆一下Processor的功能与架构。
Controller从DeltaFIFO中pop Deltas出来处理,根据对象的变化更新Indexer本地缓存,并通知Processor相关对象有变化事件发生:
(1)如果是Added、Updated、Sync类型,则从indexer中获取该对象,存在则调用s.indexer.Update来更新indexer中的该对象,随后构造updateNotification struct,并通知Processor;如果indexer中不存在该对象,则调用s.indexer.Add来往indexer中添加该对象,随后构造addNotification struct,并通知Processor;
(2)如果是Deleted类型,则调用s.indexer.Delete来将indexer中的该对象删除,随后构造deleteNotification struct,并通知Processor;
Processor根据Controller的通知,即根据对象的变化事件类型(addNotification、updateNotification、deleteNotification),调用相应的ResourceEventHandler(addFunc、updateFunc、deleteFunc)来处理对象的变化。
在对informer中的Controller与Processor分析完之后,接下来将分析informer中的Indexer。
我遇到了一个错误,我不知道如何解决。我有以下代码(来自 Eliom Graffiti 教程),我正在尝试使用 make test.byte 进行测试。 open Eliom_content.Html5
我阅读文档的理解是这样的: 客户端是测试用例的子类。当我们运行 manage.py test 时,会为每个以“test_”开头的方法创建一个 SimpleTest 类的实例(它继承自 TestCase
我已经编写了一个用于接收多个客户端的服务器,它可以分别与客户端通信。在这里,我可以列出服务器中已连接的客户端,但是当客户端断开连接时,它不会从服务器中删除客户端。 Server.py import s
我正在制作一个社交网站。当任何用户在站点上更新或创建新内容时,我需要查看站点的任何其他用户来查看更改更新。 我有一些需要低延迟的评论,因此建议为此订阅。 我也有事件,但这些不需要这么低的延迟。每 10
我想在突变后使用乐观 UI 更新:https://www.apollographql.com/docs/react/basics/mutations.html 我对“乐观响应”和“更新”之间的关系感到
我想了解 Dask 在本地机器上的使用模式。 具体而言, 我有一个适合内存的数据集 我想做一些 pandas 操作 分组依据... 日期解析 等等 Pandas 通过单核执行这些操作,这些操作对我来说
我使用 Apollo、React 和 Graphcool。我有一个查询来获取登录的用户 ID: const LoginServerQuery = gql` query LoginServerQ
在本指南的帮助下,我最近在几个设备的应用程序中设置了 P2P 通信:http://developer.android.com/training/connect-devices-wirelessly/n
注意:我在节点项目中使用@twilio/conversations 1.1.0 版。我正在从使用可编程聊天过渡到对话。 我看到对 Client.getConversationByUniqueName
我对服务客户端和设备客户端库有点困惑。谁能解答我对此的疑问。 问题:当我通过 deviceClient 发送数据时,我无法接收数据,但当我使用服务客户端发送数据时,相同的代码可以工作。现在,xamar
我对服务客户端和设备客户端库有点困惑。谁能解答我对此的疑问。 问题:当我通过 deviceClient 发送数据时,我无法接收数据,但当我使用服务客户端发送数据时,相同的代码可以工作。现在,xamar
假设我有一个简单的应用程序。 如何设置 OAuth2 以允许其他应用程序访问我的应用程序的某些部分。 例如,当开发人员想要使用 Facebook API 时,他们会使用 Facebook API 用户
我有两个模块: 在一个模块中,我从另一个模块run 中引用了一个函数: @myorg/server import { Client } from '.' import { Middleware } f
我在通过服务器从客户端向客户端发送数据时遇到了一些问题(以避免监听客户端上的端口)。 我有一个这样的服务器: var net = require("net"); var server = net.cr
我正在使用 django.test.client.Client 来测试用户登录时是否显示某些文本。但是,我的 Client 对象似乎并没有让我保持登录状态。 如果使用 Firefox 手动完成,则此测
有两个我制作的程序无法运行。有服务器和客户端。服务器通过给用户一个 ID(从 0 开始)来接受许多客户端。服务器根据服务器的 ID 将命令发送到特定的客户端。 (示例:200 个客户端连接到 1 个服
今天,我在 Windows 10 的“程序和功能”列表中看到了 2 个不同版本的 ARC,因此我选择卸载旧版本,因为我需要一些空间。在卸载结束时,它们都消失了! 所以,我从 https://insta
在每个新的客户端连接上 fork 服务器进程 不同的进程(服务器的其他子进程,即 exec)无法识别在 fork 子进程中使用相同 fd 的客户端。 如何在其他进程上区分客户端? 如果文件描述符为新
a和b有什么区别? >>> import boto3 >>> a = boto3.Session().client("s3") >>> b = boto3.client("s3") >>> a ==
a和b有什么区别? >>> import boto3 >>> a = boto3.Session().client("s3") >>> b = boto3.client("s3") >>> a ==
我是一名优秀的程序员,十分优秀!