gpt4 book ai didi

apache-kafka - 我能否从 Kafka 主题的特定分区中读取数据

转载 作者:行者123 更新时间:2023-12-02 04:02:52 25 4
gpt4 key购买 nike

我正在使用 spring-kafka 框架来实现 kafka 监听器。目前我正在阅读一个主题。但是,根据特定要求,我被要求提供一种实现监听器的方法,该监听器将从 kafka 主题的某些分区中读取数据。

让不同的应用程序从不同的分区读取数据是一个好习惯吗?您能否帮助我了解如何通过 spring-kafka 和 java 实现这一点。

提前致谢。

最佳答案

用于配置KafkaMessageListenerContainerContainerProperties具有以下构造函数:

public ContainerProperties(TopicPartitionInitialOffset... topicPartitions) {

它的内容是:

**
* A configuration container to represent a topic name, partition number and, optionally,
* an initial offset for it. The initial offset can be:
* <ul>
* <li>{@code null} - do nothing;</li>
* <li>positive (including {@code 0}) - seek to EITHER the absolute offset within the
* partition or an offset relative to the current position for this consumer, depending
* on {@link #isRelativeToCurrent()}.
* </li>
* <li>negative - seek to EITHER the offset relative to the current last offset within
* the partition: {@code consumer.seekToEnd() + initialOffset} OR the relative to the
* current offset for this consumer (if any), depending on
* {@link #isRelativeToCurrent()}.</li>
* </ul>
* Offsets are applied when the container is {@code start()}ed.
*
* @author Artem Bilan
* @author Gary Russell
*/
public class TopicPartitionInitialOffset {

因此,您可以通过这种方式指定从哪个主题的哪个分区以及开始使用哪个偏移量。

当我们使用基于分区的配置时,我们最终会在 KafkaConsumer 上得到这个函数:

/**
* Manually assign a list of partitions to this consumer. This interface does not allow for incremental assignment
* and will replace the previous assignment (if there is one).
* <p>
* If the given list of topic partitions is empty, it is treated the same as {@link #unsubscribe()}.
* <p>
* Manual topic assignment through this method does not use the consumer's group management
* functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic
* metadata change. Note that it is not possible to use both manual partition assignment with {@link #assign(Collection)}
* and group assignment with {@link #subscribe(Collection, ConsumerRebalanceListener)}.
* <p>
* If auto-commit is enabled, an async commit (based on the old assignment) will be triggered before the new
* assignment replaces the old one.
*
* @param partitions The list of partitions to assign this consumer
* @throws IllegalArgumentException If partitions is null or contains null or empty topics
* @throws IllegalStateException If {@code subscribe()} is called previously with topics or pattern
* (without a subsequent call to {@link #unsubscribe()})
*/
@Override
public void assign(Collection<TopicPartition> partitions) {

因此,这确实是直接从特定分区使用的标准做法。

关于apache-kafka - 我能否从 Kafka 主题的特定分区中读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51043930/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com