
java - Reading multi-line JSON with the Spark Dataset API


I want to join two datasets using the join() method, but I can't figure out how to specify the join condition or the join column names.

public static void main(String[] args) {
    SparkSession spark = SparkSession
            .builder()
            .appName("Java Spark SQL basic example")
            .master("spark://10.127.153.198:7077")
            .getOrCreate();

    List<String> list = Arrays.asList("partyId");

    Dataset<Row> df = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
    Dataset<Row> df2 = spark.read().text("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

    df.join(df2, JavaConversions.asScalaBuffer(list)).show();

    // df.join(df2, "partyId").show();
}

When I execute the above code, I get this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: USING column `partyId` cannot be resolved on the left side of the join. The left-side columns: [value];
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90$$anonfun$apply$56.apply(Analyzer.scala:1977)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1976)
at org.apache.spark.sql.catalyst.analysis.Analyzer$$anonfun$90.apply(Analyzer.scala:1975)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$commonNaturalJoinProcessing(Analyzer.scala:1975)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1961)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$$anonfun$apply$31.applyOrElse(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1958)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveNaturalAndUsingJoin$.apply(Analyzer.scala:1957)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
at scala.collection.immutable.List.foldLeft(List.scala:84)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
at org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
at scala.collection.immutable.List.foreach(List.scala:381)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:62)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:50)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:63)
at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2822)
at org.apache.spark.sql.Dataset.join(Dataset.scala:775)
at org.apache.spark.sql.Dataset.join(Dataset.scala:748)
at com.cisco.cdx.batch.JsonDataReader.main(JsonDataReader.java:27)

Both JSON files have a "partyId" column. Please help.

Data:

Both JSON files have a "partyId" column, but when I join the two datasets, Spark cannot find that column. Am I missing something here?

Alerts.json

{
    "sourcePartyId": "SmartAccount_700001",
    "sourceSubPartyId": "",
    "partyId": "700001",
    "managedndjn": "BIZ_KEY_999001",
    "neAlert": {
        "data1": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
        }],
        "daa2": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
        }],
        "data3": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001",
        }],
        "advisory": [{
            "sni": "c1f44bb6-e429-11e7-9afc-64609ee945d1",
            "ndjn": "999001",
        }]
    }
}

Contracts.json

{
    "sourceSubPartyId": "",
    "partyId": "700001",
    "neContract": {
        "serialNumber": "FCH2013V245",
        "productId": "FS4000-K9",
        "coverageInfo": [
            {
                "billToCity": "Delhi",
                "billToCountry": "India",
                "billToPostalCode": "260001",
                "billToProvince": "",
                "slaCode": "1234",
            }
        ]
    }
}

However, when I read the files the following way, I can print the data.

JavaRDD<Tuple2<String, String>> javaRDD = spark.sparkContext()
        .wholeTextFiles("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json", 1)
        .toJavaRDD();
List<Tuple2<String, String>> collect = javaRDD.collect();
collect.forEach(x -> {
    System.out.println(x._1);
    System.out.println(x._2);
});

Best Answer

The problem is that you are trying to read the JSON files as plain text with spark.read().text().

If you want to read a JSON file directly into a DataFrame, you need to use

spark.read().json()

If the data spans multiple lines, you also need to add the option:

spark.read.option("multiline", "true").json()

That is why the columns cannot be accessed in the join: reading with text() produces a DataFrame with a single value column, which is exactly what the error message reports (The left-side columns: [value]).
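
Putting this together in Java, a minimal sketch of the fix, assuming the same local file paths as in the question (the option key is spelled "multiLine" in the Spark JSON data source documentation):

// Read each file as proper multi-line JSON instead of plain text,
// so Spark infers a schema that includes the "partyId" field.
Dataset<Row> alerts = spark.read()
        .option("multiLine", "true")
        .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json");
Dataset<Row> contracts = spark.read()
        .option("multiLine", "true")
        .json("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\contract.json");

// With the schema resolved, the USING-column join on "partyId" works.
alerts.join(contracts, "partyId").show();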

Another approach is to read the text file and convert it to JSON:

val jsonRDD = sc.wholeTextFiles("path to json").map(x => x._2)

spark.sqlContext.read.json(jsonRDD)
.show(false)
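
For completeness, a rough Java equivalent of that Scala snippet (an illustrative sketch, not part of the original answer), reusing the wholeTextFiles call already shown in the question:

// Read the whole file as one record and keep only its content (x._2),
// then let the JSON reader infer the schema from that string.
// Requires org.apache.spark.sql.Encoders in addition to the imports above.
JavaRDD<String> jsonRDD = spark.sparkContext()
        .wholeTextFiles("C:\\Users\\phyadavi\\LearningAndDevelopment\\Spark-Demo\\data1\\alert.json", 1)
        .toJavaRDD()
        .map(x -> x._2);

Dataset<Row> alerts = spark.read()
        .json(spark.createDataset(jsonRDD.rdd(), Encoders.STRING()));
alerts.show(false);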

Regarding "java - Reading multi-line JSON with the Spark Dataset API", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/49509580/
