- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
刚开始使用 nifi。需要设计方面的帮助。我正在尝试在 HDFS 目录中使用虚拟 csv 文件(目前)创建一个简单的流,并将一些文本数据添加到每个流文件中的每条记录。
传入文件:
dummy1.csv
dummy2.csv
dummy3.csv
内容:
"Eldon Base for stackable storage shelf, platinum",Muhammed MacIntyre,3,-213.25,38.94,35,Nunavut,Storage & Organization,0.8
"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",BarryFrench,293,457.81,208.16,68.02,Nunavut,Appliances,0.58
"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl",Barry French,293,46.71,8.69,2.99,Nunavut,Binders and Binder Accessories,0.39
...
期望的输出:
d17a3259-0718-4c7b-bee8-924266aebcc7,Mon Jun 04 16:36:56 EDT 2018,Fellowes Recycled Storage Drawers,Allen Rosenblatt,11137,395.12,111.03,8.64,Northwest Territories,Storage & Organization,0.78
25f17667-9216-4f1d-b69c-23403cd13464,Mon Jun 04 16:36:56 EDT 2018,Satellite Sectional Post Binders,Barry Weirich,11202,79.59,43.41,2.99,Northwest Territories,Binders and Binder Accessories,0.39
ce0b569f-5d93-4a54-b55e-09c18705f973,Mon Jun 04 16:36:56 EDT 2018,Deflect-o DuraMat Antistatic Studded Beveled Mat for Medium Pile Carpeting,Doug Bickford,11456,399.37,105.34,24.49,Northwest Territories,Office Furnishings,0.61
(这可能是实现我想要达到的目标的一种糟糕方法,但我在某处看到 uuid 在生成唯一 session ID 时是最好的选择。所以想到将传入数据中的每一行提取到流文件并生成uuid)
但不知何故,如您所见,数据顺序困惑了。前 3 行在输出中不相同。但是,我正在使用的测试数据(50000 个条目)似乎在其他行中有数据。多次测试通常显示数据顺序在第 2001 行之后发生变化。
是的,我确实在这里搜索了类似的问题,并尝试在合并中使用碎片整理方法,但它没有用。如果有人能解释这里发生了什么,我将不胜感激,我如何才能以相同的方式为每条记录使用唯一的 session_id、时间戳来获取数据。是否需要更改或修改某些参数以获得正确的输出?如果还有更好的方法,我愿意接受建议。
最佳答案
首先感谢您如此详尽详尽的回复。我想你消除了我对处理器如何工作的很多疑虑!
The ordering of the merge is only guaranteed in defragment mode because it will put the flow files in order according to their fragment index. I'm not sure why that wouldn't be working, but if you could create a template of a flow with sample data that showed the problem it would be helpful to debug.
我将尝试使用干净的模板再次复制此方法。可能是一些参数问题和 HDFS 编写器无法写入。
I'm not sure if the intent of your flow is to just re-merge the original CSV that was split, or to merge together several different CSVs. Defragment mode will only re-merge the original CSV, so if ListHDFS picked up 10 CSVs, after splitting and re-merging, you should again have 10 CSVs.
是的,这正是我所需要的。将数据拆分并加入到相应的文件中。我还没有明确地(还)需要再次加入输出。
The approach of splitting a CSV down to 1 line per flow file to manipulate each line is a common approach, however it won't perform very well if you have many large CSV files. A more efficient approach would be to try and manipulate the data in place without splitting. This can generally be done with the record-oriented processors.
In this case you would need to define a schema for your CSV which included all the columns in your data, plus the session id and timestamp. Then using an UpdateRecord processor you would use record path expressions like /session_id = ${UUID()} and /timestamp = ${now()}. This would stream the content line by line and update each record and write it back out, keeping it all as one flow file.
这看起来很有希望。你能分享一个简单的模板,从 hdfs 中拉取文件>processing>write hdfs files but without splitting吗?
由于限制,我不愿意分享模板。但是让我看看我是否可以创建一个通用模板,我会分享
谢谢你的智慧! :)
关于hadoop - Apache Nifi MergeContent 输出数据不一致?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50702622/
我在 docker 中运行 NiFi,所有相关目录都安装为卷。我正在尝试修改 nifi.properties 文件中的一些设置,特别是添加自定义属性文件。但是,当我重新启动 NiFi 时,某些属性会恢
我们有多个团队 nifi 应用程序在同一台 nifi 机器上运行...有什么方法可以记录特定于我的应用程序的日志吗?此外,默认情况下 nifi-app.log 文件很难跟踪问题,公告板仅显示 5 分钟
有了这个功能,现在有两个执行引擎---无状态和有状态,但我不确定它们分别适合哪些场景? 当我想方便地更新一个或多个参数时,使用steteless执行引擎和命令行?如果我需要查看流程状态,在Nifi U
这个问题说明了一切。我怎样才能做以下事情之一: 如何限制在集群范围内为一个处理器运行的并发任务数? 我运行的节点是否有任何唯一的短 ID?我可以使用这些 ID 附加到要加载的数据库表名(请参阅下面的详
我在 HDF 2.1.1 的集群模式下使用 NIFI 1.1.0,并且禁用了数据来源,知道如何启用它吗? 在我的独立版本中它是默认启用的。 最佳答案 您的独立实例和集群之间的主要区别在于您的集群是安全
我正在尝试将一个非常简单的多部分表单发布到 api。我在 apache Nifi 中看不到任何这样做的方法,因为它似乎只有一个表单数据输入。在这里和 Nifi 论坛上似乎有很多关于此的现有问题,但没有
随着流程在开发、测试和生产阶段的进展,我们正在努力找出更新处理器配置的最佳方法。当流部署到特定环境时,我们真的希望避免在处理器中操纵主机、端口等引用。至少在我们的例子中,我们将有不同的主机用于 Ela
我对 Nifi 及其功能以及它的适当用例有疑问。 我读过 Nifi 的真正目标是创建一个允许基于流的处理的空间。在玩弄 Nifi 之后,我也开始意识到它能够以对我有用的方式对数据进行建模/塑造。 Ni
我们有多个(50 多个)nifi 流,它们基本上都做同样的事情:从数据库中提取一些数据,将一些列附加到 parquet 并上传到 hdfs。它们仅在细节上有所不同,例如要运行的 sql 查询或它们在
我一直在尝试 google 和搜索堆栈以寻找答案,但一直找不到。 使用 NiFi,是否可以在之前的作业失败时停止进程? 我们有需要处理的用户数据,但数据是按顺序构造的,因此如果作业失败,我们需要停止运
我正在从事一个大量使用 Apache NiFi v1.10.0 的项目。我厌倦了点击数百个流程组来应用基本相同的小修复。 我最近发现了远程进程组,我想知道是否有办法将 NiFi 实例连接到自身并以这种
我使用的是 Nifi 0.4.1 版本。我正在编写自定义代码以将 CSV 转换为 avro 格式。我已经创建了类文件并能够生成 nar 文件。将 nar 文件放在 lib 目录中并重新启动 nifi
我正在尝试重新启动 NiFi 并出现以下异常。 2016-04-22 09:27:30,672 WARN [main] org.apache.nifi.web.server.JettyServer F
根据我在使用 NiFi 构建一些数据库摄取 PoC 后的理解,整个数据流作为流文件流运行。并且在任何特定时间,执行控制可以同时在一个或多个处理器上。 所以我真的很困惑如何针对任何故障调试复杂的数据流。
我想在我的 Nifi 处理器中引用一个环境变量(一个 linux 环境变量)。我尝试通过直接在处理器属性中引用 ${MY_VARIABLE_NAME} 来使用表达式语言。但这似乎不起作用。这可能吗?如
我是 nifi 的新手,我试图了解(因为它看起来很多基于 GUI)是否有一种方法可以在 Nifi 上自动缩放,以及如何使用 xml Nifi 模板并将其部署到集群。 本质上,我们试图做的是使用 Nif
我正在使用 Apache NiFi 来摄取和预处理一些 CSV 文件,但是在长时间运行时,它总是失败。错误总是一样的: FlowFile Repository failed to update 在日志
我正在为我的数据流开发新的 Nifi 处理器。我在 eclipse 中进行代码更改,创建新的 .nar 文件并将其复制到 Nifi lib 以进行测试。 在 nar 更新中,Nifi 需要重新启动,这
在 NiFi 中,存在从 MQTT(ConsumeMQTT)消费并发布到 HDFS 路径(PutHDFS)的数据流。我需要在将消耗的数据推送到 HDFS 路径之前引入 60 分钟的延迟。发现 Cont
我是 apache NIFI 的新手。我有点想知道保存按钮在哪里。我尝试了我在 youtube 上看到的教程中的示例。我想保存我创建的所有处理器以供将来引用。我没有看到任何保存按钮。以后可以保存我的工
我是一名优秀的程序员,十分优秀!