gpt4 book ai didi

apache-nifi - NiFi - 如何在 ExecuteStreamCommand 中引用 flowFile?

转载 作者:行者123 更新时间:2023-12-03 07:44:29 24 4
gpt4 key购买 nike

我需要执行类似的操作:sed '1d' simple.tsv > noHeader.tsv

这将从我的大流文件(> 1 GB)中删除第一行。

问题是 - 我需要在我的流程文件上执行它,所以它是:

sed '1d' myFlowFile > myFlowFile

问题是:我应该如何配置 ExecuteStreamCommand 处理器,以便它在我的流文件上运行命令并将其返回到我的流文件?如果 sed 不是最佳选择,我可以考虑采用其他方式(例如 tail)

ExecuteStreamCommand processor

谢谢,米哈尔

编辑2(解决方案):

下面是最终的 ExecuteStreamCommand 配置,它可以满足我的需要(从流文件中删除第一行)。@Andy - 非常感谢所有宝贵的提示。 ExecuteStreamCommand - remove 1st line from the flow

最佳答案

迈克尔,

我想确保我正确理解您的问题,因为我认为有更好的解决方案。

问题:

您已将 1GB TSV 加载到 NiFi 中,并且想要删除第一行。

解决方案:

如果您的文件较小,最好的解决方案是使用 ReplaceText 具有以下处理器属性的处理器:

  • 搜索值:^.*\n
  • 重置值(value): <- 空字符串

这将删除第一行,而无需将 NiFi 中的 1GB 内容发送到命令行,然后重新获取结果。不幸的是,要使用正则表达式,您需要设置一个最大缓冲区大小,这意味着需要将整个内容读入堆内存才能执行此操作。

对于 1GB 文件,如果您知道第一行的确切值,您应该尝试 ModifyBytes 它允许您从流文件内容的开头和/或结尾修剪字节计数。然后,您可以简单地指示处理器删除内容的前 n 字节。由于 NiFi 的写时复制内容存储库,您仍将拥有约 2GB 的数据,但它使用 8192B 缓冲区大小以流式传输方式进行。

我最好的建议是使用 ExecuteScript 处理器。该处理器允许您用各种语言(Groovy、Python、Ruby、Lua、JS)编写自定义代码并使其在流文件上执行。使用如下所示的 Groovy 脚本,您可以删除第一行并以流式传输方式复制其余部分,这样堆就不会受到不必要的负担。

我使用 1MB 文件进行了测试,每个流程文件大约需要 1.06 秒(MacBook Pro 2015、16 GB RAM、OS X 10.11.6)。在更好的机器上,您显然会获得更好的吞吐量,并且可以将其扩展到更大的文件。

def flowfile = session.get()
if (!flowfile) return

try {
// Here we are reading from the current flowfile content and writing to the new content
flowfile = session.write(flowfile, { inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))

// Ignoring the first line
def ignoredFirstLine = bufferedReader.readLine()

def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
def line

int i = 0

// While the incoming line is not empty, write it to the outputStream
while ((line = bufferedReader.readLine()) != null) {
bufferedWriter.write(line)
bufferedWriter.newLine()
i++
}

// By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
log.warn("Wrote ${i} lines to output")

bufferedReader.close()
bufferedWriter.close()
} as StreamCallback)

session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
log.error(e)
session.transfer(flowfile, REL_FAILURE)
}

顺便说一句,一般来说,NiFi 的一个好做法是在可能的情况下将巨大的文本文件拆分为较小的组件流文件(使用类似 SplitText )以获得并行处理的好处。如果 1GB 输入是视频,则这不适用,但正如您提到的 TSV,我认为很可能将初始流文件分割成更小的部分并并行操作它们(或者甚至发送到集群中的其他节点负载平衡)可能有助于您的性能。

编辑:

我意识到我没有回答你原来的问题——如何将流文件的内容放入 ExecuteStreamCommand处理器命令行调用。如果您想对某个属性的值进行操作,可以使用 Expression Language 引用该属性值。语法${attribute_name}参数字段中。但是,由于内容无法从 EL 引用,并且您不想通过将 1GB 内容移动到属性中来破坏堆,因此最好的解决方案是使用 PutFile 将内容写入文件。 ,运行sed针对提供的文件名命令并将其写入另一个文件,然后使用 GetFile将这些内容读回 NiFi 中的流文件中。

编辑2:

这是一个template它演示了使用 ExecuteStreamCommand两者都有 revsed针对流文件内容并将输出放入新流文件的内容中。您可以运行流程并监控logs/nifi-app.log查看输出或使用数据来源查询来检查每个处理器执行的修改。

ExecuteStreamCommand Example

ExecuteStreamCommand Configuration

关于apache-nifi - NiFi - 如何在 ExecuteStreamCommand 中引用 flowFile?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42443101/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com