- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并配置使用BaseRichBolt、一个KafkaSpout和两个 bolt bolt-A、Bolt-B,一旦bolt-B确认,元组就会锚定在bolt-A中,它将被视为成功处理的元组并且它将被 promise 。但是,问题是由于某种原因,一些失败的消息在 KafkaSpout 中重复。
例如
KafkaSpout 在处理时发出了 1000 个元组,由于某种原因,近 20 个元组失败了(在 Bolt-B 处)。这 20 个元组连续重播,在某个时刻,worker 被杀死,supervisor 重新启动了worker,这 20 个元组再次重播,这次它成功处理,但处理了多次(重复)。
但是,我希望这些元组只能处理一次(成功)。我已将 topology.enable.message.timeouts 设置为 false。我的另一个问题是Storm 在哪里存储那些失败的 Kafka 偏移详细信息。我在动物园管理员上没有找到它,它只有以下详细信息。
{"topology":{"id":"test_Topology-12-1508938595","name":"test_Topology"},"offset":505,"partition":2,"broker":{"host":“127.0.0.1”,“端口”:9092},“主题”:“test_topic_1”}
最佳答案
禁用消息超时可能会导致消息丢失,如果您需要处理所有消息,您可能需要重新考虑禁用它。
启用确认时,Storm 提供至少一次处理保证。您可能想看看是否可以使您的 bolt 幂等,这样重播就不会给您带来问题。或者您可以查看 https://storm.apache.org/releases/1.1.1/Trident-tutorial.html ,它提供一次性状态更新。
编辑:您可能需要重新考虑您的问题。据我所知,没有一个流处理系统能够提供您想要的一次性处理。
Trident 提供的精确一次语义是,Trident 将帮助您使状态更新幂等,因此从数据存储的角度来看,消息“看起来”只被处理一次。处理仍然是至少一次。请参阅 https://storm.apache.org/releases/2.0.0-SNAPSHOT/Trident-state.html 处的“事务性 spouts”部分(可能还有页面的其余部分)。以获得关于这将如何运作的直觉。基本思想是在数据存储中存储有关哪些消息已被写入的信息,这样如果它们被重复,状态更新代码可以忽略它们。
您可能还想阅读https://streaml.io/blog/exactly-once 。我想说的是,Flink 实现了类似于那里描述的分布式快照算法的东西,这是在至少一次系统中模拟精确一次的不同方式。
关于java - Storm KafkaSpout 失败元组重复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46946895/
我能够使用本地集群运行storm Kafka,但无法使用storm Submitter运行,下面是我的拓扑代码 谁能帮我解决这个问题:) package com.org.kafka; import o
关闭。这个问题需要debugging details .它目前不接受答案。 编辑问题以包含 desired behavior, a specific problem or error, and th
我正在使用storm-kafka-1.1.1-plus和storm 1.1.1。并配置使用BaseRichBolt、一个KafkaSpout和两个 bolt bolt-A、Bolt-B,一旦bolt-
我正在尝试编译和运行 storm-kafka-starter 项目 https://github.com/TheHydroImpulse/storm-kafka-starter KafkaTopolo
我的拓扑结构使用默认的 KafkaSpout 实现。在一些非常受控的测试中,我注意到 spout 失败了元组,即使我的 bolt 都没有失败任何元组,而且我确信所有消息都在我配置的超时内得到了很好的处
我正在尝试装配一个 Kafka-Storm“Hello World”系统。我安装并运行了 Kafka,当我使用 Kafka 生产者发送数据时,我可以使用 Kafka 控制台消费者读取它。 我从 O'R
我需要查看storm通过其KafkaSpout读取的偏移值。这是我传入的配置: SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, "some
出于某种原因,当我尝试在 Storm 集群上运行拓扑时出现以下错误: java.lang.NoClassDefFoundError: Could not initialize class org.ap
我正在使用 Kafka-Storm 集成。 Kafka 将数据加载到队列中,Kafka Spout 将拉取数据和进程。我有以下设计。 Kafka -> Queue -> KafkaSpout -> P
我的问题是 Storm KafkaSpout 在一段时间后停止使用来自 Kafka 主题的消息。当在 Storm 中启用调试时,我得到这样的日志文件: 2016-07-05 03:58:26.097
我使用的是 storm 0.9.3。我正在尝试为我的拓扑关闭每个元组的确认。我将 Config.TOPOLOGY_ACKER_EXECUTORS 设置为 0,并将 maxSpoutPending 设置
我开发了 storm 拓扑来从 hortonworks 上的 kafka 代理接收 JSONArray 数据, 我不知道为什么我的 kafkaSpout 不使用 HDP 中 Kafka Brokers
我是一名优秀的程序员,十分优秀!