
hadoop - Apache NiFi MergeContent output data inconsistent?


I'm just getting started with NiFi and need some help with the design. I am trying to create a simple flow with dummy CSV files (for now) in an HDFS directory and add some text data to every record in each flow file.

Incoming files:

dummy1.csv
dummy2.csv
dummy3.csv

Content:

"Eldon Base for stackable storage shelf, platinum",Muhammed MacIntyre,3,-213.25,38.94,35,Nunavut,Storage & Organization,0.8
"1.7 Cubic Foot Compact ""Cube"" Office Refrigerators",BarryFrench,293,457.81,208.16,68.02,Nunavut,Appliances,0.58
"Cardinal Slant-D Ring Binder, Heavy Gauge Vinyl",Barry French,293,46.71,8.69,2.99,Nunavut,Binders and Binder Accessories,0.39
...

Desired output:

d17a3259-0718-4c7b-bee8-924266aebcc7,Mon Jun 04 16:36:56 EDT 2018,Fellowes Recycled Storage Drawers,Allen Rosenblatt,11137,395.12,111.03,8.64,Northwest Territories,Storage & Organization,0.78
25f17667-9216-4f1d-b69c-23403cd13464,Mon Jun 04 16:36:56 EDT 2018,Satellite Sectional Post Binders,Barry Weirich,11202,79.59,43.41,2.99,Northwest Territories,Binders and Binder Accessories,0.39
ce0b569f-5d93-4a54-b55e-09c18705f973,Mon Jun 04 16:36:56 EDT 2018,Deflect-o DuraMat Antistatic Studded Beveled Mat for Medium Pile Carpeting,Doug Bickford,11456,399.37,105.34,24.49,Northwest Territories,Office Furnishings,0.61

Flow: SplitText -> ReplaceText -> MergeContent (the original post included screenshots of the flow and the processor configurations, which are not reproduced here).

(This may be a poor way to achieve what I'm after, but I read somewhere that a UUID is the best option for generating a unique session ID, so the idea was to split each line of the incoming data into its own flow file and generate a UUID for it.)
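
Since the configuration screenshots did not survive the repost, here is purely an illustration of what such a ReplaceText step between SplitText and MergeContent might look like. These settings are a reconstruction, not the poster's actual configuration; UUID(), now() and format() are standard NiFi Expression Language functions, and the date pattern is chosen only to match the desired output shown above:

  ReplaceText (illustrative settings, not taken from the original flow)
    Replacement Strategy : Regex Replace
    Evaluation Mode      : Entire text   (each split flow file holds a single CSV line)
    Search Value         : (?s)(^.*$)
    Replacement Value    : ${UUID()},${now():format("EEE MMM dd HH:mm:ss zzz yyyy")},$1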

But somehow, as you can see, the order of the data gets mixed up. The first 3 rows of the input are not the first 3 rows of the output; with the test data I'm using (50,000 entries) they end up on other rows instead. Repeated tests usually show the order changing somewhere after row 2001.

Yes, I did search for similar questions here and tried the Defragment strategy in the merge, but it didn't work. I would appreciate it if someone could explain what is going on here, and how I can get the data back in the same order with a unique session_id and timestamp on each record. Do some parameters need to be changed or tweaked to get the correct output? If there is a better approach, I'm open to suggestions.

Best Answer

First of all, thank you for such a thorough reply. I think you have cleared up a lot of my doubts about how the processors work!

The ordering of the merge is only guaranteed in defragment mode because it will put the flow files in order according to their fragment index. I'm not sure why that wouldn't be working, but if you could create a template of a flow with sample data that showed the problem it would be helpful to debug.
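
As context (this is standard NiFi behavior, not something spelled out in the thread): SplitText stamps each split with fragment.identifier, fragment.index and fragment.count attributes, and MergeContent's Defragment strategy groups splits by fragment.identifier and reorders them by fragment.index. A merge configured roughly as below should therefore restore the original line order, as long as those attributes reach MergeContent intact:

  MergeContent (illustrative values; other properties left at their defaults)
    Merge Strategy         : Defragment
    Maximum Number of Bins : at least the number of source CSVs being reassembled at the same time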

I will try to reproduce this again with a clean template. It could be some parameter issue, or the HDFS writer failing to write.

I'm not sure if the intent of your flow is to just re-merge the original CSV that was split, or to merge together several different CSVs. Defragment mode will only re-merge the original CSV, so if ListHDFS picked up 10 CSVs, after splitting and re-merging, you should again have 10 CSVs.

Yes, that is exactly what I need: split the data and join it back into the corresponding files. I don't have an explicit need (yet) to join the outputs together again.

The approach of splitting a CSV down to 1 line per flow file to manipulate each line is a common approach, however it won't perform very well if you have many large CSV files. A more efficient approach would be to try and manipulate the data in place without splitting. This can generally be done with the record-oriented processors.

  1. I used this approach purely on instinct and didn't realize it was a common one. Sometimes the data files can be very large, meaning more than a million records in a single file. Wouldn't that be an I/O problem on the cluster, since it means one record = one flow file = one unique UUID? How many flow files can NiFi handle? (I know it depends on the cluster configuration, and I will try to get more details about the cluster from the HDP admins.)
  2. Any suggestions for "trying to manipulate the data in place without splitting"? Could you give an example, a template, or the processors to use?

In this case you would need to define a schema for your CSV which included all the columns in your data, plus the session id and timestamp. Then using an UpdateRecord processor you would use record path expressions like /session_id = ${UUID()} and /timestamp = ${now()}. This would stream the content line by line and update each record and write it back out, keeping it all as one flow file.
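
A rough sketch of that record-oriented setup follows. The /session_id and /timestamp record paths and the ${UUID()}/${now()} expressions come from the answer above; the schema field names, record name and service wiring are illustrative assumptions, since the real column names were never given:

  Avro schema attached to CSVReader / CSVRecordSetWriter via the Schema Text property:
  {
    "type": "record",
    "name": "sales_record",
    "fields": [
      {"name": "session_id", "type": ["null", "string"]},
      {"name": "timestamp",  "type": ["null", "string"]},
      {"name": "product",    "type": "string"},
      {"name": "customer",   "type": "string"},
      {"name": "quantity",   "type": "string"}
      ... remaining CSV columns ...
    ]
  }

  UpdateRecord:
    Record Reader              : CSVReader (in practice the reader may need a variant of the
                                 schema without session_id/timestamp, since the incoming CSV
                                 does not contain those columns yet)
    Record Writer              : CSVRecordSetWriter (schema above)
    Replacement Value Strategy : Literal Value
    /session_id                : ${UUID()}
    /timestamp                 : ${now()}

With something like ListHDFS/FetchHDFS in front and PutHDFS after, each CSV would stay as a single flow file from pickup to write-back, with no split/merge step at all.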

This looks very promising. Could you share a simple template that pulls files from HDFS > processes them > writes the files back to HDFS, but without splitting?

Due to restrictions I'm hesitant to share my actual template, but let me see if I can create a generic one, and I will share that.

Thank you for the insight! :)

Regarding "hadoop - Apache NiFi MergeContent output data inconsistent?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/50702622/
