gpt4 book ai didi

hadoop - 使用 java 在 Apache Spark 中进行多行输入

转载 作者:可可西里 更新时间:2023-11-01 14:22:49 25 4
gpt4 key购买 nike

我已经查看了此站点上已经提出的其他类似问题,但没有得到满意的答案。

我是 Apache spark 和 hadoop 的新手。我的问题是我有一个输入文件 (35GB),其中包含对在线购物网站商品的多行评论。文件中给出的信息如下所示:

productId: C58500585F
product: Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.

这是一个审查 block 。有成千上万个这样的 block ,由空行分隔。我从这里需要的是 productId、userId 和分数,所以我过滤了 JavaRDD 以包含我需要的行。所以它看起来像下面这样:

productId: C58500585F
userId: A3NM6WTIAE
score: 2.0

代码:

SparkConf conf = new SparkConf().setAppName("org.spark.program").setMaster("local");
JavaSparkContext context = new JavaSparkContext(conf);

JavaRDD<String> input = context.textFile("path");

JavaRDD<String> requiredLines = input.filter(new Function<String, Boolean>() {
public Boolean call(String s) throws Exception {
if(s.contains("productId") || s.contains("UserId") || s.contains("score") || s.isEmpty() ) {
return false;
}
return true;
}
});

现在,我需要将这三行作为我不知道如何读取的一对(key, value) 的一部分来读取。两个评论 block 之间只有一个空行

我浏览了几个网站,但没有找到解决我的问题的方法。任何人都可以帮我解决这个问题吗?非常感谢!如果您需要更多信息,请告诉我。

最佳答案

继续我之前的评论,textinputformat.record.delimiter 可以在这里使用。如果唯一的分隔符是空行,则该值应设置为 "\n\n"

考虑这个测试数据:

productId: C58500585F
product: Nun Toy
product/price: 5.99
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Bought it for a relative. Was not impressive.

productId: ABCDEDFG
product: Teddy Bear
product/price: 6.50
userId: A3NM6WTIAE
profileName: Heather
helpfulness: 0/1
score: 2.0
time: 1624609
summary: not very much fun
text: Second comment.

productId: 12345689
product: Hot Wheels
product/price: 12.00
userId: JJ
profileName: JJ
helpfulness: 1/1
score: 4.0
time: 1624609
summary: Summarized
text: Some text

然后代码(在 Scala 中)看起来像这样:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
val conf = new Configuration
conf.set("textinputformat.record.delimiter", "\n\n")
val raw = sc.newAPIHadoopFile("test.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], conf)

val data = raw.map(e => {
val m = e._2.toString
.split("\n")
.map(_.split(":", 2))
.filter(_.size == 2)
.map(e => (e(0), e(1).trim))
.toMap

(m("productId"), m("userId"), m("score").toDouble)
})

输出是:

data.foreach(println)
(C58500585F,A3NM6WTIAE,2.0)
(ABCDEDFG,A3NM6WTIAE,2.0)
(12345689,JJ,4.0)

不确定您想要输出什么,所以我只是将它变成了一个 3 元素元组。此外,如果需要,解析逻辑肯定可以变得更高效,但这应该会给你一些工作空间。

关于hadoop - 使用 java 在 Apache Spark 中进行多行输入,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40037883/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com