- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要执行类似的操作:sed '1d' simple.tsv > noHeader.tsv
这将从我的大流文件(> 1 GB)中删除第一行。
问题是 - 我需要在我的流程文件上执行它,所以它是:
sed '1d' myFlowFile > myFlowFile
问题是:我应该如何配置 ExecuteStreamCommand 处理器,以便它在我的流文件上运行命令并将其返回到我的流文件?如果 sed 不是最佳选择,我可以考虑采用其他方式(例如 tail)
谢谢,米哈尔
编辑2(解决方案):
下面是最终的 ExecuteStreamCommand 配置,它可以满足我的需要(从流文件中删除第一行)。@Andy - 非常感谢所有宝贵的提示。
最佳答案
迈克尔,
我想确保我正确理解您的问题,因为我认为有更好的解决方案。
问题:
您已将 1GB TSV 加载到 NiFi 中,并且想要删除第一行。
解决方案:
如果您的文件较小,最好的解决方案是使用 ReplaceText
具有以下处理器属性的处理器:
^.*\n
<- 空字符串这将删除第一行,而无需将 NiFi 中的 1GB 内容发送到命令行,然后重新获取结果。不幸的是,要使用正则表达式,您需要设置一个最大缓冲区大小,这意味着需要将整个内容读入堆内存才能执行此操作。
对于 1GB 文件,如果您知道第一行的确切值,您应该尝试 ModifyBytes
它允许您从流文件内容的开头和/或结尾修剪字节计数。然后,您可以简单地指示处理器删除内容的前 n 字节。由于 NiFi 的写时复制内容存储库,您仍将拥有约 2GB 的数据,但它使用 8192B 缓冲区大小以流式传输方式进行。
我最好的建议是使用 ExecuteScript
处理器。该处理器允许您用各种语言(Groovy、Python、Ruby、Lua、JS)编写自定义代码并使其在流文件上执行。使用如下所示的 Groovy 脚本,您可以删除第一行并以流式传输方式复制其余部分,这样堆就不会受到不必要的负担。
我使用 1MB 文件进行了测试,每个流程文件大约需要 1.06 秒(MacBook Pro 2015、16 GB RAM、OS X 10.11.6)。在更好的机器上,您显然会获得更好的吞吐量,并且可以将其扩展到更大的文件。
def flowfile = session.get()
if (!flowfile) return
try {
// Here we are reading from the current flowfile content and writing to the new content
flowfile = session.write(flowfile, { inputStream, outputStream ->
def bufferedReader = new BufferedReader(new InputStreamReader(inputStream))
// Ignoring the first line
def ignoredFirstLine = bufferedReader.readLine()
def bufferedWriter = new BufferedWriter(new OutputStreamWriter(outputStream))
def line
int i = 0
// While the incoming line is not empty, write it to the outputStream
while ((line = bufferedReader.readLine()) != null) {
bufferedWriter.write(line)
bufferedWriter.newLine()
i++
}
// By default, INFO doesn't show in the logs and WARN will appear in the processor bulletins
log.warn("Wrote ${i} lines to output")
bufferedReader.close()
bufferedWriter.close()
} as StreamCallback)
session.transfer(flowfile, REL_SUCCESS)
} catch (Exception e) {
log.error(e)
session.transfer(flowfile, REL_FAILURE)
}
顺便说一句,一般来说,NiFi 的一个好做法是在可能的情况下将巨大的文本文件拆分为较小的组件流文件(使用类似 SplitText
)以获得并行处理的好处。如果 1GB 输入是视频,则这不适用,但正如您提到的 TSV,我认为很可能将初始流文件分割成更小的部分并并行操作它们(或者甚至发送到集群中的其他节点负载平衡)可能有助于您的性能。
编辑:
我意识到我没有回答你原来的问题——如何将流文件的内容放入 ExecuteStreamCommand
处理器命令行调用。如果您想对某个属性的值进行操作,可以使用 Expression Language 引用该属性值。语法${attribute_name}
在参数字段中。但是,由于内容无法从 EL 引用,并且您不想通过将 1GB 内容移动到属性中来破坏堆,因此最好的解决方案是使用 PutFile
将内容写入文件。 ,运行sed
针对提供的文件名命令并将其写入另一个文件,然后使用 GetFile
将这些内容读回 NiFi 中的流文件中。
编辑2:
这是一个template它演示了使用 ExecuteStreamCommand
两者都有 rev
和sed
针对流文件内容并将输出放入新流文件的内容中。您可以运行流程并监控logs/nifi-app.log
查看输出或使用数据来源查询来检查每个处理器执行的修改。
关于apache-nifi - NiFi - 如何在 ExecuteStreamCommand 中引用 flowFile?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42443101/
我在 docker 中运行 NiFi,所有相关目录都安装为卷。我正在尝试修改 nifi.properties 文件中的一些设置,特别是添加自定义属性文件。但是,当我重新启动 NiFi 时,某些属性会恢
我们有多个团队 nifi 应用程序在同一台 nifi 机器上运行...有什么方法可以记录特定于我的应用程序的日志吗?此外,默认情况下 nifi-app.log 文件很难跟踪问题,公告板仅显示 5 分钟
有了这个功能,现在有两个执行引擎---无状态和有状态,但我不确定它们分别适合哪些场景? 当我想方便地更新一个或多个参数时,使用steteless执行引擎和命令行?如果我需要查看流程状态,在Nifi U
这个问题说明了一切。我怎样才能做以下事情之一: 如何限制在集群范围内为一个处理器运行的并发任务数? 我运行的节点是否有任何唯一的短 ID?我可以使用这些 ID 附加到要加载的数据库表名(请参阅下面的详
我在 HDF 2.1.1 的集群模式下使用 NIFI 1.1.0,并且禁用了数据来源,知道如何启用它吗? 在我的独立版本中它是默认启用的。 最佳答案 您的独立实例和集群之间的主要区别在于您的集群是安全
我正在尝试将一个非常简单的多部分表单发布到 api。我在 apache Nifi 中看不到任何这样做的方法,因为它似乎只有一个表单数据输入。在这里和 Nifi 论坛上似乎有很多关于此的现有问题,但没有
随着流程在开发、测试和生产阶段的进展,我们正在努力找出更新处理器配置的最佳方法。当流部署到特定环境时,我们真的希望避免在处理器中操纵主机、端口等引用。至少在我们的例子中,我们将有不同的主机用于 Ela
我对 Nifi 及其功能以及它的适当用例有疑问。 我读过 Nifi 的真正目标是创建一个允许基于流的处理的空间。在玩弄 Nifi 之后,我也开始意识到它能够以对我有用的方式对数据进行建模/塑造。 Ni
我们有多个(50 多个)nifi 流,它们基本上都做同样的事情:从数据库中提取一些数据,将一些列附加到 parquet 并上传到 hdfs。它们仅在细节上有所不同,例如要运行的 sql 查询或它们在
我一直在尝试 google 和搜索堆栈以寻找答案,但一直找不到。 使用 NiFi,是否可以在之前的作业失败时停止进程? 我们有需要处理的用户数据,但数据是按顺序构造的,因此如果作业失败,我们需要停止运
我正在从事一个大量使用 Apache NiFi v1.10.0 的项目。我厌倦了点击数百个流程组来应用基本相同的小修复。 我最近发现了远程进程组,我想知道是否有办法将 NiFi 实例连接到自身并以这种
我使用的是 Nifi 0.4.1 版本。我正在编写自定义代码以将 CSV 转换为 avro 格式。我已经创建了类文件并能够生成 nar 文件。将 nar 文件放在 lib 目录中并重新启动 nifi
我正在尝试重新启动 NiFi 并出现以下异常。 2016-04-22 09:27:30,672 WARN [main] org.apache.nifi.web.server.JettyServer F
根据我在使用 NiFi 构建一些数据库摄取 PoC 后的理解,整个数据流作为流文件流运行。并且在任何特定时间,执行控制可以同时在一个或多个处理器上。 所以我真的很困惑如何针对任何故障调试复杂的数据流。
我想在我的 Nifi 处理器中引用一个环境变量(一个 linux 环境变量)。我尝试通过直接在处理器属性中引用 ${MY_VARIABLE_NAME} 来使用表达式语言。但这似乎不起作用。这可能吗?如
我是 nifi 的新手,我试图了解(因为它看起来很多基于 GUI)是否有一种方法可以在 Nifi 上自动缩放,以及如何使用 xml Nifi 模板并将其部署到集群。 本质上,我们试图做的是使用 Nif
我正在使用 Apache NiFi 来摄取和预处理一些 CSV 文件,但是在长时间运行时,它总是失败。错误总是一样的: FlowFile Repository failed to update 在日志
我正在为我的数据流开发新的 Nifi 处理器。我在 eclipse 中进行代码更改,创建新的 .nar 文件并将其复制到 Nifi lib 以进行测试。 在 nar 更新中,Nifi 需要重新启动,这
在 NiFi 中,存在从 MQTT(ConsumeMQTT)消费并发布到 HDFS 路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入 60 分钟的延迟。发现 Cont
我是 apache NIFI 的新手。我有点想知道保存按钮在哪里。我尝试了我在 youtube 上看到的教程中的示例。我想保存我创建的所有处理器以供将来引用。我没有看到任何保存按钮。以后可以保存我的工
我是一名优秀的程序员,十分优秀!