gpt4 book ai didi

java - Apache Storm : Make an Unscalable Resource Scalable

转载 作者:行者123 更新时间:2023-11-30 10:47:47 27 4
gpt4 key购买 nike

我最近开始使用 Apache Storm 向大数据世界介绍自己。我遇到了以下问题,想了很多如何解决它,但我所有的方法似乎都很幼稚。

技术

Apache Storm 0.9.3,Java 1.8.0_20

上下文

有一个很大的 xml 文件 (~400MB) 需要逐行读取 (xml-file-spout)。然后,每个读取的文件行都由一系列 bolt 发出并处理。

它必须是有保证的消息处理(发射锚定...)

问题

就文件相当大(包含大约 200 亿行)而言,我使用扫描仪读取它,基于缓冲流而不是将整个文件加载到内存中。到目前为止,一切都很好。当处理过程中某处出现错误时,问题就会出现:xml-file-spout 本身已死,或者存在一些内部问题......

  1. Nimbus 将重启 spout,但整个处理从头开始;
  2. 这种方法根本无法扩展。

解决思路

解决第一个问题的最初想法是将当前状态保存在某个地方:分布式缓存、JMS 队列、本地磁盘文件。当 spout 打开时,它应该找到这样的存储,读取状态并从指定的文件行开始。这里我也想过将状态存储在Storm的Zookeeper中,但不知道是否可以从spout中寻址Zookeeper(有没有这种能力)?您能否为此建议最佳做法?

对于问题 2,我考虑将初始文件分解为一组子文件并并行处理它们。这可以通过引入一个新的“breaking”spout 来完成,其中每个文件都将由一个专用的 bolt 处理。在这种情况下,有保证的处理会引发大问题,因为如果出现错误,则必须完全重新处理包含失败行的子文件(喷口的确认/失败方法)...您能否提出最佳实践解决这个问题?

更新

好的,到目前为止我做了什么。

先决条件

以下拓扑之所以有效,是因为它的所有部分(spouts 和 bolts)都是幂等的。

  1. 引入了一个单独的 spout,它读取文件行(一个接一个)并将它们发送到一个中间 ActiveMQ 队列('file-line-queue')以便能够轻松重放失败的文件行(参见下一步);

  2. 为“file-line-queue”队列创建了一个单独的 spout,它接收每个文件行并将其发送到后续的 bolts。就我使用有保证的消息处理而言,如果任何 bolt 失败,都会重新处理消息,如果 bolt 链成功,则会确认相应的消息(CLIENT_ACKNOWLEDGE 模式)。

如果第一个(文件读取)spout 失败,将抛出 RuntimeException,这会终止 spout。稍后,专门的主管会重新启动 spout,从而重新读取 inout 文件。这将导致重复的消息,但只要一切都是幂等的,这不是问题。此外,这里值得考虑一个状态存储库以减少重复...

新刊

为了使中间 JMS 更加可靠,我添加了一个异常监听器,用于为消费者和生产者恢复连接和 session 。问题出在消费者身上:如果 session 已恢复并且我在 bolt 处理过程中有一条 JMS 消息未被确认,在成功处理后我需要确认它,但就 session 是新的而言,我收到 '找不到相关 ID' 问题。

有人可以建议如何处理吗?

最佳答案

先回答你的问题:

  1. 是的,你可以将状态存储在像 Zookeeper 这样的地方,并使用像 Apache Curator 这样的库。来处理这个问题。
  2. 分解文件可能会有所帮助,但仍然无法解决您必须管理状态的问题。

让我们在这里谈谈设计。 Storm 是为流而不是批处理而构建的。在我看来,更适合批处理的 Hadoop 技术在这里会更好:MapReduce、Hive、Spark 等。

如果您打算使用 storm,那么它将有助于将数据流式传输到更容易使用的地方。您可以将文件写入 Kafka 或队列,以帮助解决管理状态、确认/失败和重试的问题。

关于java - Apache Storm : Make an Unscalable Resource Scalable,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36035135/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com