gpt4 book ai didi

apache-nifi - 在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟

转载 作者:行者123 更新时间:2023-12-04 12:12:33 28 4
gpt4 key购买 nike

在 NiFi 中,存在从 MQTT(ConsumeMQTT)消费并发布到 HDFS 路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入 60 分钟的延迟。发现 ControlRate 和 MergeContent 处理器是可能的解决方案但不确定。

What is the ideal solution to introduce time delay?



示例:上午 9:00 使用的流文件应在上午 10:00 发布到 HDFS

enter image description here

最佳答案

您可以使用 ExecuteScript处理器运行 sleep(60*60*1000)循环,但这会不必要地使用系统资源。

相反,我会介绍一个 RouteOnAttribute输出关系为one_hour_elapsed的处理器去PutHDFS , 和 unmatched循环回到自身。 RouteOnAttribute处理器应该有 路由策略设置为 Route to Property Name 和一个名为 的动态属性(单击“属性”选项卡右上角的 + 按钮) one_hour_elapsed .表达式语言值应为 ${now():toNumber():gt(${entryDate:toNumber():plus(3600000)})} .

这个表达:

  • 获取当前时间并将其转换为自纪元以来的毫秒数 ( now():toNumber() )
  • 获取 entryDate流文件的属性(当它进入 NiFi 时)并将其转换为毫秒并增加一小时( entryDate:toNumber():plus(3600000) [ 3600000 == 60*60*1000 ])
  • 比较两个数字 ( a:gt(${b}) )

  • 如果这实际上不是您流程的开始,您可以使用 UpdateAttribute处理器在流程的任何点插入任意时间戳并从那里计算。

    我还建议设置 产量持续时间运行计划 RouteOnAttribute处理器比平常高得多,因为您不希望该处理器持续运行,因为它不会工作。我建议将其设置为 1 或 5 分钟开始,因为您已经引入了一个小时的延迟。

    Flow showing timing loop

    关于apache-nifi - 在将流文件移动到 NiFi 中的下一个处理器之前引入时间延迟,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61875809/

    28 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com