gpt4 book ai didi

java - 合并两个在 Apache spark 中具有不同列名的数据集

转载 作者:塔克拉玛干 更新时间:2023-11-03 05:01:02 25 4
gpt4 key购买 nike

我们需要合并两个具有不同列名的数据集,数据集之间没有公共(public)列。

我们尝试了几种方法,两种方法都没有产生结果。请告诉我们如何使用 Apache spark Java 合并两个数据集

输入数据集1

"405-048011-62815", "CRC Industries",

"630-0746","Dixon value",

"4444-444","3M INdustries",

"555-55","Dixon coupling valve"

输入数据集 2

"222-2222-5555", "Tata",

"7777-88886","WestSide",

"22222-22224","Reliance",

"33333-3333","V industries"

预期是

    ----------label1----|------sentence1------|------label2---|------sentence2-----------
| 405-048011-62815 | CRC Industries | 222-2222-5555 | Tata|
| 630-0746 | Dixon value | 7777-88886 | WestSide|
-------------------------------------------------------------------------------------

`

    List<Row> data = Arrays.asList(
RowFactory.create("405-048011-62815", "CRC Industries"),
RowFactory.create("630-0746","Dixon value"),
RowFactory.create("4444-444","3M INdustries"),
RowFactory.create("555-55","Dixon coupling valve"));

StructType schema = new StructType(new StructField[] {new StructField("label1", DataTypes.StringType, false,Metadata.empty()),
new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) });

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema);

List<String> listStrings = new ArrayList<String>();
listStrings.add("405-048011-62815");
listStrings.add("630-0746");

Dataset<Row> matchFound1=sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new)));
matchFound1.show();
listStrings.clear();
listStrings.add("222-2222-5555");
listStrings.add("7777-88886");

List<Row> data2 = Arrays.asList(
RowFactory.create("222-2222-5555", "Tata"),
RowFactory.create("7777-88886","WestSide"),
RowFactory.create("22222-22224","Reliance"),
RowFactory.create("33333-3333","V industries"));

StructType schema2 = new StructType(new StructField[] {new StructField("label2", DataTypes.StringType, false,Metadata.empty()),
new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) });

Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2);

Dataset<Row> matchFound2=sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new)));
matchFound2.show();

//Approach 1
Dataset<Row> matchFound3=matchFound1.select(matchFound1.col("label1"),matchFound1.col("sentence1"),matchFound2.col("label2"),
matchFound2.col("sentence2"));
System.out.println("After concat");
matchFound3.show();

//Approach 2
Dataset<Row> matchFound4=matchFound1.filter(concat((col("label1")),matchFound1.col("sentence1"),matchFound2.col("label2"),
matchFound2.col("sentence2")));
System.out.println("After concat 2");
matchFound4.show();`

每种方法的错误如下

方法一错误

----------
org.apache.spark.sql.AnalysisException: resolved attribute(s) label2#10,sentence2#11 missing from label1#0,sentence1#1 in operator !Project [label1#0, sentence1#1, label2#10, sentence2#11];;
!Project [label1#0, sentence1#1, label2#10, sentence2#11]
+- Filter label1#0 IN (405-048011-62815,630-0746)
+- LocalRelation [label1#0, sentence1#1]


----------
Error for each of the approaches are as follows
Approach 2 error
org.apache.spark.sql.AnalysisException: filter expression 'concat(`label1`, `sentence1`, `label2`, `sentence2`)' of type string is not a boolean.;;
!Filter concat(label1#0, sentence1#1, label2#10, sentence2#11)
+- Filter label1#0 IN (405-048011-62815,630-0746)
+- LocalRelation [label1#0, sentence1#1]

最佳答案

希望这对你有用

DF

val pre: Array[String] = Array("CRC Industries", "Dixon value" ,"3M INdustries" ,"Dixon coupling valve")
val rea: Array[String] = Array("405048011-62815", "630-0746", "4444-444", "555-55")
val df1 = sc.parallelize( rea zip pre).toDF("label1","sentence1")

val preasons2: Array[String] = Array("Tata", "WestSide","Reliance", "V industries")
val reasonsI2: Array[String] = Array( "222-2222-5555", "7777-88886", "22222-22224", "33333-3333")
val df2 = sc.parallelize( reasonsI2 zip preasons2 ).toDF("label2","sentence2")

字符串索引器

导入 org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer()
.setInputCol("label1")
.setOutputCol("label1Index")

val indexed = indexer.fit(df1).transform(df1)
indexed.show()

val indexer1 = new StringIndexer()
.setInputCol("label2")
.setOutputCol("label2Index")

val indexed1 = indexer1.fit(df2).transform(df2)
indexed1.show()

加入

    val rnd_reslt12 = indexed.join(indexed1 , indexed.col("label1Index")===indexed1.col("label2Index")).drop(indexed.col("label1Index")).drop(indexed1.col("label2Index"))
rnd_reslt12.show()

+---------------+--------------------+-------------+------------+
| label1| sentence1| label2| sentence2|
+---------------+--------------------+-------------+------------+
| 630-0746| Dixon value|222-2222-5555| Tata|
| 4444-444| 3M INdustries| 22222-22224| Reliance|
| 555-55|Dixon coupling valve| 33333-3333|V industries|
|405048011-62815| CRC Industries| 7777-88886| WestSide|
+---------------+--------------------+-------------+------------+

关于java - 合并两个在 Apache spark 中具有不同列名的数据集,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43753761/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com