
stream - Creating an Apache Storm spout that emits tuples every X seconds

Reposted. Author: 行者123. Updated: 2023-12-01 02:13:59

I have a topology that receives data from an MQTT broker, and I want a spout that behaves as follows:

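  • Emit a batch of tuples (or a list of strings in a single tuple) every x seconds. How can I achieve this? I have read a bit about Storm Trident, but its IBatchSpout does not seem to let me emit tuples in batches at specific time intervals.
  • What should the spout do if no new data comes in? It cannot block the thread, because that is Storm's main thread, right?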

Best answer

    You can implement your own MQTT spout. For an example, take a look at MongoSpout.

    The important part is the nextTuple method.

    When this method is called, Storm is requesting that the Spout emit tuples to the output collector. This method should be non-blocking, so if the Spout has no tuples to emit, this method should return. nextTuple, ack, and fail are all called in a tight loop in a single thread in the spout task. When there are no tuples to emit, it is courteous to have nextTuple sleep for a short amount of time (like a single millisecond) so as not to waste too much CPU.
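The non-blocking requirement quoted above is commonly met by letting the MQTT client's callback thread push messages into a thread-safe queue that nextTuple polls without waiting. The following is a minimal sketch of that handoff in plain Java, with no Storm or MQTT dependency. The class name MessageInbox and its methods onMessage and pollNext are hypothetical names chosen for illustration; they are not part of Storm or of any MQTT library.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Hypothetical bridge between an MQTT client callback thread and nextTuple. */
final class MessageInbox {
    // Lock-free queue: safe for one thread to offer while another polls.
    private final Queue<String> pending = new ConcurrentLinkedQueue<>();

    /** Intended to be called from the MQTT client's callback thread for each message. */
    void onMessage(String payload) {
        pending.offer(payload);
    }

    /** Intended to be called from nextTuple: returns null at once if nothing is queued. */
    String pollNext() {
        return pending.poll();
    }

    public static void main(String[] args) throws InterruptedException {
        MessageInbox inbox = new MessageInbox();
        System.out.println(inbox.pollNext()); // no data yet, returns immediately

        // Simulate a message arriving on another thread.
        Thread producer = new Thread(() -> inbox.onMessage("sensor/1: 21.5"));
        producer.start();
        producer.join();
        System.out.println(inbox.pollNext()); // the queued message
    }
}
```

Inside a spout, nextTuple would call pollNext(), emit when the result is non-null, and otherwise sleep briefly and return, which matches the "courteous" behavior the documentation describes.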



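    You cannot simply wait for the specified time inside the method, but you can implement nextTuple so that it only emits a tuple once in a while.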
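    private static final long EMISSION_PERIOD = 2000; // 2 seconds, in milliseconds
    private long lastEmission = 0; // time of the last emission; 0 means "never"

    @Override
    public void nextTuple() {
        long now = System.currentTimeMillis();
        // Emit only once at least EMISSION_PERIOD ms have passed since the last emission.
        if (now - lastEmission >= EMISSION_PERIOD) {
            List<Object> tuple = pollMQTT(); // null means no new data
            if (tuple != null) {
                this.collector.emit(tuple);
                lastEmission = now;
                return;
            }
        }
        Utils.sleep(50); // nothing to emit: back off briefly instead of spinning
    }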
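The question also asks for a whole batch (or a list of strings in a single tuple) every x seconds. One way to get that is to buffer incoming messages and release them together once the period has elapsed. The sketch below shows only that buffering logic in plain Java so it can run without Storm. PeriodicBatcher, add, and pollBatch are hypothetical names, and the current time is passed in as a parameter purely to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: buffers messages and releases them as one batch per period. */
final class PeriodicBatcher {
    private final long periodMillis;
    private final List<String> buffer = new ArrayList<>();
    private long lastFlush;

    PeriodicBatcher(long periodMillis, long startMillis) {
        this.periodMillis = periodMillis;
        this.lastFlush = startMillis;
    }

    /** Stores one incoming message until the next flush. */
    void add(String message) {
        buffer.add(message);
    }

    /**
     * Returns the buffered messages if the period has elapsed and the buffer
     * is non-empty; otherwise returns null without blocking.
     */
    List<String> pollBatch(long nowMillis) {
        if (nowMillis - lastFlush < periodMillis || buffer.isEmpty()) {
            return null;
        }
        List<String> batch = new ArrayList<>(buffer);
        buffer.clear();
        lastFlush = nowMillis;
        return batch;
    }

    public static void main(String[] args) {
        PeriodicBatcher batcher = new PeriodicBatcher(2000, 0);
        batcher.add("a");
        batcher.add("b");
        System.out.println(batcher.pollBatch(1000)); // period not yet elapsed
        System.out.println(batcher.pollBatch(2000)); // period elapsed, batch released
    }
}
```

In a spout, nextTuple could add each polled MQTT message to the batcher, call pollBatch(System.currentTimeMillis()), and emit the returned list as a single tuple field when it is non-null. The class is not thread-safe, so this assumes add and pollBatch are both called from the spout thread. Note one design choice: an empty buffer does not reset the timer, so the first message after a quiet period is released on the next poll.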

    Note that I found an open-source MQTT spout. It does not look production-ready, but you can use it as a starting point.

    Regarding "stream - Creating an Apache Storm spout that emits tuples every X seconds", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/26595148/
