gpt4 book ai didi

spring-cloud-stream kafka 消费者并发

转载 作者:行者123 更新时间:2023-12-02 21:06:28 27 4
gpt4 key购买 nike

使用spring-cloud-stream的kafka binder,如何配置并发消息消费者(在单个消费者jvm中)?如果我理解正确的话,使用kafka时并发消息消费需要分区,但是s-c-s docs指示要使用分区,您需要通过partitionKeyExpression或partitionKeyExtractorClass在生产者中指定分区选择。 Kafka 文档提到了循环分区。

s-c-s 文档根本没有提到 spring.cloud.stream.bindings.*.concurrency,尽管这在我上面描述的用例中似乎确实很重要。使用生产者配置

spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/json
partitionCount: 3

和消费者配置

spring:
cloud:
stream:
bindings:
customer-save:
destination: customer-save
group: customer-save
content-type: application/x-java-object;type=foo.Customer
partitioned: true
concurrency: 3

我似乎得到了我想要的行为(至少在某种程度上)。我可以看到有时有 3 个消费者线程处于事件状态,尽管似乎确实存在除循环之外的一些分区,因为某些消息似乎在等待繁忙的消费者线程,并在该线程完成后被消耗。我认为这是因为消息被发送到同一分区。

当我没有指定partitionKeyExpression或partitionKeyExtractorClass时,生产者是否会使用一些默认的键提取和分区策略?这是使用 kafka 设置 s-c-s 消费者的合适方法,您希望多个线程消费消息以增加消费者吞吐量吗?

最佳答案

由于您的生产者未分区(没有设置 partitionKeyExpression),生产者端将在 3 个分区上循环(如果这不是观察到的行为,请在 Git Hub 中打开一个票证) )。如果您配置了 partitionKeyExpression,那么生产者将根据配置的逻辑有效地对数据进行分区。

在消费者方面,我们确保线程/分区亲和性,因为这是广泛遵守的 Kafka 约定 - 我们确保给定分区上的消息按顺序处理 - 这可能解释了您正在观察的行为。如果将消息 A、B、C、D 发送到分区 0、1、2、0 - D 将必须等待 A 被处理,即使还有其他两个线程可用。

增加吞吐量的一个选择是过度分区(这是 Kafka 中相当典型的策略)。这会进一步分散消息,并增加消息发送到不同线程的机会。

如果您不关心排序,则提高吞吐量的另一个选择是在下游异步处理消息:例如通过将输入 channel 桥接至 ExecutorChannel。

一般来说,partitioned是指客户端接收分区数据的能力(Kafka客户端始终是分区的,但此设置也适用于Rabbit和/或Redis)。它与属性 instanceIndexinstanceCount 结合使用,以确保主题的分区在多个应用程序实例之间正确划分(另请参阅 http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count )

关于spring-cloud-stream kafka 消费者并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35854603/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com