- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我们将 Storm 与 Kafka Spout 一起使用。当消息失败时,我们希望重放它们,但在某些情况下,坏数据或代码错误会导致消息始终无法通过 Bolt,因此我们将进入无限重放循环。显然,当我们发现错误时,我们正在修复它们,但希望我们的拓扑结构具有一般的容错性。在重放 N 次以上后,我们如何 ack() 一个元组?
查看 Kafka Spout 的代码,我发现它被设计为使用指数退避计时器和 comments on the PR 重试。状态:
“spout 不会终止重试周期(我确信它不应该这样做,因为它无法报告有关中止请求的失败的上下文),它只处理延迟重试。拓扑中的一个 bolt 是仍然期望最终调用 ack() 而不是 fail() 来停止循环。”
我已经看到 StackOverflow 响应建议编写自定义 spout,但如果有推荐的方法在 Bolt 中执行此操作,我宁愿不坚持维护 Kafka Spout 内部的自定义补丁。
在 Bolt 中执行此操作的正确方法是什么?我在元组中没有看到任何状态显示它被重放了多少次。
最佳答案
Storm 本身不为您的问题提供任何支持。因此,定制的解决方案是唯一的出路。即使你不想打补丁 KafkaSpout
,我认为,引入一个计数器并打破其中的重播循环,将是最好的方法。作为替代方案,您也可以从 KafkaSpout
继承。并在您的子类中放置一个计数器。这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现。
如果您想使用 Bolt,您可以执行以下操作(这也需要对 KafkaSpout
或其子类进行一些更改)。
KafkaSpout
后面插入 bolt 通过 fieldsGrouping
在 ID 上(以确保重放的元组被流式传输到同一个 Bolt 实例)。 HashMap<ID,Counter>
缓冲所有元组并计算(重新)尝试的次数。如果计数器小于您的阈值,则转发输入元组,以便由后面的实际拓扑处理(当然,您需要适本地 anchor 定元组)。如果计数大于您的阈值,确认元组以打破循环并从 HashMap
中删除其条目(您可能还想记录所有失败的元组)。 HashMap
中删除成功处理的元组, 每次在 KafkaSpout
中确认一个元组您需要将元组 ID 转发到 bolt ,以便它可以从 HashMap
中删除元组.只需为您的 KafkaSpout
声明第二个输出流子类化和覆盖 Spout.ack(...)
(当然,您需要调用 super.ack(...)
以确保 KafkaSpout
也得到确认)。 HashMap
中为每个元组设置一个条目的替代方法您还可以使用第三个流(与其他两个流一样连接到 bolt),如果元组失败(即在
Spout.fail(...)
中),则转发一个元组 ID。每次,bolt 收到来自第三个流的“失败”消息,计数器都会增加。只要
HashMap
中没有条目(或未达到阈值),bolt 只是转发元组进行处理。这应该会减少使用的内存,但需要在你的 spout 和 bolt 中实现更多的逻辑。
HashMap
在
KafkaSpout
缓冲失败消息的 ID。因此,如果失败的元组被成功重放,您只能发送“ack”消息。当然,这第三种方法使得要实现的逻辑更加复杂。
KafkaSpout
在某种程度上,我看不到您的问题的解决方案。我个人会修补
KafkaSpout
或者将使用第三种方法与
HashMap
在
KafkaSpout
子类和 bolt (因为与前两种解决方案相比,它消耗的内存很少,并且不会给网络带来很多额外的负载)。
关于apache-kafka - Storm Kafka Spout 上的最大元组重放次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32912037/
你好,我有一张 table : from | to | item | count ------- Jack | Danie| food | 10 Danie| Maria| food | 2 Ja
已关闭。此问题不符合Stack Overflow guidelines 。目前不接受答案。 这个问题似乎偏离主题,因为它缺乏足够的信息来诊断问题。 更详细地描述您的问题或 include a mini
我正在尝试解决以下面试问题 Given two arrays firstDay and lastDay representing the intervals in days of possible m
这个问题已经有答案了: Explanation of a output of a C program involving fork() (2 个回答) 已关闭 9 年前。 这是我从我的研究所去年的试卷
如何在 html 页面上重复一个 div X 次,可以说我想设置方差来声明重复次数。重复这个部分 5 次,我假设它是用 JS 的。 black BLUE WHITE strip 我
我目前使用类中的函数将数据插入数据库,如果每行成功插入(从 csv 文件),则会记录一条消息(logMessage 函数),以显示哪一行成功或失败。但是我想要已导入数据库的成功执行的计数。我遇到了一些
这个问题可能看起来非常基础,但我很难弄清楚如何做。我有一个整数,我需要使用 for 循环来循环整数次。 首先,我尝试了—— fn main() { let number = 10; // An
我正在准备 CS 125 期末考试,其中(简要地)介绍了 Big O Notation。 鉴于: Mergesort 的最佳运行时间为 O(N lg(N)),最坏运行时间为 O(N lg (N)) 有
关闭。这个问题需要更多focused .它目前不接受答案。 想改进这个问题吗? 更新问题,使其只关注一个问题 editing this post . 关闭 3 年前。 Improve this qu
我正在构建一个简单的程序来计算骰子实验中数字的频率,但我尝试扩展它并将最大 throw 次数增加到巨大的数字,通过反复试验,我发现最大限制为519253。 使用这个最大值,我也无法创建任何新数组,它会
这是一道面试题 There is an airline company that wants to provide new updates to all of its flight attendant
我正在尝试以一种可以节省我无数小时的繁琐数据输入的方式实现 Excel 自动化。这是我的问题。 我们需要为所有库存打印条形码,其中包括 4,000 种型号,每种型号都有特定数量。 Shopify是我们
我想根据给定的预定义级别(从级别 1 到级别 6)分离代码中的所有内容,现在我的 JSON 读取 $scope.myJson=[{ id: 1, level: 1, name: "any
我创建了一个菜单,它使用一些 CSS 和 jquery 在悬停时显示其子菜单。事情是,如果用户在菜单项上多次悬停,它会有点滑稽。这是网址:http://91.202.168.37/~ibi/ ,这是
假设我对每小时的事件数进行了如下统计: np.random.seed(42) idx = pd.date_range('2017-01-01', '2017-01-14', freq='1H') df
我想确保我正确理解了这个概念: 在 Hadoop 权威指南中指出:“设计文件系统的目标始终是减少与要传输的数据量相比的查找次数。”在此声明中,作者指的是 Hadoop 逻辑 block 的“seeks
我有一个用 C++11 编写的程序,我想计算 std::vector 的 move 和复制(构造和赋值)次数。对象。有办法吗? 最好的问候 最佳答案 否。 std::vector<>的执行没有办法做到
我们组织的帐户空间不足,我们一直在尝试剔除一些较旧的存储库。问题在于一些较旧的存储库可能仍然是事件服务的依赖项(即使它们多年未更新)。 我知道我们可以跟踪克隆,但据我所知,我们看不到直接下载/pull
我是一名优秀的程序员,十分优秀!