- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
NiFi 新手!
我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示某种类型的事件已经开始。
在 NiFi 中,我还有其他方法可以指示一组事件已经开始和结束吗?例如,如果我有三个读取数据的处理器,并且我想知道第一个处理器即将被触发并且最后一个处理器已经完成。无论如何我可以这样做吗?如果处理器继续运行,我希望能够一次性将从处理器 1 读取的数据分组到处理器 3。为了更清楚地说明这一点
Begin
Processor1
Processor2
Processor3
End
Begin
Processor1
Processor2
Processor3
End
...
最佳答案
我将把这个答案分成几个部分,因为这里发生了很多事情。
I was wondering if there is a way to send an empty flowfile with attributes on the flowfile in NiFi? I'd like to use this as a trigger to indicate that a type of event has Started.
GenerateFlowFile
处理器允许您以常规运行计划或使用 CRON 计划发送空(或填充)流文件。您可以将其与
UpdateAttribute
结合使用处理器向流文件添加任意静态或动态属性。
In NiFi is there any other way for me to indicate that a set of events have started and finished? For instance, if i have three processors that read in data and i would like to know that the first processor is about to be triggered and that the last processor has finished. Is there anyway for me to do this?
GetFile
即将成功检索文件”,那么这并不容易。可以使用您自己的自定义处理器扩展处理器并覆盖
onTrigger()
在
DistributedMapCacheClientService
中存储一些值的方法另一个处理器可以接听。或者我猜你可以将逻辑包装在
ExecuteScript
中。处理器并编写自定义通知代码。我不确定这里的目标——谁会收到有关此状态更改的通知?它是另一个处理器、人类观察者还是外部服务?
If the processors continue to run, i would like to be able to group the data read from processor 1 to processor 3 in one pass. To make this more clear
Begin Processor1 Processor2 Processor3 End Begin Processor1 Processor2
Processor3 End ...
Wait
可以实现您的要求。和
Notify
处理器。 Koji Kawamura 写了一篇很好的文章来描述它们的使用
here .
GetFile
为了简单起见。
Lastname_YYYY-MM-DD-HH-mm-ss.txt
使用它写入的时间戳填充文件名。
GetFile -> ReplaceText -> PutFile
GetFile
处理器会将每个文件作为单独的流文件引入。从那里,
ReplaceText
可以做一些简单的事情,比如使用正则表达式来切换名称的顺序,以及
PutFile
将内容写回文件系统。当
GetFile
第一次触发时,它会将 n 个流文件发送到连接/队列到
ReplaceText
.如果您希望它以线性方式而不是并行方式等待并执行操作,您可以将成功队列的背压设置为
1
flowfile 以防止前面的处理器(
GetFile
)运行,直到队列再次为空。
MergeContent
将多个流文件收集到一个中。您可以将 bin 阈值设置为 n 个流文件和
MergeContent
处理器仅在达到传入流文件的最小数量时才会传输成功流文件。您还可以按属性分类,因此如果您从异构输入源读取数据,您仍然可以基于共同特征关联相关联的数据片段。
Wait
的替代方案&
Notify
Notify
处理器将触发流文件发送到相应的
Wait
处理器将“内容”流文件“释放”到所需的目的地。同样,上面链接的 Koji 的文章通过示例流程和一些屏幕截图详细解释了这一点。
关于apache-nifi - 使用 NiFi Flowfiles 作为事件通知器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44035238/
例如,如果我有一个指定为隔离的 GetFile 处理器,那么来自该处理器的流文件如何分布在集群节点上? 是否需要添加任何额外的工作/处理器? 最佳答案 在今天的 Apache NiFi 中,跨集群的负
我正在使用 Apache NiFi 来摄取和预处理一些 CSV 文件,但是在长时间运行时,它总是失败。错误总是一样的: FlowFile Repository failed to update 在日志
是否可以在 @OnStopped 注释上发送流文件? 基本上,我想编写自定义处理器,它可以在处理器停止时在 flowFile 中发送一个属性。 有什么建议吗? 我在下面尝试: ProcessSessi
我需要从 NiFi 执行 HTTP Post,但我不希望/不需要请求携带所有 FlowFile 的内容。 有没有办法传递 FlowFile 的属性但不完整内容? 最佳答案 如果你的Http Post的
NiFi 新手! 我想知道是否有办法在 NiFi 中发送带有流文件属性的空流文件?我想用它作为触发器来指示某种类型的事件已经开始。 在 NiFi 中,我还有其他方法可以指示一组事件已经开始和结束吗?例
我需要执行类似的操作:sed '1d' simple.tsv > noHeader.tsv 这将从我的大流文件(> 1 GB)中删除第一行。 问题是 - 我需要在我的流程文件上执行它,所以它是: se
我试图将原始流文件作为输入发送到下一个处理器,但最终收到错误,我对 Nifi 很陌生,并且在 Java 方面也有一些经验。 public void onTrigger(ProessorContext
我有一个工作流程,其中两个或多个输入已对其执行集合操作(并集、补集等)以生成单个输出。我希望自己编写一个处理器来执行设置逻辑,但是否有可能同时处理不同来源的多个流文件并同时处理它们? 最佳答案 N
我在 Centos 7 上使用 Apache NiFi 0.4.1 和 Java 7,并尝试根据一些传入数据创建流文件。由于生成的数据量很大,我遇到了“OutOfMemoryError:Java 堆空
我正在尝试做一些相当简单的事情,从传入的 FlowFile 中读取 i9 PDF 表单,将其中的名字和姓氏解析为 JSON,然后将 JSON 输出到传出的 FlowFile。 我没有找到关于如何执行此
nifi 术语和流文件处理的新手。 找到了一种方便的方法来处理传入的 xml 并使用 XmlSlurper 对其进行解析,但是对于传入流文件的以下 GroovyScript 收到警告- 流程: 处理器
有没有办法在 NiFi 的自定义处理器中同时写入不同的流?例如,我有第三方库使用像这样工作的 API 进行重要处理: public void process(InputStream in, Outpu
我有名为 (1,3,4,5 等) 的流文件,我使用这个 ${filename} 属性来调用在线服务,然后我得到了很大的响应并将其逐行拆分,但最后我需要合并我的流文件根据他们的名字,我认为合并内容无法正
我是 NIFI 的新手,想将数据从 Kafka 推送到 S3 存储桶。我正在使用 PutS3Object 处理器,如果我将 Bucket 值硬编码为 mphdf/orderEvent,则可以将数据推送
我正在从以下 JSON/AVRO 架构生成随机数据: { "type" : "record", "namespace" : "test", "name" : "metro_data",
我正在从以下 JSON/AVRO 架构生成随机数据: { "type" : "record", "namespace" : "test", "name" : "metro_data",
我在我的自定义 NiFi 处理器中从远程 API 服务获取不同时间范围的数据。 我有时间范围全局计数器,每次迭代都会更新(我正在使用计时器驱动的调度策略)。 当计数器大于最大值时,我想仅从请求(ses
我试图在我的 python 脚本中获取流文件的属性,我已经完成了以下操作: class TransformCallback(StreamCallback): def __init__(self
这是关于Nifi中MergeContent处理器的问题。目前,我需要一次性将所有流文件与一个特定属性结合起来。 但是发生的事情是因为有太多的流文件具有相同的属性,处理器生成几个与属性合并的不同流文件,
我有一个创建一些新属性/内容的流程。我想从流文件中获取一个属性,并将其添加到文件的 JSON 内容中。 我可以使用AttributesToJSON,但这只会覆盖文件内容。 我的流程示例如下: Upda
我是一名优秀的程序员,十分优秀!