gpt4 book ai didi

java - Storm 中的延迟队列实现——Kafka、Cassandra、Redis 或 Beanstalk?

转载 作者:搜寻专家 更新时间:2023-11-01 03:34:30 26 4
gpt4 key购买 nike

我有一个 Storm 拓扑来处理来自 Kafka 的消息,并根据手头的任务在 Cassandra 中进行 HTTP 调用/保存。我一收到消息就处理。由于来自外部源(如 HTTP)的响应,很少有消息未被完全处理。我想实现一个指数退避机制来重试,以防 HTTP 服务器没有响应/返回错误消息以在一段时间后重试。我想不出几个可以用来实现它们的想法。如果有任何其他我可以使用的容错解决方案,我也想知道它们中的哪一个将是更好的解决方案。由于这用于实现指数退避,每条消息将具有不同的延迟时间。

  • Kafka 中发送另一个主题,稍后使用。 我的首选解决方案。我知道我们可以使用 Kafka 偏移量,以便在后期使用消息。我怎么找不到文档/示例代码来做同样的事情。如果有人能帮助我解决这个问题,那将非常有帮助。
  • 编写消息 Cassandra/Redis 并编写一个调度程序来获取未处理并准备好使用的消息并将其发送到 Kafka,以便我的 Storm 拓扑可以使用它。 (其他遗留项目(非 Storm)中的现有解决方案)
  • 延迟发送到 Beanstalk(其他遗留项目(非 Storm)中的现有解决方案。我怎么想避免使用这个解决方案,只有在我无法选择时才使用它)。

虽然这几乎是我想做的事情。我无法找到实现 delayProcessingUntil 的文档,如 Kafka - Delayed Queue implementation using high level consumer 中所述

我已经从数据存储中完成了预定的工作,并且在过去延迟使用 Beanstalk,但我更愿意使用 Kafka。

最佳答案

Kafka spout 有一个内置的指数退避消息重试。您可以通过 spout 配置来配置初始延迟、延迟乘数和最大延迟。如果bolt有错误,可以调用collector.fail(input)。之后你就让它喷出重试。

https://github.com/apache/storm/blob/v0.10.0/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java

关于java - Storm 中的延迟队列实现——Kafka、Cassandra、Redis 或 Beanstalk?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35385162/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com