gpt4 book ai didi

apache-kafka - Storm Kafka Spout 上的最大元组重放次数

转载 作者:行者123 更新时间:2023-12-04 08:23:34 24 4
gpt4 key购买 nike

我们将 Storm 与 Kafka Spout 一起使用。当消息失败时,我们希望重放它们,但在某些情况下,坏数据或代码错误会导致消息始终无法通过 Bolt,因此我们将进入无限重放循环。显然,当我们发现错误时,我们正在修复它们,但希望我们的拓扑结构具有一般的容错性。在重放 N 次以上后,我们如何 ack() 一个元组?

查看 Kafka Spout 的代码,我发现它被设计为使用指数退避计时器和 comments on the PR 重试。状态:

“spout 不会终止重试周期(我确信它不应该这样做,因为它无法报告有关中止请求的失败的上下文),它只处理延迟重试。拓扑中的一个 bolt 是仍然期望最终调用 ack() 而不是 fail() 来停止循环。”

我已经看到 StackOverflow 响应建议编写自定义 spout,但如果有推荐的方法在 Bolt 中执行此操作,我宁愿不坚持维护 Kafka Spout 内部的自定义补丁。

在 Bolt 中执行此操作的正确方法是什么?我在元组中没有看到任何状态显示它被重放了多少次。

最佳答案

Storm 本身不为您的问题提供任何支持。因此,定制的解决方案是唯一的出路。即使你不想打补丁 KafkaSpout ,我认为,引入一个计数器并打破其中的重播循环,将是最好的方法。作为替代方案,您也可以从 KafkaSpout 继承。并在您的子类中放置一个计数器。这当然有点类似于补丁,但可能不那么具有侵入性并且更容易实现。

如果您想使用 Bolt,您可以执行以下操作(这也需要对 KafkaSpout 或其子类进行一些更改)。

  • 为每个元组分配一个唯一 ID 作为附加属性(可能已经有一个唯一 ID 可用;否则,您可以引入“计数器 ID”或仅引入整个元组,即所有属性,以标识每个元组)。
  • KafkaSpout 后面插入 bolt 通过 fieldsGrouping在 ID 上(以确保重放的元组被流式传输到同一个 Bolt 实例)。
  • 在您的 bolt 内,使用 HashMap<ID,Counter>缓冲所有元组并计算(重新)尝试的次数。如果计数器小于您的阈值,则转发输入元组,以便由后面的实际拓扑处理(当然,您需要适本地 anchor 定元组)。如果计数大于您的阈值,确认元组以打破循环并从 HashMap 中删除其条目(您可能还想记录所有失败的元组)。
  • 为了从 HashMap 中删除成功处理的元组, 每次在 KafkaSpout 中确认一个元组您需要将元组 ID 转发到 bolt ,以便它可以从 HashMap 中删除元组.只需为您的 KafkaSpout 声明第二个输出流子类化和覆盖 Spout.ack(...) (当然,您需要调用 super.ack(...) 以确保 KafkaSpout 也得到确认)。

  • 不过,这种方法可能会消耗大量内存。作为在 HashMap 中为每个元组设置一个条目的替代方法您还可以使用第三个流(与其他两个流一样连接到 bolt),如果元组失败(即在 Spout.fail(...) 中),则转发一个元组 ID。每次,bolt 收到来自第三个流的“失败”消息,计数器都会增加。只要 HashMap中没有条目(或未达到阈值),bolt 只是转发元组进行处理。这应该会减少使用的内存,但需要在你的 spout 和 bolt 中实现更多的逻辑。

    这两种方法都有缺点,即每个确认的元组都会为您新引入的 bolt 生成一条额外的消息(从而增加网络流量)。对于第二种方法,您似乎只需要为之前失败的元组向 Bolt 发送“ack”消息。但是,您不知道哪些元组失败了,哪些没有。如果你想摆脱这个网络开销,你可以引入第二个 HashMapKafkaSpout缓冲失败消息的 ID。因此,如果失败的元组被成功重放,您只能发送“ack”消息。当然,这第三种方法使得要实现的逻辑更加复杂。

    未经修改 KafkaSpout在某种程度上,我看不到您的问题的解决方案。我个人会修补 KafkaSpout或者将使用第三种方法与 HashMapKafkaSpout子类和 bolt (因为与前两种解决方案相比,它消耗的内存很少,并且不会给网络带来很多额外的负载)。

    关于apache-kafka - Storm Kafka Spout 上的最大元组重放次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32912037/

    24 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com