gpt4 book ai didi

scala - 使用spark解析NiFi数据包

转载 作者:行者123 更新时间:2023-12-04 02:08:03 27 4
gpt4 key购买 nike

我正在使用 Apache NiFi 和 Apache Spark 为大学做一个小项目。我想使用 NiFi 创建一个从 HDFS 读取 TSV 文件的工作流,并使用 Spark Streaming 我可以处理这些文件并将我需要的信息存储在 MySQL 中。我已经在 NiFi 中创建了我的工作流,并且存储部分已经在工作了。问题是我无法解析 NiFi 包,所以我可以使用它们。

文件包含这样的行:

linea1File1 TheReceptionist 653 Entertainment   424 13021   4.34    1305    744 DjdA-5oKYFQ NxTDlnOuybo c-8VuICzXtU

其中每个空格是一个制表符(“\t”)

这是我在 Spark 中使用 Scala 的代码:

 val ssc = new StreamingContext(config, Seconds(10))
val packet = ssc.receiverStream(new NiFiReceiver(conf, StorageLevel.MEMORY_ONLY))
val file = packet.map(dataPacket => new String(dataPacket.getContent, StandardCharsets.UTF_8))

直到这里,我才能在单个字符串中获取我的整个文件(7000 多行)...不幸的是,我无法将该字符串分成多行。我需要按行获取整个文件,这样我就可以在一个对象中解析它,对其应用一些操作并存储我想要的内容

谁能帮我解决这个问题?

最佳答案

每个数据包都将是来自 NiFi 的一个流文件的内容,因此如果 NiFi 从 HDFS 中获取一个包含很多行的 TSV 文件,那么所有这些行都将在一个数据包中。

如果没有看到您的 NiFi 流很难说,但您可能会使用行数为 1 的 SplitText 在 NiFi 开始流式传输之前在 NiFi 中拆分您的 TSV。

关于scala - 使用spark解析NiFi数据包,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41549678/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com