gpt4 book ai didi

java - 无法从开始获得STORM NEW VERSION(1.0.1)中的消息

转载 作者:行者123 更新时间:2023-12-02 21:19:23 25 4
gpt4 key购买 nike

我们正尝试从使用 STORM版本0.9.3 开始在kafka中获取消息,并能够将其放入HBASE TABLE中。

为此,我们使用的配置是:

kafkaConfig.forceFromStart = true;

因此,我们从 OFFSET 0 获得消息,即从hbase表中开始,即完整消息。

但是,当我们尝试使用 STORM VERSION 1.0.1 从kafka开头获取消息并将其放入HBASE TABLE中时,我们仅获得了最后一条消息。我们没有从一开始就获得 OFFSET 0 的消息(即,最后添加的消息能够从一开始就获得它)。

我们使用的配置:
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.useStartOffsetTimeIfOffsetOutOfRange = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
kafkaConfig.ignoreZkOffsets = false;
kafkaConfig.maxOffsetBehind = Long.MAX_VALUE;
kafkaConfig.startOffsetTime = -2;

任何帮助表示赞赏。

最佳答案

如果要强制使用者使用指定的偏移量而不是从Zookeeper读取它,则必须将ignoreZkOffsets设置为true。

storm-kafka的文档中:

This means that when a topology has run once the setting KafkaConfig.startOffsetTime will not have an effect for subsequent runs of the topology because now the topology will rely on the consumer state information (offsets) in ZooKeeper to determine from where it should begin (more precisely: resume) reading. If you want to force the spout to ignore any consumer state information stored in ZooKeeper, then you should set the parameter KafkaConfig.ignoreZkOffsets to true. If true, the spout will always begin reading from the offset defined by KafkaConfig.startOffsetTime as described above.



因此,要使用队列开头的消息,请使用以下配置:
kafkaConfig.ignoreZkOffsets = true;
kafkaConfig.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();

关于java - 无法从开始获得STORM NEW VERSION(1.0.1)中的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37836631/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com