gpt4 book ai didi

spring - 水平缩放 spring-kafka 消费者应用程序

转载 作者:行者123 更新时间:2023-12-05 04:53:48 27 4
gpt4 key购买 nike

我想知道根据水平扩展实例的最大数量配置分区数量的好方法是什么。

假设我有一个主题有 6 个分区。

我有一个应用程序使用 ConcurrentKafkaListenerContainerFactorysetConcurrency6。这意味着我将拥有 6 KafkaMessageListenerContainer,每个都使用一个线程并使用来 self 所有分区的均匀分布的消息。

如果以上是正确的,那么我想知道如果我通过添加另一个实例水平扩展应用程序会发生什么情况?如果新实例具有相同的 6 并发配置,当然还有相同的消费者组,我相信第二个实例将不会使用任何消息。因为不会发生再平衡,因为每个现有消费者都会分配一个分区。

但是如果我们回到第一个例子,有一个 6 分区,其中一个实例的并发度为 3,那么每个消费者线程/KafkaMessageListenerContainer 都会有 分配了 2 个分区。如果我们扩展这个应用程序(相同的消费者组 ID 和 3 的并发),我相信会发生重新平衡,并且两个实例将分别从 3 个分区中消费。

这些假设是否正确?如果不正确,您应该如何处理这种情况?

最佳答案

一般来说,您的假设对于默认行为是正确的,它基于:

/**
* <p>The range assignor works on a per-topic basis. For each topic, we lay out the available partitions in numeric order
* and the consumers in lexicographic order. We then divide the number of partitions by the total number of
* consumers to determine the number of partitions to assign to each consumer. If it does not evenly
* divide, then the first few consumers will have one extra partition.
*
* <p>For example, suppose there are two consumers <code>C0</code> and <code>C1</code>, two topics <code>t0</code> and
* <code>t1</code>, and each topic has 3 partitions, resulting in partitions <code>t0p0</code>, <code>t0p1</code>,
* <code>t0p2</code>, <code>t1p0</code>, <code>t1p1</code>, and <code>t1p2</code>.
*
* <p>The assignment will be:
* <ul>
* <li><code>C0: [t0p0, t0p1, t1p0, t1p1]</code></li>
* <li><code>C1: [t0p2, t1p2]</code></li>
* </ul>
*
* Since the introduction of static membership, we could leverage <code>group.instance.id</code> to make the assignment behavior more sticky.
* For the above example, after one rolling bounce, group coordinator will attempt to assign new <code>member.id</code> towards consumers,
* for example <code>C0</code> -&gt; <code>C3</code> <code>C1</code> -&gt; <code>C2</code>.
*
* <p>The assignment could be completely shuffled to:
* <ul>
* <li><code>C3 (was C0): [t0p2, t1p2] (before was [t0p0, t0p1, t1p0, t1p1])</code>
* <li><code>C2 (was C1): [t0p0, t0p1, t1p0, t1p1] (before was [t0p2, t1p2])</code>
* </ul>
*
* The assignment change was caused by the change of <code>member.id</code> relative order, and
* can be avoided by setting the group.instance.id.
* Consumers will have individual instance ids <code>I1</code>, <code>I2</code>. As long as
* 1. Number of members remain the same across generation
* 2. Static members' identities persist across generation
* 3. Subscription pattern doesn't change for any member
*
* <p>The assignment will always be:
* <ul>
* <li><code>I0: [t0p0, t0p1, t1p0, t1p1]</code>
* <li><code>I1: [t0p2, t1p2]</code>
* </ul>
*/
public class RangeAssignor extends AbstractPartitionAssignor {

但是,您可以通过 partition.assignment.strategy 消费者属性插入任何 ConsumerPartitionAssignor:https://kafka.apache.org/documentation/#consumerconfigs_partition.assignment.strategy

另请参阅 ConsumerPartitionAssignor JavaDocs 了解更多信息及其实现,以便为您的用例做出选择。

关于spring - 水平缩放 spring-kafka 消费者应用程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/65946520/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com