gpt4 book ai didi

java - 如何停止 Storm 中的元组处理并执行其他代码

转载 作者:行者123 更新时间:2023-11-30 07:11:51 25 4
gpt4 key购买 nike

我是 Storm 的新手。我正在将它用于大学项目。

我创建了拓扑,其中有一个链接到 MySql 数据库的 Spout 和两个 Bolt。第一个 Bolt 链接到 spout,准备并删除元组不需要的信息;第二个,对元组进行过滤。

我正在本地模式下工作。

我的问题是:为什么运行拓扑后,在我的控制台中我会看到如下所示的输出?

38211 [Thread-14-movie-SPOUT] INFO  backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __tick, id: {}, [30]
67846 [Thread-10-__acker] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67846 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67852 [Thread-10-__acker] INFO backtype.storm.daemon.task - Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@3c270095> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=0, write_pos=1, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {}]>]]
67853 [Thread-8-cleaning-genre-bolt] INFO backtype.storm.daemon.task - Emitting: cleaning-genre-bolt __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@38c3d111> [#<DataPoint [__emit-count = {default=1680}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1621, write_pos=1622, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {default=1680}]> #<DataPoint [__execute-latency = {movie-SPOUT:default=0.15476190476190477}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=1680, write_pos=1680, capacity=1024, population=0}]> #<DataPoint [__execute-count = {movie-SPOUT:default=1680}]>]]
67854 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.executor - Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
67855 [Thread-13-filtering-genre-BOLT] INFO backtype.storm.daemon.task - Emitting: filtering-genre-BOLT __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@6d5c75a9> [#<DataPoint [__emit-count = {}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__receive = {read_pos=1681, write_pos=1682, capacity=1024, population=1}]> #<DataPoint [__ack-count = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {cleaning-genre-bolt:default=0.08333333333333333}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__sendqueue = {read_pos=-1, write_pos=-1, capacity=1024, population=0}]> #<DataPoint [__execute-count = {cleaning-genre-bolt:default=1680}]>]]

我读到处理最后一个元组后的这些行将被视为正常。不是吗?

提交拓扑后如何运行其他代码?例如,我想打印在第二个 Bolt 中完成的过滤结果,并将其保存在 HashMap 中。如果我将代码放在包含 SubmitTopology() 方法的行之后,则代码会在元组完成之前运行。

第二个也是最后一个问题是:为什么在 Storm 的每个示例中,我都在 Spout 中看到

"Thread.sleep(1000)"?

也许这与我的第一个问题有关。

我希望我的问题很清楚。预先感谢您!

最佳答案

I read that these lines after the last tuple processed are to be considered normal. Isn't it?

这些只是INFO消息。所以无需担心他们。

If I put my code after the line containing the submitTopology() method, the code is ran before the completion of the tuples.

如果您提交拓扑,拓扑将在后台执行(即多线程)。这是必需的,因为您的拓扑“永远”运行(直到您显式停止它,或者您的 Java 应用程序终止,因为您正在运行本地模式)。

“在拓扑完成后”运行代码与 Storm 概念不符,因为 Strom 是一个流系统,并且“处理没有终点”(输入流无限,因此处理永远运行)。如果您想处理有限的数据集,您可能需要考虑像 Flink 或 Spark 这样的批处理框架。

因此,如果您想在 Storm 中实现此功能,您需要能够确定所有数据何时得到处理。因此,在提交拓扑后,您会在所有数据处理完毕后显式阻塞并等待。

但是,对于您的用例,为什么不直接打印最后一个 bolt 内的结果?

关于Thread.sleep() 我不确定你指的是哪个例子。不知道为什么有人要把它投入生产。也许它是为了演示目的而人为地减慢处理速度。

关于java - 如何停止 Storm 中的元组处理并执行其他代码,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39098667/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com