gpt4 book ai didi

apache-storm - Storm 如何知道消息何时是 "fully processed"?

转载 作者:行者123 更新时间:2023-12-04 04:12:47 25 4
gpt4 key购买 nike

(还有几个关于超时和 maxSpoutPending 的问题)

我在 Storm 文档中看到很多关于消息被完全处理的引用。但是我的 KafkaSpout 如何知道消息何时被完全处理?

希望它知道我的 bolt 连接方式,所以当我的 Stream 中的最后一个 bolt 确认一个元组时,spout 知道我的消息何时被处理?

否则,我会想象在超时期限到期后,将检查消息的确认状态,如果确认/ anchor 定 XOR 指示,则将其视为已处理。但我希望不是这样?

我也有关于 maxTuplesPending 和超时配置的相关问题。

如果我将 maxTuplePending 设置为 10k,那么我是否认为每个 spout 实例将继续发出元组,直到该 spout 实例正在跟踪 10k 元组,10k 元组尚未完全处理?然后当当前正在传输的消息被完全处理时会发出一个新的元组?

最后,这与超时配置有关吗?在发送新消息之前,spout 是否以任何方式等待配置的超时发生?或者超时配置是否仅在消息被停止/处理缓慢时才起作用,导致由于超时而失败?

更简洁(或者希望更清楚),将我的超时设置为 30 分钟是否有影响,除非消息在 30 分钟内被最终 Bolt 确认,否则消息不会失败?或者是否有其他影响,例如超时配置会影响 spouts 的发射率?

抱歉问了这么长、漫无目的的问题。提前感谢您的任何回复。

*编辑以进一步澄清

这对我来说是一个问题,因为我的消息不一定贯穿整个 Stream。

假设我有 bolt A、B、C、D。大多数情况下,消息将从 A->B->->D 传递。但是我有一些消息会故意停止在 bolt A 上。 A 会确认它们但不会发出它们(因为我的业务逻辑,在这些情况下,我确实希望对消息进行进一步处理)。

那么我的 KafkaSpout 会知道已确认但未从 A 发出的消息已完全处理吗?因为我希望在 Bolt A 完成后立即从 spout 发出另一条消息,在这种情况下。

最佳答案

Storm 通过 UDF 代码必须使用的 anchor 定机制跟踪整个拓扑中的元组。这种 anchor 定导致了所谓的元组树,树的根是 spout 发出的元组,所有其他节点(在树结构中连接)表示从使用输入元组作为 anchor 的 bolts 发出的元组(这只是一个逻辑模型,并没有在 Storm 中以这种方式实现,尽管如此)。

例如,Spout 发出一个句子元组,该元组被单词中的第一个 bolt 分割,一些单词由第二个 bolt 过滤,并且单词计数由第三个 bolt 应用。最后,sink bolt 将结果写入文件。这棵树看起来像这样:

"this is an example sentence" -+-> "this" 
+-> "is"
+-> "an"
+-> "example" -> "example",1 -> "example",1
+-> "sentence" -> "sentence",1 -> "sentence",1

最初的句子由 spout 发出,由 bolt1 用作所有发出的 token 的 anchor ,并由 bolt1 确认。 Bolt2 过滤掉“this”、“is”和“an”,并只确认三个元组。 “example”和“sentence”只是被转发,用作输出元组的 anchor ,然后被确认。同样发生在 bolt2 中,最后一个 sink bolt 只是确认所有传入的元组。

此外,Storm 跟踪所有元组的所有 ack,即来自中间 bolt 和沉没 bolt 。首先,spout 将输出元组的 ID 发送给 acker 任务。每次将元组用作 anchor 时,acker 还会收到一条消息,其中包含 anchor 元组 ID 和输出元组 ID(由 Storm 自动生成)。来自 Bolt 的 ackes 也转到对它们进行 XOR 的相同 acker 任务。如果收到所有确认——即,对于 spout 和所有递归 anchor 定的输出元组——(异或结果将为零),acker 向 spout 发送一条消息,表示元组已被完全处理并且对 Spout.ack(MessageId) 的回调用发生(即,当元组被完全处理时,回调用立即完成)。此外,ackers 会定期检查是否存在由 acker 注册的元组超过超时时间。如果发生这种情况,acker 会丢弃元组 ID,并向 spout 发送一条消息,表示元组失败(导致调用 Spout.fail(MessageId) )。

此外,Spouts 会保留所有正在运行的元组的计数,如果此计数超过 Spout.nextTuple() 参数,则停止调用 maxTuplesPending。据我所知,该参数是全局应用的,即每个 spout 任务的本地计数相加,并将全局计数与参数进行比较(但不确定具体是如何实现的)。

所以 timeout 参数独立于 maxTuplesPending

关于apache-storm - Storm 如何知道消息何时是 "fully processed"?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33546052/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com