gpt4 book ai didi

apache-storm - Storm 拓扑中的可选流

转载 作者:行者123 更新时间:2023-12-04 16:08:41 28 4
gpt4 key购买 nike

我们有一个相当简单的 Storm 拓扑,让人头疼。

我们的一个 bolt 可以发现它正在处理的数据是有效的,并且每件事都正常进行,或者它可以发现它是无效但可以修复的。在这种情况下,我们需要将其发送以进行一些额外处理。

我们尝试使用单独的 bolt 和流将此步骤作为拓扑的一部分。

declarer.declareStream(NORMAL_STREAM, getStreamFields());
declarer.declareStream(ERROR_STREAM, getErrorStreamFields());

在 execute 方法的末尾跟一些类似下面的东西。

if(errorOutput != null) {
collector.emit(ERROR_STREAM, input, errorOutput);
}
else {
collector.emit(NORMAL_STREAM, input, output);
}

collector.ack(input);

这确实有效,但是它有一个破坏性的影响,导致所有没有沿着这个错误路径走下去的元组失败,并被 spout 无休止地重新发送。

我认为这是因为错误 bolt 无法为它未收到的消息发送确认,但确认事件会等待拓扑中的所有 bolt 确认,然后再将确认发送回喷口。至少,删除错误处理 bolt 会导致所有东西都正确地返回到 spout。

实现这样的目标的最佳方法是什么?

最佳答案

error bolt 可能比您怀疑的要慢,导致在 error_stream 上备份,这反过来又导致备份到您的第一个 bolt 并最终导致元组开始超时。当一个元组超时时,它会被 spout 重新发送。

尝试:

  1. 增加超时配置 (topology.message.timeout.secs),
  2. 限制来自 spout 的飞行中元组的数量(topology.max.spout.pending)和/或
  3. 增加 bolt 的并行度

关于apache-storm - Storm 拓扑中的可选流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28047372/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com