gpt4 book ai didi

java - 如何设置 TOPOLOGY_MAX_SPOUT_PENDING 参数

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:41:35 26 4
gpt4 key购买 nike

在我的拓扑中,我从 Kafka 队列中读取触发消息。收到触发消息后,我需要向 bolt 发送大约 4096 条消息。在 bolt 中,经过一些处理后,它将发布到另一个 Kafka 队列(稍后另一个拓扑将使用它)。

我正在尝试设置 TOPOLOGY_MAX_SPOUT_PENDING 参数来限制发送的消息数量。但我看到它没有效果。是因为我在一个 nextTuple() 方法中发出所有元组吗?如果是这样,应该如何解决?

最佳答案

如果您从 kafka 读取数据,您应该使用 storm 自带的 KafkaSpout。不要尝试实现自己的 spout,相信我,我在生产中使用 KafkaSpout 并且它工作得非常顺利。每条 Kafka 消息只生成一个元组。

正如您在 this nice page from the manual 上看到的那样,您可以像这样设置 topology.max.spout.pending:

Config conf = new Config();
conf.setMaxSpoutPending(5000);
StormSubmitter.submitTopology("mytopology", conf, topology);

topology.max.spout.pending 是为每个 spout 设置的,如果你有四个 spout,你的拓扑中的不完整元组的最大值等于 spout 的数量 * 拓扑。最大 spout.pending.

另一个提示,您应该使用 Storm UI 查看 topology.max.spout.pending 是否设置正确。


记住 topology.max.spout.pending 只是拓扑内部未处理的元组数量,拓扑永远不会停止消费来自 kafka 的消息,至少在生产系统上是这样......如果你想消费 4096 的批处理,你需要在你的 bolt 上实现缓存逻辑,或者使用 storm 以外的东西(面向微批处理的东西)。

关于java - 如何设置 TOPOLOGY_MAX_SPOUT_PENDING 参数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32812448/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com