gpt4 book ai didi

apache-nifi - 使用 NiFi Flowfiles 作为事件通知器

转载 作者:行者123 更新时间:2023-12-01 07:34:38 24 4
gpt4 key购买 nike

NiFi 新手!

我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示某种类型的事件已经开始。

在 NiFi 中,我还有其他方法可以指示一组事件已经开始和结束吗?例如,如果我有三个读取数据的处理器,并且我想知道第一个处理器即将被触发并且最后一个处理器已经完成。无论如何我可以这样做吗?如果处理器继续运行,我希望能够一次性将从处理器 1 读取的数据分组到处理器 3。为了更清楚地说明这一点

Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...

任何帮助,将不胜感激,
提前致谢!

最佳答案

我将把这个答案分成几个部分,因为这里发生了很多事情。

I was wondering if there is a way to send an empty flowfile with attributes on the flowfile in NiFi? I'd like to use this as a trigger to indicate that a type of event has Started.



GenerateFlowFile 处理器允许您以常规运行计划或使用 CRON 计划发送空(或填充)流文件。您可以将其与 UpdateAttribute 结合使用处理器向流文件添加任意静态或动态属性。

In NiFi is there any other way for me to indicate that a set of events have started and finished? For instance, if i have three processors that read in data and i would like to know that the first processor is about to be triggered and that the last processor has finished. Is there anyway for me to do this?



这接近于批处理,Apache NiFi 并非针对批处理进行设计或优化。确定一个源处理器是否“即将被触发”是非常困难的。如果该处理器是基于计时器/CRON 触发的,您可以知道该时间,但如果您的意思是“ GetFile 即将成功检索文件”,那么这并不容易。可以使用您自己的自定义处理器扩展处理器并覆盖 onTrigger() DistributedMapCacheClientService 中存储一些值的方法另一个处理器可以接听。或者我猜你可以将逻辑包装在 ExecuteScript 中。处理器并编写自定义通知代码。我不确定这里的目标——谁会收到有关此状态更改的通知?它是另一个处理器、人类观察者还是外部服务?

If the processors continue to run, i would like to be able to group the data read from processor 1 to processor 3 in one pass. To make this more clear

Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2
Processor3 End ...



但是,我相信使用新的 Wait 可以实现您的要求。和 Notify 处理器。 Koji Kawamura 写了一篇很好的文章来描述它们的使用 here .

我认为在这种情况下,您需要特殊的内容或属性才能检测到通过系统的批处理,除非它一次是一个数据单元。我将尝试在下面描述两种情况,但对此我没有太多背景信息。

场景 1(单个数据单元)

随意替换不同的源处理器,但我使用的是 GetFile为了简单起见。

假设您有一个充满文本文件的目录(由某个外部进程放置在那里)。每个文件都包含格式为“Firstname Lastname”的文本,并命名为 Lastname_YYYY-MM-DD-HH-mm-ss.txt使用它写入的时间戳填充文件名。
GetFile -> ReplaceText -> PutFile
GetFile处理器会将每个文件作为单独的流文件引入。从那里, ReplaceText可以做一些简单的事情,比如使用正则表达式来切换名称的顺序,以及 PutFile将内容写回文件系统。当 GetFile第一次触发时,它会将 n 个流文件发送到连接/队列到 ReplaceText .如果您希望它以线性方式而不是并行方式等待并执行操作,您可以将成功队列的背压设置为 1 flowfile 以防止前面的处理器( GetFile )运行,直到队列再次为空。

场景2(多个flowfile必须组合在一起并一起操作)

在这里你会想使用 MergeContent将多个流文件收集到一个中。您可以将 bin 阈值设置为 n 个流文件和 MergeContent处理器仅在达到传入流文件的最小数量时才会传输成功流文件。您还可以按属性分类,因此如果您从异构输入源读取数据,您仍然可以基于共同特征关联相关联的数据片段。
Wait 的替代方案& Notify
此外,您可以使用 Notify处理器将触发流文件发送到相应的 Wait处理器将“内容”流文件“释放”到所需的目的地。同样,上面链接的 Koji 的文章通过示例流程和一些屏幕截图详细解释了这一点。

我希望这至少能给你一个方向。如果没有更多上下文,我仍然觉得您正在尝试解决非 NiFi 问题,或者可能可以调整您的数据流模型以更好地支持流式思维。如果您有更多信息,我很乐意扩展答案。

关于apache-nifi - 使用 NiFi Flowfiles 作为事件通知器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44035238/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com